Query Network State Service
An implementation of the QueryNetworkStateService
will provide an object-oriented wrapper for the gRPC library, with the ability to retrieve information about
the state of the network. This is done with the following 3 steps:
- Create callbacks which will be called when gRPC messages are received from a client.
- Register the callbacks with an instance of the
QueryNetworkStateService
. - Add your
QueryNetworkStateService
instance as a gRPC service.
Creating callbacks
Each supported message in QueryNetworkStateService
requires a callback, which will be triggered when any client messages are received.
on_get_current_states
The on_get_current_states
callback is triggered for each request for the current states between two date/times, and should return an async iterator of the
current state events that occurred between those date/times (inclusive)
from datetime import datetime
from typing import AsyncGenerator, Iterable
from zepben.evolve import CurrentStateEventBatch
async def on_get_current_states(from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[CurrentStateEventBatch, None]:
events = []
# build the batch of events
yield events
onCurrentStatesStatus
The onCurrentStatesStatus
callback is triggered for each status response sent by the client. You should expect to receive one of these for every batch
returned from onGetCurrentStates
.
from zepben.evolve import SetCurrentStatesStatus
def on_current_states_status(event_status: SetCurrentStatesStatus):
# Do something with the `eventStatus`.
onProcessingError
The onProcessingError
callback is triggered for any errors in your onCurrentStatesStatus
callback, or if any [SetCurrentStatesResponse] is for an unknown
event status.
def on_processing_error(error: Exception):
# Do something with the `error`.
Registering callbacks
Registering the callbacks with the service is as simple as passing them into the QueryNetworkStateService
constructor.
service = QueryNetworkStateService(on_get_current_states, on_current_states_status, on_processing_error)
Registering the service
For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.
import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)
add_QueryNetworkStateServiceServicer_to_server(service, server)
await server.start()
Putting it all together
Putting each of the steps above together, you can build the scaffold of a working application
from datetime import datetime
from typing import AsyncGenerator, Iterable
import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server
from zepben.evolve import CurrentStateEventBatch, QueryNetworkStateService, SetCurrentStatesStatus
class QueryNetworkStateServiceImpl:
def __init__(self):
self.service = QueryNetworkStateService(self.on_get_current_states, self.on_current_states_status, self.on_processing_error)
async def on_get_current_states(self, from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[CurrentStateEventBatch, None]:
events = []
# build the batch of events
yield events
def on_current_states_status(event_status: SetCurrentStatesStatus):
# Do something with the `eventStatus`.
def on_processing_error(error: Exception):
# Do something with the `error`.
async def main():
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)
service = QueryNetworkStateServiceImpl().service
add_QueryNetworkStateServiceServicer_to_server(service, server)
await server.start()