Query Network State Service
An implementation of the QueryNetworkStateService
will provide an object-oriented wrapper for the gRPC library, with the ability to retrieve information about
the state of the network. This is done with the following 3 steps:
- Create callbacks which will be called when gRPC messages are received from a client.
- Register the callbacks with an instance of the
QueryNetworkStateService
. - Add your
QueryNetworkStateService
instance as a gRPC service.
The below examples make use of a helper library provided by Zepben, which is included as a transitive dependency of the SDK, or can be added directly from Maven Central.
You do NOT have to do this, you can register the services directly with a Netty gRPC (or other) server instead.
The classes you need for performing these actions can be imported from the SDK:
- Java
- Kotlin
import com.zepben.evolve.conn.grpc.GrpcServer;
import com.zepben.evolve.streaming.data.CurrentStateEvent;
import com.zepben.evolve.streaming.get.QueryNetworkStateService;
import com.zepben.evolve.conn.grpc.GrpcServer
import com.zepben.evolve.streaming.data.CurrentStateEvent
import com.zepben.evolve.streaming.get.QueryNetworkStateService
Creating callbacks
Each supported message in QueryNetworkStateService
requires a callback, which will be triggered when any client messages are received. Implementation of these
callbacks can be done as either lambdas, or full method/function implementations that can be passed as method references.
onGetCurrentStates
The onGetCurrentStates
callback is triggered for each request for the current states between two date/times, and should return a list of the current state
events that occurred between those date/times (inclusive)
- Java
- Kotlin
// Using a lambda expression
QueryNetworkStateService.GetCurrentStates getCurrentStatesLambda = (from, to) ->
Stream.of(
// Put the batches you wish to return here, e.g. List.of(item1, item2), List.of(item3).
);
// Using a method
class QueryNetworkStateServiceImpl {
Stream<List<CurrentStateEvent>> getCurrentStates(LocalDateTime from, LocalDateTime to){
var batches = ... // Some logic to build the batches of events.
return batches.stream();
}
}
// Using a lambda expression
var getCurrentStatesLambda = { from: LocalDateTime?, to: LocalDateTime? ->
sequenceOf<List<CurrentStateEvent>>(
// Put the batches you wish to return here, e.g. List.of(item1, item2), List.of(item3).
)
}
// Using a method
class QueryNetworkStateServiceImpl {
fun getCurrentStates(from: LocalDateTime?, to: LocalDateTime?): Sequence<List<CurrentStateEvent>> {
val batches = ... // Some logic to build the batches of events.
return batches.asSequence()
}
}
Registering callbacks
Registering the callbacks with the service is as simple as passing them into the QueryNetworkStateService
constructor.
- Java
- Kotlin
// Using lambda expressions
QueryNetworkStateService service = new QueryNetworkStateService(getCurrentStatesLambda);
// Using method references
class QueryNetworkStateServiceImpl {
QueryNetworkStateService service = new QueryNetworkStateService(this::getCurrentStates);
}
// Using lambda expressions
val service = QueryNetworkStateService(getCurrentStatesLambda)
// Using method references
class QueryNetworkStateServiceImpl {
val service = QueryNetworkStateService(::getCurrentStates);
}
Registering the service
For the above code to have any effect, you need to register the service with a gRPC server. Once this has been done, you should start to receive callbacks for each request sent from a gRPC client.
- Java
- Kotlin
class Main {
public static void main(String[] args) {
var grpcServer = new GrpcServerImpl(9001, service);
grpcServer.start();
}
class GrpcServerImpl extends GrpcServer {
GrpcServerImpl(int port, QueryNetworkStateService service) {
super(port, 0, null, List.of());
getServerBuilder().addService(service);
}
}
}
fun main() {
val grpcServer = object : GrpcServer(9001) {
init {
serverBuilder.addService(service)
}
}
grpcServer.start()
}
Putting it all together
Putting each of the steps above together, you can build the scaffold of a working application
- Java Lambdas
- Java Methods
- Kotlin Lambdas
- Kotlin Methods
Main.java
:
import com.zepben.evolve.streaming.get.QueryNetworkStateService;
import com.zepben.evolve.streaming.data.CurrentStateEvent;
import java.util.stream.Stream;
class Main {
public static void main(String[] args) {
QueryNetworkStateService service = new QueryNetworkStateService(
(QueryNetworkStateService.GetCurrentStates) (from, to) -> Stream.of(
// Put the batches you wish to return here, e.g. List.of(item1, item2), List.of(item3).
)
);
var grpcServer = new GrpcServerImpl(9001, service);
grpcServer.start();
}
}
GrpcServerImpl.java
:
import com.zepben.evolve.conn.grpc.GrpcServer;
import com.zepben.evolve.streaming.get.QueryNetworkStateService;
import java.util.List;
class GrpcServerImpl extends GrpcServer {
GrpcServerImpl(int port, QueryNetworkStateService service) {
super(port, 0, null, List.of());
getServerBuilder().addService(service);
}
}
Main.java
:
class Main {
public static void main(String[] args) {
var service = new QueryNetworkStateServiceImpl();
var grpcServer = new GrpcServerImpl(9001, service.service);
grpcServer.start();
}
}
GrpcServerImpl.java
:
import com.zepben.evolve.conn.grpc.GrpcServer;
import com.zepben.evolve.streaming.get.QueryNetworkStateService;
import java.util.List;
class GrpcServerImpl extends GrpcServer {
GrpcServerImpl(int port, QueryNetworkStateService service) {
super(port, 0, null, List.of());
getServerBuilder().addService(service);
}
}
QueryNetworkStateServiceImpl.java
:
import com.zepben.evolve.streaming.data.CurrentStateEvent;
import com.zepben.evolve.streaming.get.QueryNetworkStateService;
import java.time.LocalDateTime;
import java.util.List;
import java.util.stream.Stream;
class QueryNetworkStateServiceImpl {
QueryNetworkStateService service = new QueryNetworkStateService(this::getCurrentStates);
Stream<List<CurrentStateEvent>> getCurrentStates(LocalDateTime from, LocalDateTime to){
var batches = ... // Some logic to build the batches of events.
return batches.stream();
}
}
import com.zepben.evolve.conn.grpc.GrpcServer
import com.zepben.evolve.streaming.data.CurrentStateEvent
import com.zepben.evolve.streaming.get.QueryNetworkStateService
fun main() {
val service = QueryNetworkStateService(
onGetCurrentStates = { from, to ->
sequenceOf(
// Put the batches you wish to return here, e.g. List.of(item1, item2), List.of(item3).
)
}
)
val grpcServer = object : GrpcServer(9001) {
init {
serverBuilder.addService(service)
}
}
grpcServer.start()
}
import com.zepben.evolve.conn.grpc.GrpcServer
import com.zepben.evolve.streaming.data.CurrentStateEvent
import com.zepben.evolve.streaming.get.QueryNetworkStateService
import java.time.LocalDateTime
class QueryNetworkStateServiceImpl {
val service = QueryNetworkStateService(::getCurrentStates);
fun getCurrentStates(from: LocalDateTime?, to: LocalDateTime?): Sequence<List<CurrentStateEvent>> {
val batches = ... // Some logic to build the batches of events.
return batches.asSequence()
}
}
fun main() {
val service = QueryNetworkStateServiceImpl().service
val grpcServer = object : GrpcServer(9001) {
init {
serverBuilder.addService(service)
}
}
grpcServer.start()
}