Migration Guide to Amiga OS 2.0
Preamble
On November 2023, we released Amiga OS 2.0 Barley, which is a major update to the Amiga OS 1.0 Artichoke. This update includes a number of breaking changes to the Amiga OS 1.0 Artichoke API.
This guide will help you migrate your code from Amiga OS 1.0 Artichoke to Amiga OS 2.0 Barley. For more information about the Amiga OS 2.0 Barley release, please see the release notes.
Prerequisites
This guide assumes that:
- You have knowledge of the Amiga OS 1.0 Artichoke API.
- You have an Amiga Brain running Amiga OS 2.0 Barley.
Migration Steps
Update the Amiga Brain SDK
With the release of Amiga OS 2.0 Barley, we have also released a new version of the Amiga Brain SDK. This new version of the SDK contains the new Amiga OS 2.0 Barley API including new features and breaking changes for the following python packages:
Service API
In farm-ng-core
we have introduced a new service API. This API is used to communicate with the Amiga Brain services. The service API is a framework that allows you to create service clients and service servers and share protobuf messages between them.
In particular, we have introduced the new main classes:
EventServiceGrpc
This class is used to publish events to the Amiga Brain network.
EventClient
This class is used to subscribe to events from the Amiga Brain network.
We recommend to visit the following tutorials to learn more about the new service API:
- Service Client: This tutorial will show you how to create a service client to communicate with the Amiga Brain services.
- Service Counter: A bit more advanced tutorial that will show you how to create a service client to create a service that counts the number of times it has been called.
Example: Kivy camera streamer api
In this example we will show you how to migrate the Kivy camera streamer example using the new service API.
Update the package
In order to use the new application within the brain, we have to include
the install.sh
, uninstall.sh
and manifest.json
files in the root of the package.
We suggest to follow the Amiga Brain SDK guide to learn more about how to create a package for the Amiga Brain.
Update the dependencies
As mentioned before, we have to update the dependencies to use the new version of the farm_ng_core
and farm_ng_amiga
packages:
pip install -U farm_ng_core farm_ng_amiga
check the version of the packages:
pip list | grep -E 'farm-ng|farm_ng'
# farm-ng-amiga 2.0.0
# farm-ng-core 2.0.0
Alternatively, make sure that in the setup.cfg
, the farm_ng_core
and farm_ng_amiga
packages are pointing to the latest version:
install_requires =
wheel
kivy
farm_ng_amiga>=2.0.0
farm_ng_core>=2.0.0
kornia-rs
Update the code
In the main.py
file we have to import the new service API:
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service_pb2 import EventServiceConfig
from farm_ng.core.event_service_pb2 import EventServiceConfigList
from farm_ng.core.event_service_pb2 import SubscribeRequest
from farm_ng.core.events_file_reader import proto_from_json_file
from farm_ng.core.uri_pb2 import Uri
Later, in the implementation of the coroutine stream_camera
, we have to create a new EventClient
to leverage the new subscribe
API to subscribe to the camera events in an asynchronous fashion.
Note that we can configure the subscription via the SubscribeRequest
message. In this case, we are subscribing to the /rgb
topic and we are requesting to receive every n
messages.
The block below should reflect how to use the new EventClient
API:
async def stream_camera(
self, view_name: Literal["rgb", "disparity", "left", "right"] = "rgb"
) -> None:
while self.root is None:
await asyncio.sleep(0.01)
async for _, message in EventClient(self.service_config).subscribe(
SubscribeRequest(
uri=Uri(path=f"/{view_name}"), every_n=self.stream_every_n
),
decode=True,
):
try:
img = self.image_decoder.decode(message.image_data)
except Exception as e:
logger.exception(f"Error decoding image: {e}")
continue
Once you are done and launch the app, you should see the following:
Porting Virtual Joystick
This tutorial will walk you though porting form Amiga OS 1.0 Artichoke to Amiga OS 2.0 Barley
In this example, we will walk through porting the virtual-joystick
from the OS 1.0 to OS 2.0.
This functions as an example for changes you should make in your custom application to port them from 1.0 to 2.0.
Not all changes will be identical, however, we have many examples for new service structure here:
Brain Examples
farm-ng Imports
OS 1.0 | OS 2.0 |
import grpc from farm_ng.canbus import canbus_pb2 from farm_ng.canbus.canbus_client import CanbusClient from farm_ng.canbus.packet import AmigaControlState from farm_ng.canbus.packet import AmigaTpdo1 from farm_ng.canbus.packet import make_amiga_rpdo1_proto from farm_ng.canbus.packet import parse_amiga_tpdo1_proto from farm_ng.oak import oak_pb2 from farm_ng.oak.camera_client import OakCameraClient from farm_ng.service import service_pb2 from farm_ng.service.service_client import ClientConfig
| from farm_ng.canbus.canbus_pb2 import Twist2d from farm_ng.canbus.packet import AmigaControlState from farm_ng.canbus.packet import AmigaTpdo1 from farm_ng.core.event_client import EventClient from farm_ng.core.event_service_pb2 import EventServiceConfig from farm_ng.core.event_service_pb2 import EventServiceConfigList from farm_ng.core.event_service_pb2 import SubscribeRequest from farm_ng.core.events_file_reader import payload_to_protobuf from farm_ng.core.events_file_reader import proto_from_json_file from farm_ng.core.uri_pb2 import Uri
|
With this update, farm-ng-core
was largely
refactored to make subscribing to services more simple.
Defining Clients
OS 1.0 | OS 2.0 |
def __init__( self, address: str, camera_port: int, canbus_port: int, stream_every_n: int ) -> None: super().__init__() self.address: str = address
self.camera_port: int = camera_port self.canbus_port: int = canbus_port self.stream_every_n: int = stream_every_n
| def __init__( self, service_config: EventServiceConfig, ) -> None: super().__init__()
self.counter: int = 0 self.service_config = service_config
|
Now, rather than specifying the ports for the camera and canbus through command line arguments,
they are included in a file called service_config.json
. This file contains all the metadata for
the individual services. The service_config.json
encapsulates all of the services used by your custom application.
Creating Clients
OS 1.0 | OS 2.0 |
camera_config: ClientConfig = ClientConfig( address=self.address, port=self.camera_port ) camera_client: OakCameraClient = OakCameraClient(camera_config)
canbus_config: ClientConfig = ClientConfig( address=self.address, port=self.canbus_port ) canbus_client: CanbusClient = CanbusClient(canbus_config)
| config_list = proto_from_json_file( self.service_config, EventServiceConfigList() )
oak0_client: EventClient | None = None canbus_client: EventClient | None = None
for config in config_list.configs: if config.name == "oak0": oak0_client = EventClient(config) elif config.name == "canbus": canbus_client = EventClient(config)
|
Rather than accepting metadata for each service as parameters to the class, all of the clients are
defined by the config file. Additionally, all of the clients use the generic EventClient
class
rather than service specific client classes (eg. OakCameraClient
and CanbusClient
no longer exist)
asyncio Tasks
OS 1.0 | OS 2.0 |
self.async_tasks.append( asyncio.ensure_future(self.stream_camera(camera_client)) )
self.async_tasks.append( asyncio.ensure_future(self.stream_canbus(canbus_client)) ) self.async_tasks.append( asyncio.ensure_future(self.send_can_msgs(canbus_client)) )
| self.tasks: list[asyncio.Task] = [ asyncio.create_task(self.stream_camera(oak0_client, view_name)) for view_name in self.STREAM_NAMES ]
self.tasks.append(asyncio.create_task(self.pose_generator(canbus_client)))
|
In this example, we subscribe to each of the camera streams, however, if you only need one,
the for loop for view_name om self.STREAM_NAMES
is not required.
Streaming Cameras
OS 1.0 | OS 2.0 |
async def stream_camera(self, client: OakCameraClient) -> None: """This task listens to the camera client's stream and populates the tabbed panel with all 4 image streams from the oak camera.""" while self.root is None: await asyncio.sleep(0.01)
response_stream = None
while True: state = await client.get_state()
if state.value not in [ service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING, ]: if response_stream is not None: response_stream.cancel() response_stream = None print("Camera service is not streaming or ready to stream") await asyncio.sleep(0.1) continue
if response_stream is None: response_stream = client.stream_frames(every_n=1)
try: response: oak_pb2.StreamFramesReply = await response_stream.read() assert response and response != grpc.aio.EOF, "End of stream" except Exception as e: print(e) response_stream.cancel() response_stream = None continue
frame: oak_pb2.OakSyncFrame = response.frame
for view_name in ["rgb", "disparity", "left", "right"]: try: img = self.image_decoder.decode( getattr(frame, view_name).image_data )
| async def stream_camera( self, oak_client: EventClient, view_name: Literal["rgb", "disparity", "left", "right"] = "rgb", ) -> None:
while self.root is None: await asyncio.sleep(0.01)
rate = oak_client.config.subscriptions[0].every_n
async for event, payload in oak_client.subscribe( SubscribeRequest( uri=Uri(path=f"/{view_name}"), every_n=rate ), decode=False, ): if view_name == self.view_name: message = payload_to_protobuf(event, payload) try: img = self.image_decoder.decode(message.image_data) except Exception as e: logger.exception(f"Error decoding image: {e}") continue
|
In OS 2.0, the method .subscribe() from the
EventClient
class to subscribe to various services.
Sending CAN Messages
OS 1.0 | OS 2.0 |
async def stream_canbus(self, client: CanbusClient) -> None: """This task: - listens to the canbus client's stream - filters for AmigaTpdo1 messages - extracts useful values from AmigaTpdo1 messages """ while self.root is None: await asyncio.sleep(0.01)
response_stream = None
while True: state = await client.get_state()
if state.value not in [ service_pb2.ServiceState.IDLE, service_pb2.ServiceState.RUNNING, ]: if response_stream is not None: response_stream.cancel() response_stream = None
print("Canbus service is not streaming or ready to stream") await asyncio.sleep(0.1) continue
if ( response_stream is None and state.value != service_pb2.ServiceState.UNAVAILABLE ): response_stream = client.stream()
try: response: canbus_pb2.StreamCanbusReply = await response_stream.read() assert response and response != grpc.aio.EOF, "End of stream" except Exception as e: print(e) response_stream.cancel() response_stream = None continue
for proto in response.messages.messages: amiga_tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(proto) if amiga_tpdo1: self.amiga_tpdo1 = amiga_tpdo1
self.amiga_state = AmigaControlState(amiga_tpdo1.state).name[6:] self.amiga_speed = str(amiga_tpdo1.meas_speed) self.amiga_rate = str(amiga_tpdo1.meas_ang_rate)
async def send_can_msgs(self, client: CanbusClient) -> None: """This task ensures the canbus client sendCanbusMessage method has the pose_generator it will use to send messages on the CAN bus to control the Amiga robot.""" while self.root is None: await asyncio.sleep(0.01)
response_stream = None while True: state = await client.get_state()
if state.value != service_pb2.ServiceState.RUNNING: if response_stream is not None: response_stream.cancel() response_stream = None print("Waiting for running canbus service...") await asyncio.sleep(0.1) continue
if response_stream is None: print("Start sending CAN messages") response_stream = client.stub.sendCanbusMessage(self.pose_generator())
try: async for response in response_stream: assert response.success except Exception as e: print(e) response_stream.cancel() response_stream = None continue
await asyncio.sleep(0.1)
async def pose_generator(self, period: float = 0.02): """The pose generator yields an AmigaRpdo1 (auto control command) for the canbus client to send on the bus at the specified period (recommended 50hz) based on the onscreen joystick position.""" while self.root is None: await asyncio.sleep(0.01)
joystick: VirtualJoystickWidget = self.root.ids["joystick"] while True: msg: canbus_pb2.RawCanbusMessage = make_amiga_rpdo1_proto( state_req=AmigaControlState.STATE_AUTO_ACTIVE, cmd_speed=self.max_speed * joystick.joystick_pose.y, cmd_ang_rate=self.max_angular_rate * -joystick.joystick_pose.x, ) yield canbus_pb2.SendCanbusMessageRequest(message=msg) await asyncio.sleep(period)
| async def pose_generator(self, canbus_client: EventClient): """The pose generator yields an AmigaRpdo1 (auto control command) for the canbus client to send on the bus at the specified period (recommended 50hz) based on the onscreen joystick position.""" while self.root is None: await asyncio.sleep(0.01)
twist = Twist2d()
joystick: VirtualJoystickWidget = self.root.ids["joystick"]
rate = canbus_client.config.subscriptions[0].every_n
async for event, payload in canbus_client.subscribe( SubscribeRequest(uri=Uri(path="/state"), every_n=rate), decode=False, ): message = payload_to_protobuf(event, payload) tpdo1 = AmigaTpdo1.from_proto(message.amiga_tpdo1)
twist.linear_velocity_x = self.max_speed * joystick.joystick_pose.y twist.angular_velocity = self.max_angular_rate * -joystick.joystick_pose.x
self.amiga_state = tpdo1.state.name self.amiga_speed = "{:.4f}".format(twist.linear_velocity_x) self.amiga_rate = "{:.4f}".format(twist.angular_velocity)
await canbus_client.request_reply("/twist", twist)
|
In OS 2.0, receiving and sending CAN messages might have the biggest simplification.
Rather than using the two methods, stream_canbus() and send_can_msgs() in OS 1.0,
in OS 2.0, we use request_reply() method to send twist2d() messages. More info
about the new canbus service can be found here: canbus service
Running the program
OS 1.0 | OS 2.0 |
if __name__ == "__main__": parser = argparse.ArgumentParser(prog="virtual-joystick") parser.add_argument( "--address", type=str, default="localhost", help="The server address" ) parser.add_argument( "--camera-port", type=int, required=True, help="The grpc port where the camera service is running.", ) parser.add_argument( "--canbus-port", type=int, required=True, help="The grpc port where the canbus service is running.", ) parser.add_argument( "--stream-every-n", type=int, default=1, help="Streaming frequency (used to skip frames)", )
args = parser.parse_args()
loop = asyncio.get_event_loop() try: loop.run_until_complete( VirtualJoystickApp( args.address, args.camera_port, args.canbus_port, args.stream_every_n ).app_func() ) except asyncio.CancelledError: pass loop.close()
| if __name__ == "__main__": parser = argparse.ArgumentParser(prog="template-app")
parser.add_argument("--service-config", type=Path, default="service_config.json")
args = parser.parse_args()
loop = asyncio.get_event_loop()
try: loop.run_until_complete(KivyVirtualJoystick(args.service_config).app_func()) except asyncio.CancelledError: pass loop.close()
|
Using the service_config.json
file allows us to specify all of the service metadata within the .json file rather than from the command line.
By editing the this .json file to include the services
required by your application.