Version: 0.43.1

Update Network State Service

An implementation of the UpdateNetworkStateService provides an object-oriented wrapper around the gRPC library for updating information about the state of the network. Setting one up takes three steps:

  1. Create callbacks which will be called when gRPC messages are received from a client.
  2. Register the callbacks with an instance of the UpdateNetworkStateService.
  3. Add your UpdateNetworkStateService instance as a gRPC service.

Creating callbacks

Each supported message in UpdateNetworkStateService requires a callback, which is triggered whenever that message is received from a client.

on_set_current_states

The on_set_current_states callback is triggered for each request that passes in a batch of current state events. For each batch, it should return a batch result response that reflects the success or failure of the update.

from typing import AsyncGenerator, Tuple

from zepben.evolve import BatchSuccessful, CurrentStateEvent, SetCurrentStatesStatus


async def on_set_current_states(
    batches: AsyncGenerator[Tuple[int, Tuple[CurrentStateEvent, ...]], None]
) -> AsyncGenerator[SetCurrentStatesStatus, None]:
    async for batch_id, events in batches:
        # Process the events here, then yield a batch result response.
        # BatchSuccessful(batch_id) is assumed to report success for the batch.
        yield BatchSuccessful(batch_id)
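
Because the callback is an ordinary async generator function, its stream-in, stream-out contract can be exercised without a gRPC server. The sketch below is a minimal illustration using only the standard library. `FakeEvent`, `FakeStatus` and `fake_batches` are hypothetical stand-ins for `CurrentStateEvent`, `SetCurrentStatesStatus` and the incoming request stream; they are not part of the SDK, and the rule that an empty batch fails is an assumption made purely for the example.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Tuple


@dataclass(frozen=True)
class FakeEvent:
    """Hypothetical stand-in for CurrentStateEvent."""
    event_id: str


@dataclass(frozen=True)
class FakeStatus:
    """Hypothetical stand-in for SetCurrentStatesStatus."""
    batch_id: int
    ok: bool


async def fake_batches() -> AsyncGenerator[Tuple[int, Tuple[FakeEvent, ...]], None]:
    # Simulates two client requests, each carrying one batch of events.
    yield 1, (FakeEvent("a"), FakeEvent("b"))
    yield 2, ()


async def on_set_current_states(
    batches: AsyncGenerator[Tuple[int, Tuple[FakeEvent, ...]], None]
) -> AsyncGenerator[FakeStatus, None]:
    async for batch_id, events in batches:
        # Assumption for this sketch only: an empty batch counts as a failure.
        yield FakeStatus(batch_id, ok=len(events) > 0)


async def collect() -> List[FakeStatus]:
    # Drain the callback exactly as a caller would: one status per batch.
    return [status async for status in on_set_current_states(fake_batches())]


statuses = asyncio.run(collect())
```

Driving the callback this way makes it straightforward to check that exactly one result is produced per batch, and in the same order the batches arrived.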

Registering callbacks

Registering the callbacks with the service is as simple as passing them into the UpdateNetworkStateService constructor.

service = UpdateNetworkStateService(on_set_current_states)

Registering the service

For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.

import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_UpdateNetworkStateServiceServicer_to_server

from zepben.evolve import UpdateNetworkStateService

server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)

add_UpdateNetworkStateServiceServicer_to_server(service, server)

await server.start()

Putting it all together

Putting the steps above together, you can build the scaffold of a working application:

import asyncio
from typing import AsyncGenerator, Tuple

import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_UpdateNetworkStateServiceServicer_to_server

from zepben.evolve import CurrentStateEvent, SetCurrentStatesStatus, UpdateNetworkStateService, BatchSuccessful


class UpdateNetworkStateServiceImpl:
    def __init__(self):
        self.service = UpdateNetworkStateService(self.on_set_current_states)

    async def on_set_current_states(
        self, batches: AsyncGenerator[Tuple[int, Tuple[CurrentStateEvent, ...]], None]
    ) -> AsyncGenerator[SetCurrentStatesStatus, None]:
        async for batch_id, events in batches:
            # Process the events here, then yield a batch result response.
            # BatchSuccessful(batch_id) is assumed to report success for the batch.
            yield BatchSuccessful(batch_id)


async def main():
    server = grpc.aio.server()
    host = 'localhost:50051'
    server.add_insecure_port(host)

    service = UpdateNetworkStateServiceImpl().service
    add_UpdateNetworkStateServiceServicer_to_server(service, server)

    await server.start()
    # Keep the server running until it is stopped.
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(main())
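
Passing a bound method as the callback, as the class above does, lets the handler keep state between batches. The following sketch illustrates that idea using only the standard library. `StateStore`, `Applied` and `Failed` are hypothetical names introduced for this example, and each event is modelled as a plain `(switch_id, is_open)` tuple rather than a real `CurrentStateEvent`; none of this reflects the SDK's own types.

```python
import asyncio
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class Applied:
    """Hypothetical success result for one batch."""
    batch_id: int


@dataclass(frozen=True)
class Failed:
    """Hypothetical failure result for one batch."""
    batch_id: int
    reason: str


class StateStore:
    """Hypothetical handler that remembers the last state seen per switch."""

    def __init__(self) -> None:
        self.switch_open: Dict[str, bool] = {}

    @staticmethod
    def _validate(events: Tuple[Tuple[str, bool], ...]) -> Dict[str, bool]:
        updates: Dict[str, bool] = {}
        for switch_id, is_open in events:
            if not switch_id:
                raise ValueError("missing switch id")
            updates[switch_id] = is_open
        return updates

    async def on_set_current_states(self, batches):
        async for batch_id, events in batches:
            try:
                # Validate the whole batch first so a bad batch changes nothing.
                updates = self._validate(events)
            except ValueError as error:
                yield Failed(batch_id, str(error))
                continue
            self.switch_open.update(updates)
            yield Applied(batch_id)


async def demo():
    async def batches():
        yield 1, (("sw1", True),)
        yield 2, (("", False),)
        yield 3, (("sw1", False), ("sw2", True))

    store = StateStore()
    results = [r async for r in store.on_set_current_states(batches())]
    return store, results


store, results = asyncio.run(demo())
```

Validating a batch before applying it is one possible design choice: it means a failed batch leaves the stored state untouched, so the result reported for that batch matches what actually happened.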