Skip to main content
Version: Next

Query Network State Service

An implementation of the QueryNetworkStateService will provide an object-oriented wrapper for the gRPC library, with the ability to retrieve information about the state of the network. This is done with the following 3 steps:

  1. Create callbacks which will be called when gRPC messages are received from a client.
  2. Register the callbacks with an instance of the QueryNetworkStateService.
  3. Add your QueryNetworkStateService instance as a gRPC service.

Creating callbacks

Each supported message in QueryNetworkStateService requires a callback, which will be triggered when any client messages are received.

on_get_current_states

The on_get_current_states callback is triggered for each request for the current states between two date/times, and should return an async iterator of the current state events that occurred between those date/times (inclusive)

from datetime import datetime
from typing import AsyncGenerator, Iterable

from zepben.evolve import CurrentStateEventBatch


async def on_get_current_states(from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[CurrentStateEventBatch, None]:
events = []
# build the batch of events
yield events

onCurrentStatesStatus

The onCurrentStatesStatus callback is triggered for each status response sent by the client. You should expect to receive one of these for every batch returned from onGetCurrentStates.

from zepben.evolve import SetCurrentStatesStatus


def on_current_states_status(event_status: SetCurrentStatesStatus):
# Do something with the `eventStatus`.

onProcessingError

The onProcessingError callback is triggered for any errors in your onCurrentStatesStatus callback, or if any [SetCurrentStatesResponse] is for an unknown event status.

def on_processing_error(error: Exception):
# Do something with the `error`.

Registering callbacks

Registering the callbacks with the service is as simple as passing them into the QueryNetworkStateService constructor.

service = QueryNetworkStateService(on_get_current_states, on_current_states_status, on_processing_error)

Registering the service

For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.

import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server


server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)

add_QueryNetworkStateServiceServicer_to_server(service, server)

await server.start()

Putting it all together

Putting each of the steps above together, you can build the scaffold of a working application

from datetime import datetime
from typing import AsyncGenerator, Iterable

import grpc
from zepben.protobuf.ns.network_state_pb2_grpc import add_QueryNetworkStateServiceServicer_to_server

from zepben.evolve import CurrentStateEventBatch, QueryNetworkStateService, SetCurrentStatesStatus

class QueryNetworkStateServiceImpl:

def __init__(self):
self.service = QueryNetworkStateService(self.on_get_current_states, self.on_current_states_status, self.on_processing_error)

async def on_get_current_states(self, from_datetime: datetime, to_datetime: datetime) -> AsyncGenerator[CurrentStateEventBatch, None]:
events = []
# build the batch of events
yield events

def on_current_states_status(event_status: SetCurrentStatesStatus):
# Do something with the `eventStatus`.

def on_processing_error(error: Exception):
# Do something with the `error`.

async def main():
server = grpc.aio.server()
host = 'localhost:50051'
server.add_insecure_port(host)

service = QueryNetworkStateServiceImpl().service
add_QueryNetworkStateServiceServicer_to_server(service, server)

await server.start()