Update Network State Client
The UpdateNetworkStateClient will allow you to interact with a server running the UpdateNetworkStateService (e.g. EWB). It provides an object-oriented
wrapper for the gRPC library, with the ability to update information about the state of the network. This is done with the following 3 steps:
- Create a gRPC connection to the server.
- Create an instance of the
UpdateNetworkStateClientusing your gRPC connection. - Use your
UpdateNetworkStateClientto update the state of the network.
Creating a gRPC channel
The channel gRPC channel can be directly from the gRPC library, or using our GrpcChannelBuilder helper. At its most basic, this can be achieved with:
from zepben.ewb import GrpcChannelBuilder
channel = GrpcChannelBuilder().for_address(host, port).build()
For more in depth options for using a gRPC channel, see the gRPC documentation, or look up a tutorial.
Using a gRPC channel with your client
Using your gRPC channel with the UpdateNetworkStateClient is as simple as passing it to the constructor.
from zepben.ewb import UpdateNetworkStateClient
client = UpdateNetworkStateClient(channel)
Using your client to update the network state
Now that you have a client, you can use it to update the state of the network on the connected server.
Updating current network state
The current state of the network can be updated using the set_current_states function on the UpdateNetworkStateClient. All events passed in the same list will
be executed as a batch.
Updating current switch state
The current state of switches can be updating by passing a SwitchStateEvent to the set_current_states function.
from datetime import datetime
from zepben.ewb import SwitchStateEvent, SwitchAction
event1 = SwitchStateEvent("event1", datetime.now(), "switch_id_1", SwitchAction.OPEN)
event2 = SwitchStateEvent("event2", datetime.now(), "switch_id_2", SwitchAction.CLOSE)
response = client.set_current_states(1, (event1, event2))
Adding cuts
You can cut an AcLineSegment in the current state of the network by passing an AddCutEvent to the set_current_states function.
from datetime import datetime
from zepben.ewb import AddCutEvent
event1 = AddCutEvent("event1", datetime.now(), "cut_id", "acls_id")
response = client.set_current_states(1, (event1))
Removing cuts
You can remove a previously added cut from the current state of the network by passing a RemoveCutEvent to the set_current_states function.
from datetime import datetime
from zepben.ewb import RemoveCutEvent
event1 = RemoveCutEvent("event1", datetime.now(), "cut_id")
response = client.set_current_states(1, (event1))
Adding jumpers
You can add a jumper between two other pieces of equipment in the current state of the network by passing an AddJumperEvent to the set_current_states function.
from datetime import datetime
from zepben.ewb import AddJumperEvent, JumperConnection
event1 = AddJumperEvent("event1", datetime.now(), "jumper_id", JumperConnection("from_id", JumperConnection("to_id")))
response = client.set_current_states(1, (event1))
Removing jumpers
You can remove a previously added jumper from the current state of the network by passing a RemoveJumperEvent to the set_current_states function.
from datetime import datetime
from zepben.ewb import RemoveJumperEvent
event1 = RemoveJumperEvent("event1", datetime.now(), "jumper_id")
response = client.set_current_states(1, (event1))
Multiple Requests
If you have multiple batches to send, you can use set_current_states_in_batches rather than calling set_current_states multiple times
from datetime import datetime
from zepben.ewb import SwitchStateEvent, SwitchAction
event1 = SwitchStateEvent("event1", datetime.now(), "switch_id_1", SwitchAction.OPEN)
event2 = SwitchStateEvent("event2", datetime.now(), "switch_id_2", SwitchAction.CLOSE)
async def events_in_batches():
yield UpdateNetworkStateClient.SetCurrentStatesRequest(1, (event1,))
yield UpdateNetworkStateClient.SetCurrentStatesRequest(2, (event2,))
async for response in client.set_current_states_in_batches(events_in_batches()):
# Process your responses here. You will get a response per batch.
Batch result responses
Each batch will receive its own response, which will be one of the following:
BatchSuccessful- Indicates that all events in the batch were processed successfully. Events that are ignored because they set the state to one that is already present, or are skipped due to a later event applying the opposite action, will be marked as successful.ProcessingPaused- Indicates the entire batch was ignore as current state processing in teh server is currently paused. The response will include the time the server was paused.BatchFailure- Indicates at least one event in the batch could not be applied. Each event that failed will indicate why it failed, some of which will have more impact than others.StateEventUnknownMrid- ThemRIDof the event could not be found in the network hosted by this server.StateEventDuplicateMrid- ThemRIDof an item being added to the network is already in use.StateEventInvalidMrid- ThemRIDof the item being addressed in the event is of a different type than expected. e.g. You can't remove aJumperwith anmRIDthat belongs to aCut.StateEventUnsupportedPhasing- You tried to specify phases that do not make sense to the item being updated. When using the default phasing ofNONEyou will never receive this error. Until un-ganged switching is supported, this error will be returned for all events that specify phases.
You can check the type of response or failure by checking against the types above.
from zepben.ewb import BatchFailure, StateEventUnknownMrid
if isinstance(response, BatchFailure):
response.partial_failure # Will be true if all event failed, otherwise false.
for failure in response.failures:
if isinstance(failure, StateEventUnknownMrid):
# Process failure