Service Counter
Before diving into this code, here's a quick heads-up on what you'll need to be familiar with:
- Python Programming: It's important to have a good grasp of Python, especially concepts like functions, loops, and classes, since the example relies on these fundamentals.
- Asynchronous Programming with asyncio: Familiarity with Python's asyncio for writing concurrent code using the async/await syntax.
The Service Counter example builds on the service_client example. Here we show how to use the publish method of EventServiceGrpc to publish messages, and then how to use the EventClient to interact with the service.
In particular, we will create a service that runs a counter in a separate task and publishes the counter value at a fixed rate. The client subscribes to the service and prints the counter value every time it receives a message. In addition, the client can request that the service reset the counter to zero.
Requirements
This example only requires the farm-ng-core package.
pip3 install farm-ng-core
Create the service
We first create a service that publishes the counter value at a fixed rate. For this, we create a counter.py program that instantiates the EventServiceGrpc and runs the service through its serve method on the asyncio event loop.
In the same program, we create a CounterServer class that implements the logic of the service, including the concurrent task that runs the counter. The CounterServer class also registers its request_reply_handler coroutine with the event service. This handler is called whenever a client sends a request, in this case to reset the counter to zero.
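import asyncio

# Import paths below are assumed from the farm-ng-core package layout.
from farm_ng.core.event_pb2 import Event
from farm_ng.core.event_service import EventServiceGrpc
from google.protobuf.empty_pb2 import Empty
from google.protobuf.message import Message
from google.protobuf.wrappers_pb2 import Int32Value


class CounterServer:
    def __init__(self, event_service: EventServiceGrpc) -> None:
        """Initialize the service.

        Args:
            event_service: The event service to use for communication.
        """
        self._event_service = event_service
        self._event_service.add_request_reply_handler(self.request_reply_handler)

        self._counter: int = 0  # value published on /counter
        self._rate: float = 1.0  # publish rate in Hz

    async def request_reply_handler(self, event: Event, message: Message) -> Message:
        """The callback for handling request/reply messages."""
        if event.uri.path == "/reset_counter":
            self._counter = 0
        # Every request gets an empty reply, whether or not the path matched.
        return Empty()

    async def run(self) -> None:
        """Run the main task."""
        while True:
            await self._event_service.publish("/counter", Int32Value(value=self._counter))
            self._counter += 1
            await asyncio.sleep(1.0 / self._rate)

    async def serve(self) -> None:
        # Run the gRPC service and the counter task concurrently.
        await asyncio.gather(self._event_service.serve(), self.run())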
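To see the concurrency pattern in isolation, here is a minimal sketch that needs only the standard library. It is not the farm-ng implementation: `ToyCounter`, its `asyncio.Queue` (standing in for the published /counter stream), and its `reset` method (standing in for the /reset_counter request) are hypothetical names chosen for illustration.

```python
import asyncio


class ToyCounter:
    """Hypothetical stand-in for CounterServer; a queue replaces the event service."""

    def __init__(self, rate: float = 100.0) -> None:
        self.queue: asyncio.Queue = asyncio.Queue()
        self._counter = 0
        self._rate = rate  # messages per second

    def reset(self) -> None:
        """Plays the role of the /reset_counter request handler."""
        self._counter = 0

    async def run(self, n_messages: int) -> None:
        """Publish the counter at a fixed rate, then increment it."""
        for _ in range(n_messages):
            await self.queue.put(self._counter)
            self._counter += 1
            await asyncio.sleep(1.0 / self._rate)


async def demo() -> list:
    counter = ToyCounter()
    received = []

    async def consume() -> None:
        # Read six values and request a reset after the third one.
        for _ in range(6):
            received.append(await counter.queue.get())
            if len(received) == 3:
                counter.reset()

    # Run producer and consumer concurrently with asyncio.gather.
    await asyncio.gather(counter.run(6), consume())
    return received


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Because the reset happens while the producer is sleeping, the next published value is 0 again, which mirrors what a subscriber observes when the real service handles a reset request.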
Create the client
For the client, we will create a client.py program that implements a thin wrapper class, CounterClient, around the EventClient class. The CounterClient has a method to subscribe to the event stream coming from the /counter path.
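# Import paths below are assumed from the farm-ng-core package layout.
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service_pb2 import EventServiceConfig, SubscribeRequest
from farm_ng.core.uri_pb2 import Uri


class CounterClient:
    def __init__(self, service_config: EventServiceConfig) -> None:
        """Initialize the client.

        Args:
            service_config: The service config.
        """
        self._event_client = EventClient(service_config)

    async def subscribe(self) -> None:
        """Subscribe to the /counter stream and print each decoded message."""
        async for event, message in self._event_client.subscribe(
            request=SubscribeRequest(uri=Uri(path="/counter"), every_n=1), decode=True
        ):
            print(f"Received message: {message}")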
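The subscription above sets every_n=1. Assuming that every_n=N means the subscriber receives one message out of every N (an assumption about the library's behavior that this example does not confirm), the effect can be sketched with plain async generators. `fake_stream`, `take_every_n`, and `collect` are hypothetical helpers, not part of farm-ng-core, and keeping the first message of each group is an arbitrary choice made for this sketch.

```python
import asyncio
from typing import AsyncIterator


async def fake_stream(count: int) -> AsyncIterator[int]:
    """Hypothetical stand-in for the /counter stream: yields 0..count-1."""
    for value in range(count):
        await asyncio.sleep(0)  # hand control back to the event loop
        yield value


async def take_every_n(stream: AsyncIterator[int], every_n: int) -> AsyncIterator[int]:
    """Yield one message out of every `every_n`, starting with the first."""
    index = 0
    async for value in stream:
        if index % every_n == 0:
            yield value
        index += 1


async def collect(every_n: int) -> list:
    """Gather the downsampled stream into a list for inspection."""
    return [value async for value in take_every_n(fake_stream(6), every_n)]


if __name__ == "__main__":
    print(asyncio.run(collect(1)))  # keeps every message
    print(asyncio.run(collect(2)))  # keeps every second message
```

With every_n=1 nothing is dropped, which is why the client in this example prints every counter value.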
In the same program, we will create a main function that instantiates the EventServiceConfig and the CounterClient. The main function exposes a couple of high-level commands: one to subscribe to the /counter path and one to request that the service reset the counter to zero.
from google.protobuf.empty_pb2 import Empty


async def command_subscribe(client: CounterClient) -> None:
    """Subscribe to the counter service."""
    await client.subscribe()


async def command_reset(client: CounterClient) -> None:
    """Reset the counter."""
    await client._event_client.request_reply("/reset_counter", Empty())
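These commands are selected on the command line through the subscribe and reset arguments used in the run instructions. One way to wire that up is with argparse subcommands. The sketch below is an assumption about how such a main function could parse its arguments, not the example's actual code: `build_parser` is a hypothetical helper, and loading the JSON file into an EventServiceConfig is left out.

```python
import argparse
from pathlib import Path


def build_parser() -> argparse.ArgumentParser:
    """Build a parser for: client.py --service-config FILE {subscribe,reset}."""
    parser = argparse.ArgumentParser(prog="client.py")
    parser.add_argument(
        "--service-config",
        type=Path,
        required=True,
        help="Path to the service config file.",
    )
    # One subcommand per high-level command; exactly one must be given.
    subparsers = parser.add_subparsers(dest="command", required=True)
    subparsers.add_parser("subscribe", help="Subscribe to the /counter stream.")
    subparsers.add_parser("reset", help="Ask the service to reset the counter.")
    return parser


if __name__ == "__main__":
    # Parse an explicit argument list so the sketch runs without a real CLI call.
    args = build_parser().parse_args(
        ["--service-config", "service_config.json", "reset"]
    )
    print(args.command, args.service_config)
```

A main function could then dispatch on args.command to call command_subscribe or command_reset with a CounterClient built from the parsed config path.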
Run the example
1. Run the service
In a first terminal, run the service:
python counter.py --service-config service_config.json
You should see the following output:
Starting server on port 5001
Server started
2. Subscribe to the service
In a second terminal, run the client:
python client.py --service-config service_config.json subscribe
You should see output like the following, with the counter value increasing:
Received message: value: 3
Received message: value: 4
Received message: value: 5
Received message: value: 6
...
...
3. Reset the counter
In a third terminal, run the client:
python client.py --service-config service_config.json reset
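In the second terminal, where the subscriber is still running, you should see the counter restart from zero. The first line after the reset shows no value because the protobuf text format omits fields set to their default, here 0: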
Received message:
Received message: value: 1
Received message: value: 2
Received message: value: 3
...
...