Query Network State Service
An implementation of the QueryNetworkStateService
will provide an object-oriented wrapper for the gRPC library, with the ability to retrieve information about
the state of the network. This is done with the following 3 steps:
- Create callbacks which will be called when gRPC messages are received from a client.
- Register the callbacks with an instance of the
QueryNetworkStateService
. - Add your
QueryNetworkStateService
instance as a gRPC service.
Creating callbacks
Each supported message in QueryNetworkStateService
requires a callback, which will be triggered when any client messages are received.
on_get_current_states
The on_get_current_states
callback is triggered for each request for the current states between two date/times, and should return an async iterator of the
current state events that occurred between those date/times (inclusive)
from datetime import datetime
from typing import AsyncGenerator, Iterable
from zepben.evolve import CurrentStateEvent
async def on_get_current_states(from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[Iterable[CurrentStateEvent], None]:
events = []
# build the batch of events
yield events
Registering callbacks
Registering the callbacks with the service is as simple as passing them into the QueryNetworkStateService
constructor.
service = QueryNetworkStateService(on_get_current_states)
Registering the service
For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.
import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)
add_QueryNetworkStateServiceServicer_to_server(service, server)
await server.start()
Putting it all together
Putting each of the steps above together, you can build the scaffold of a working application
from datetime import datetime
from typing import AsyncGenerator, Iterable
import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server
from zepben.evolve import CurrentStateEvent, QueryNetworkStateService
class QueryNetworkStateServiceImpl:
def __init__(self):
self.service = QueryNetworkStateService(self.on_get_current_states)
async def on_get_current_states(self, from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[Iterable[CurrentStateEvent], None]:
events = []
# build the batch of events
yield events
async def main():
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)
service = QueryNetworkStateServiceImpl().service
add_QueryNetworkStateServiceServicer_to_server(service, server)
await server.start()