Update Network State Service
The UpdateNetworkStateService
provides an object-oriented wrapper around the gRPC library for updating information about
the state of the network. Setting one up takes three steps:
- Create callbacks which will be called when gRPC messages are received from a client.
- Register the callbacks with an instance of the UpdateNetworkStateService.
- Add your UpdateNetworkStateService instance as a gRPC service.
Creating callbacks
Each supported message in UpdateNetworkStateService
requires a callback, which is triggered whenever a client sends that message.
on_set_current_states
The on_set_current_states
callback is triggered for each request passing in a batch of current state events, and should return a
batch result response to reflect the success or failure of the update process.
from typing import AsyncGenerator, Tuple

from zepben.evolve import BatchSuccessful, CurrentStateEvent, SetCurrentStatesStatus

async def on_set_current_states(
    batches: AsyncGenerator[Tuple[int, Tuple[CurrentStateEvent, ...]], None]
) -> AsyncGenerator[SetCurrentStatesStatus, None]:
    async for batch_id, events in batches:
        # process updating of events here and return a batch result response
        # Assumption: BatchSuccessful is constructed from the batch id.
        yield BatchSuccessful(batch_id)
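The streaming shape of this callback can be tried out without the SDK or a gRPC server. The following is a minimal sketch using only the standard library: StubEvent, StubStatus and apply_event are hypothetical stand-ins invented for illustration (they are not part of zepben.evolve), and the one-status-per-batch behaviour is an assumption based on the description above.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List, Tuple


@dataclass(frozen=True)
class StubEvent:
    """Hypothetical stand-in for CurrentStateEvent."""
    event_id: str


@dataclass(frozen=True)
class StubStatus:
    """Hypothetical stand-in for a SetCurrentStatesStatus result."""
    batch_id: int
    ok: bool
    message: str = ""


def apply_event(event: StubEvent) -> None:
    """Hypothetical per-event update; rejects events with an empty id."""
    if not event.event_id:
        raise ValueError("event has no id")


async def on_set_current_states(
    batches: AsyncGenerator[Tuple[int, Tuple[StubEvent, ...]], None]
) -> AsyncGenerator[StubStatus, None]:
    # Yield exactly one status per incoming batch, in arrival order.
    async for batch_id, events in batches:
        try:
            for event in events:
                apply_event(event)
        except ValueError as e:
            yield StubStatus(batch_id, ok=False, message=str(e))
        else:
            yield StubStatus(batch_id, ok=True)


async def demo() -> List[StubStatus]:
    # In-memory replacement for the stream of client requests.
    async def requests():
        yield 1, (StubEvent("e1"), StubEvent("e2"))
        yield 2, (StubEvent(""),)

    return [status async for status in on_set_current_states(requests())]


results = asyncio.run(demo())
```

Driving the callback from an in-memory generator like this is one way to unit test the update logic before wiring it into a server. A real implementation would yield the SDK's own status types instead of StubStatus.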
Registering callbacks
Registering the callbacks with the service is as simple as passing them into the UpdateNetworkStateService
constructor.
service = UpdateNetworkStateService(on_set_current_states)
Registering the service
For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.
import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_UpdateNetworkStateServiceServicer_to_server
from zepben.evolve import UpdateNetworkStateService
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)
add_UpdateNetworkStateServiceServicer_to_server(service, server)
await server.start()
Putting it all together
Putting each of the steps above together, you can build the scaffold of a working application:
import asyncio
from typing import AsyncGenerator, Tuple

import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_UpdateNetworkStateServiceServicer_to_server
from zepben.evolve import CurrentStateEvent, SetCurrentStatesStatus, UpdateNetworkStateService, BatchSuccessful

class UpdateNetworkStateServiceImpl:
    def __init__(self):
        self.service = UpdateNetworkStateService(self.on_set_current_states)

    async def on_set_current_states(
        self, batches: AsyncGenerator[Tuple[int, Tuple[CurrentStateEvent, ...]], None]
    ) -> AsyncGenerator[SetCurrentStatesStatus, None]:
        async for batch_id, events in batches:
            # process updating of events here and return a batch result response
            # Assumption: BatchSuccessful is constructed from the batch id.
            yield BatchSuccessful(batch_id)

async def main():
    server = grpc.aio.server()
    host = 'localhost:50051'
    server.add_insecure_port(host)
    service = UpdateNetworkStateServiceImpl().service
    add_UpdateNetworkStateServiceServicer_to_server(service, server)
    await server.start()
    # Keep the process alive so the server can keep handling requests.
    await server.wait_for_termination()

asyncio.run(main())