Query Network State Service

An implementation of the QueryNetworkStateService provides an object-oriented wrapper for the gRPC library that lets clients retrieve information about the state of the network. Setting one up takes three steps:

  1. Create callbacks which will be called when gRPC messages are received from a client.
  2. Register the callbacks with an instance of the QueryNetworkStateService.
  3. Add your QueryNetworkStateService instance as a gRPC service.

Creating callbacks

Each supported message in QueryNetworkStateService requires a callback, which is triggered whenever a client sends that message.

on_get_current_states

The on_get_current_states callback is triggered for each request for the current states between two date/times. It should return an async iterator over batches of the current state events that occurred between those date/times (inclusive).

from datetime import datetime
from typing import AsyncGenerator, Iterable

from zepben.evolve import CurrentStateEvent


async def on_get_current_states(from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[Iterable[CurrentStateEvent], None]:
    events = []
    # build the batch of events
    yield events
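
As a rough illustration of what "build the batch of events" might look like, the sketch below filters an in-memory list by the inclusive date/time window and yields the matches in fixed-size batches. The `StoredEvent` class, the `EVENT_STORE` list and the `BATCH_SIZE` value are all hypothetical stand-ins invented for this example; a real callback would yield `CurrentStateEvent` objects from your own data source.

```python
import asyncio
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import AsyncGenerator, Iterable, List


@dataclass(frozen=True)
class StoredEvent:
    """Hypothetical stand-in for CurrentStateEvent, holding only what this sketch needs."""
    event_id: str
    timestamp: datetime


# Hypothetical in-memory event store: one event per minute, ordered by timestamp.
START = datetime(2024, 1, 1)
EVENT_STORE: List[StoredEvent] = [
    StoredEvent(f"event-{i}", START + timedelta(minutes=i)) for i in range(10)
]

BATCH_SIZE = 3  # assumed batch size, chosen only for illustration


async def on_get_current_states(
    from_datetime: datetime, to_datetime: datetime
) -> AsyncGenerator[Iterable[StoredEvent], None]:
    batch: List[StoredEvent] = []
    for event in EVENT_STORE:
        # Both ends of the requested window are inclusive.
        if from_datetime <= event.timestamp <= to_datetime:
            batch.append(event)
            if len(batch) == BATCH_SIZE:
                yield batch
                batch = []
    # Yield any events left over after the last full batch.
    if batch:
        yield batch


async def collect(from_datetime: datetime, to_datetime: datetime) -> List[List[StoredEvent]]:
    # Drain the async generator the same way a caller would.
    return [list(b) async for b in on_get_current_states(from_datetime, to_datetime)]


batches = asyncio.run(collect(START + timedelta(minutes=2), START + timedelta(minutes=8)))
print([[e.event_id for e in b] for b in batches])
# → [['event-2', 'event-3', 'event-4'], ['event-5', 'event-6', 'event-7'], ['event-8']]
```

Yielding several smaller batches rather than one large list lets the caller start processing results before the whole window has been read.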

Registering callbacks

Registering the callbacks with the service is as simple as passing them into the QueryNetworkStateService constructor.

service = QueryNetworkStateService(on_get_current_states)
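
Because the callback is an ordinary callable, a bound method works just as well as a module-level function, which is handy when the callback needs access to state. The sketch below shows the general pattern with a hypothetical `SketchService` class; it is not the real QueryNetworkStateService and only mimics the idea of storing a callback at construction time and invoking it per request. The `Handlers` class and the string "events" are likewise made up for the example.

```python
import asyncio
from datetime import datetime
from typing import AsyncGenerator, Callable, Iterable, List

# Hypothetical callback type: takes a date/time window and yields batches of events
# (plain strings here, to keep the sketch free of external dependencies).
Callback = Callable[[datetime, datetime], AsyncGenerator[Iterable[str], None]]


class SketchService:
    """Hypothetical stand-in that stores a callback. NOT the real QueryNetworkStateService."""

    def __init__(self, on_get_current_states: Callback):
        self._on_get_current_states = on_get_current_states

    async def handle_request(self, from_datetime: datetime, to_datetime: datetime) -> List[List[str]]:
        # Forward the request to the registered callback and gather every batch it yields.
        return [list(batch) async for batch in self._on_get_current_states(from_datetime, to_datetime)]


class Handlers:
    """Keeps state (a label) next to the callback, so the callback is a bound method."""

    def __init__(self, label: str):
        self.label = label

    async def on_get_current_states(self, from_datetime: datetime, to_datetime: datetime):
        yield [f"{self.label}: {from_datetime.isoformat()} to {to_datetime.isoformat()}"]


sketch_service = SketchService(Handlers("demo").on_get_current_states)
result = asyncio.run(sketch_service.handle_request(datetime(2024, 1, 1), datetime(2024, 1, 2)))
print(result)
# → [['demo: 2024-01-01T00:00:00 to 2024-01-02T00:00:00']]
```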

Registering the service

For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.

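import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server


# Run this inside a coroutine (for example, an async main()), because starting the server is awaited.
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)

add_QueryNetworkStateServiceServicer_to_server(service, server)

await server.start()
# Keep the process alive so the server can continue to handle requests.
await server.wait_for_termination()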
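
A server also needs to be shut down cleanly at some point. The following is a minimal sketch of the start/stop lifecycle of a `grpc.aio` server, with no services attached so that it runs without any Zepben packages. Binding to port 0, the 0.1 second wait and the 1 second grace period are arbitrary choices made for this example, not recommendations.

```python
import asyncio

import grpc


async def run_briefly() -> int:
    server = grpc.aio.server()
    # Port 0 asks the OS for any free port; add_insecure_port returns the port actually bound.
    port = server.add_insecure_port("localhost:0")
    # Services would be added here, before the server is started.
    await server.start()
    try:
        # A long-running application would await wait_for_termination() without a timeout.
        await server.wait_for_termination(timeout=0.1)
    finally:
        # Give in-flight calls up to 1 second to finish, then shut the server down.
        await server.stop(grace=1.0)
    return port


bound_port = asyncio.run(run_briefly())
print(bound_port > 0)
```

Wrapping the wait in try/finally means the server is stopped even if the surrounding task is cancelled or raises.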

Putting it all together

Putting each of the steps above together, you can build the scaffold of a working application:

import asyncio
from datetime import datetime
from typing import AsyncGenerator, Iterable

import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server

from zepben.evolve import CurrentStateEvent, QueryNetworkStateService


class QueryNetworkStateServiceImpl:

    def __init__(self):
        # Register the bound method as the callback for current state requests.
        self.service = QueryNetworkStateService(self.on_get_current_states)

    async def on_get_current_states(self, from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[Iterable[CurrentStateEvent], None]:
        events = []
        # build the batch of events
        yield events


async def main():
    server = grpc.aio.server()
    host = 'localhost:50051'
    server.add_insecure_port(host)

    service = QueryNetworkStateServiceImpl().service
    add_QueryNetworkStateServiceServicer_to_server(service, server)

    await server.start()
    # Block until the server is terminated; otherwise main() returns and the server stops.
    await server.wait_for_termination()


if __name__ == "__main__":
    asyncio.run(main())