Python Implementation
Similar to the last tutorial, this will give a break down of the most important features of the camera-streamer repository
The Python implementation of the
camera-streamer
app can be found at
src/main.py
.
You should open that file for reference as you follow along.
Rename Classes
In the camera-streamer repository,
we have renamed some
keywords. Begin by navigating to src/main.py
in your app and open it.
There are 2 places to change the templated name:
# 1. Rename the class
class CameraApp(App):
def __init__(self) -> None:
super().__init__()
...
# 2. Run with the new class name
try:
loop.run_until_complete(CameraApp().app_func())
except asyncio.CancelledError:
pass
Update Imports
This app is app will be the first real taste of the Amiga SDK. We will now need to import farm-ng libraries to access the oak camera streams.
from __future__ import annotations
import argparse
import asyncio
import logging
import os
from pathlib import Path
from typing import Literal
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service_pb2 import EventServiceConfig
from farm_ng.core.event_service_pb2 import EventServiceConfigList
from farm_ng.core.event_service_pb2 import SubscribeRequest
from farm_ng.core.events_file_reader import payload_to_protobuf
from farm_ng.core.events_file_reader import proto_from_json_file
from farm_ng.core.uri_pb2 import Uri
from turbojpeg import TurboJPEG
os.environ["KIVY_NO_ARGS"] = "1"
from kivy.config import Config # noreorder # noqa: E402
Config.set("graphics", "resizable", False)
Config.set("graphics", "width", "1280")
Config.set("graphics", "height", "800")
Config.set("graphics", "fullscreen", "false")
Config.set("input", "mouse", "mouse,disable_on_activity")
Config.set("kivy", "keyboard_mode", "systemanddock")
from kivy.app import App # noqa: E402
from kivy.lang.builder import Builder # noqa: E402
from kivy.graphics.texture import Texture # noqa: E402
logger = logging.getLogger("amiga.apps.camera")
You can go line-by-line, however, we recommend that imports above class CameraApp(App) looks like the list above.
Update: CameraApp()
The camera app will generally follow the same format as the template tic-toc app. However, will also subscribe to a farm-ng service, in the case of this application, it will be the oak camera service.
class CameraApp(App):
"""Base class for the main Kivy app."""
STREAM_NAMES = ["rgb", "disparity", "left", "right"]
def __init__(self, service_config: EventServiceConfig) -> None:
super().__init__()
self.service_config = service_config
self.image_decoder = TurboJPEG()
self.async_tasks: list[asyncio.Task] = []
The EventServiceConfig contains the custom configuration used to
specify which specific services your custom application
will need to access. It will be stored in the variable self.service_config
image_decoder
is responsible for taking the raw bit
stream from the oak service and packing them to
make python interpretable image frames.
We will use TurboJPEG
as the image decoder (it is much faster
than kivy's default image decoder).
Update: on_exit_btn(self)
def on_exit_btn(self) -> None:
"""Kills the running kivy application."""
for task in self.tasks:
task.cancel()
App.get_running_app().stop()
This method is similar to the amiga-app-template-kivy
however, this time we also need to cancel the async tasks that are running.
Update: app_func(self)
async def app_func(self):
config_list = proto_from_json_file(
self.service_config, EventServiceConfigList()
)
oak0_client: EventClient | None = None
for config in config_list.configs:
if config.name == "oak0":
oak0_client = EventClient(config)
if None in [oak0_client]:
raise RuntimeError(f"No {config} service config in {self.service_config}")
# stream camera frames
self.tasks: list[asyncio.Task] = [
asyncio.create_task(self.stream_camera(oak0_client, view_name))
for view_name in self.STREAM_NAMES
]
return await asyncio.gather(run_wrapper(), *self.tasks)
More information can be found here:
farm-ng-core
and the Oak Camera Client
example.
Within this repository and example, you can see how config_list
and oak0_client
are made.
Finally, once the oak0_client is made, it can be used to subscribe to the camera streams.
New Method: stream_camera()
Now, add the function stream_camera. The loop for view_name in STREAM_NAMES makes four instances of the stream_camera method. We make all four instances as opposed to only one because rather than changing the async tasks, we can conditionally display the image streams.
async def stream_camera(
self,
oak_client: EventClient,
view_name: Literal["rgb", "disparity", "left", "right"] = "rgb",
) -> None:
"""Subscribes to the camera service and populates the tabbed panel with all 4 image streams."""
while self.root is None:
await asyncio.sleep(0.01)
rate = oak_client.config.subscriptions[0].every_n
async for event, payload in oak_client.subscribe(
SubscribeRequest(uri=Uri(path=f"/{view_name}"), every_n=rate),
decode=False,
):
self.view_name = self.root.ids["tab_root"].current_tab.text
if view_name == self.view_name:
message = payload_to_protobuf(event, payload)
try:
img = self.image_decoder.decode(message.image_data)
except Exception as e:
logger.exception(f"Error decoding image: {e}")
continue
# create the opengl texture and set it to the image
texture = Texture.create(
size=(img.shape[1], img.shape[0]), icolorfmt="bgr"
)
texture.flip_vertical()
texture.blit_buffer(
bytes(img.data),
colorfmt="bgr",
bufferfmt="ubyte",
mipmap_generation=False,
)
self.root.ids[view_name].texture = texture
Event Client
The client-service framework used here is vital to communicate with the Amiga.
async for _, message in EventClient(self.service_config).subscribe(
SubscribeRequest(
uri=Uri(path=f"/{[view_name]}"), every_n=self.stream_every_n
),
decode=True,
):
The EventClient is listening to all of the messages being sent by each of the services on the Amiga.
This class is the "on-ramp" used to access the Amigas communication highway. All the other vehicles on the highway are messages from the other services. The, oaks, canbus, trackfollower, etc... can all be accessed from this EventClient.
It can also be used for subsrbibing to all other services that might be used by your custom application.
More details on the .subscribe() method can be found here: event_client.py
SubscribeRequest accepts an uri (message name) and a relative frequency (every_n). This is considered a relative frequency because the oak service operates at 10hz while the canbus operates at 50hz.
Decode and display
Finally, we can decode and display the images received from the stream.
# create the opengl texture and set it to the image
texture = Texture.create(size=(img.shape[1], img.shape[0]), icolorfmt="bgr")
texture.flip_vertical()
texture.blit_buffer(
bytes(img.data),
colorfmt="bgr",
bufferfmt="ubyte",
mipmap_generation=False,
)
self.root.ids[view_name].texture = texture
For each of the image streams, we update the Image
widget
Texture
in the TabbedPanel
with the corresponding decoded
image.
The Image
widgets in the TabbedPanel
accessed by their kivy
id.
New Method: find_config_by_name()
We've made stock configurations for each of the services.
def find_config_by_name(
service_configs: EventServiceConfigList, name: str
) -> EventServiceConfig | None:
"""Utility function to find a service config by name.
Args:
service_configs: List of service configs
name: Name of the service to find
"""
for config in service_configs.configs:
if config.name == name:
return config
return None
Should you only provide the name of the service in the configuration file, this will return a complete configuration for your custom application.
Update: main()
This example is used to demonstrate how you could specify specific services to subscribe to from the command line.
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="template-app")
# Add additional command line arguments here
parser.add_argument("--service-config", type=Path, default="service_config.json")
args = parser.parse_args()
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(CameraApp(args.service_config).app_func())
except asyncio.CancelledError:
pass
loop.close()
service_config.json is a list of each of the services you've requested for your application. For this example, its only a single oak service but it could modified to add more cameras or canbus for example.
Add File: service_config.json
{
"configs": [
{
"name": "oak0",
"port": 50010,
"host": "localhost",
"log_level": "INFO",
"subscriptions": [
{
"uri": {
"path": "*",
"query": "service_name=oak0"
},
"every_n": 1
}
]
}
]
}
Update setup.cfg
In order to import everything we need, we must add the library PyTurboJPEG
to
the
setup.cfg
file so the dependency installs.
install_requires =
wheel
kivy
farm_ng_amiga
PyTurboJPEG
Running the app
To run the app:
- run the command
./install.sh
and click the icon on the brain screen - run the command
./entry.sh