Skip to main content

Migration Guide to Amiga OS 2.0

Preamble

On November 2023, we released Amiga OS 2.0 Barley, which is a major update to the Amiga OS 1.0 Artichoke. This update includes a number of breaking changes to the Amiga OS 1.0 Artichoke API.

This guide will help you migrate your code from Amiga OS 1.0 Artichoke to Amiga OS 2.0 Barley. For more information about the Amiga OS 2.0 Barley release, please see the release notes.

Prerequisites

This guide assumes that:

  • You have knowledge of the Amiga OS 1.0 Artichoke API.
  • You have an Amiga Brain running Amiga OS 2.0 Barley.

Migration Steps

Update the Amiga Brain SDK

With the release of Amiga OS 2.0 Barley, we have also released a new version of the Amiga Brain SDK. This new version of the SDK contains the new Amiga OS 2.0 Barley API including new features and breaking changes for the following python packages:

Service API

In farm-ng-core we have introduced a new service API. This API is used to communicate with the Amiga Brain services. The service API is a framework that allows you to create service clients and service servers and share protobuf messages between them.

In particular, we have introduced the new main classes:

  • EventServiceGrpc This class is used to publish events to the Amiga Brain network.
  • EventClient This class is used to subscribe to events from the Amiga Brain network.

We recommend to visit the following tutorials to learn more about the new service API:

  • Service Client: This tutorial will show you how to create a service client to communicate with the Amiga Brain services.
  • Service Counter: A bit more advanced tutorial that will show you how to create a service client to create a service that counts the number of times it has been called.

Example: Kivy camera streamer api

In this example we will show you how to migrate the Kivy camera streamer example using the new service API.

Update the package

In order to use the new application within the brain, we have to include the install.sh, uninstall.sh and manifest.json files in the root of the package.

We suggest to follow the Amiga Brain SDK guide to learn more about how to create a package for the Amiga Brain.

Update the dependencies

As mentioned before, we have to update the dependencies to use the new version of the farm_ng_core and farm_ng_amiga packages:

pip install -U farm_ng_core farm_ng_amiga

check the version of the packages:

pip list | grep -E 'farm-ng|farm_ng'
# farm-ng-amiga 2.0.0
# farm-ng-core 2.0.0

Alternatively, make sure that in the setup.cfg, the farm_ng_core and farm_ng_amiga packages are pointing to the latest version:

install_requires =
wheel
kivy
farm_ng_amiga>=2.0.0
farm_ng_core>=2.0.0
kornia-rs

Update the code

In the main.py file we have to import the new service API:

from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service_pb2 import EventServiceConfig
from farm_ng.core.event_service_pb2 import EventServiceConfigList
from farm_ng.core.event_service_pb2 import SubscribeRequest
from farm_ng.core.events_file_reader import proto_from_json_file
from farm_ng.core.uri_pb2 import Uri

Later, in the implementation of the coroutine stream_camera, we have to create a new EventClient to leverage the new subscribe API to subscribe to the camera events in an asynchronous fashion.

Note that we can configure the subscription via the SubscribeRequest message. In this case, we are subscribing to the /rgb topic and we are requesting to receive every n messages.

The block below should reflect how to use the new EventClient API:

  async def stream_camera(
self, view_name: Literal["rgb", "disparity", "left", "right"] = "rgb"
) -> None:
while self.root is None:
await asyncio.sleep(0.01)

async for _, message in EventClient(self.service_config).subscribe(
SubscribeRequest(
uri=Uri(path=f"/{view_name}"), every_n=self.stream_every_n
),
decode=True,
):
try:
img = self.image_decoder.decode(message.image_data)
except Exception as e:
logger.exception(f"Error decoding image: {e}")
continue

Once you are done and launch the app, you should see the following:

Screenshot from 2023-12-01 17-50-01

Porting Virtual Joystick

This tutorial will walk you though porting form Amiga OS 1.0 Artichoke to Amiga OS 2.0 Barley

In this example, we will walk through porting the virtual-joystick from the OS 1.0 to OS 2.0.

This functions as an example for changes you should make in your custom application to port them from 1.0 to 2.0. Not all changes will be identical, however, we have many examples for new service structure here: Brain Examples

farm-ng Imports

OS 1.0OS 2.0
import grpc
from farm_ng.canbus import canbus_pb2
from farm_ng.canbus.canbus_client import CanbusClient
from farm_ng.canbus.packet import AmigaControlState
from farm_ng.canbus.packet import AmigaTpdo1
from farm_ng.canbus.packet import make_amiga_rpdo1_proto
from farm_ng.canbus.packet import parse_amiga_tpdo1_proto
from farm_ng.oak import oak_pb2
from farm_ng.oak.camera_client import OakCameraClient
from farm_ng.service import service_pb2
from farm_ng.service.service_client import ClientConfig
from farm_ng.canbus.canbus_pb2 import Twist2d
from farm_ng.canbus.packet import AmigaControlState
from farm_ng.canbus.packet import AmigaTpdo1
from farm_ng.core.event_client import EventClient
from farm_ng.core.event_service_pb2 import EventServiceConfig
from farm_ng.core.event_service_pb2 import EventServiceConfigList
from farm_ng.core.event_service_pb2 import SubscribeRequest
from farm_ng.core.events_file_reader import payload_to_protobuf
from farm_ng.core.events_file_reader import proto_from_json_file
from farm_ng.core.uri_pb2 import Uri

With this update, farm-ng-core was largely refactored to make subscribing to services more simple.

Defining Clients

OS 1.0OS 2.0
  def __init__(
self,
address: str,
camera_port: int,
canbus_port: int,
stream_every_n: int
) -> None:
super().__init__()
self.address: str = address

self.camera_port: int = camera_port
self.canbus_port: int = canbus_port
self.stream_every_n: int = stream_every_n
  def __init__(
self,
service_config: EventServiceConfig,
) -> None:
super().__init__()

self.counter: int = 0
self.service_config = service_config

Now, rather than specifying the ports for the camera and canbus through command line arguments, they are included in a file called service_config.json. This file contains all the metadata for the individual services. The service_config.json encapsulates all of the services used by your custom application.

Creating Clients

OS 1.0OS 2.0
camera_config: ClientConfig = ClientConfig(
address=self.address, port=self.camera_port
)
camera_client: OakCameraClient = OakCameraClient(camera_config)

# configure the canbus client
canbus_config: ClientConfig = ClientConfig(
address=self.address, port=self.canbus_port
)
canbus_client: CanbusClient = CanbusClient(canbus_config)
config_list = proto_from_json_file(
self.service_config, EventServiceConfigList()
)

oak0_client: EventClient | None = None
canbus_client: EventClient | None = None

for config in config_list.configs:
if config.name == "oak0":
oak0_client = EventClient(config)
elif config.name == "canbus":
canbus_client = EventClient(config)

Rather than accepting metadata for each service as parameters to the class, all of the clients are defined by the config file. Additionally, all of the clients use the generic EventClient class rather than service specific client classes (eg. OakCameraClient and CanbusClient no longer exist)

asyncio Tasks

OS 1.0OS 2.0
self.async_tasks.append(
asyncio.ensure_future(self.stream_camera(camera_client))
)

# Canbus task(s)
self.async_tasks.append(
asyncio.ensure_future(self.stream_canbus(canbus_client))
)
self.async_tasks.append(
asyncio.ensure_future(self.send_can_msgs(canbus_client))
)
self.tasks: list[asyncio.Task] = [
asyncio.create_task(self.stream_camera(oak0_client, view_name))
for view_name in self.STREAM_NAMES
]

self.tasks.append(asyncio.create_task(self.pose_generator(canbus_client)))

In this example, we subscribe to each of the camera streams, however, if you only need one, the for loop for view_name om self.STREAM_NAMES is not required.

Streaming Cameras

OS 1.0OS 2.0
async def stream_camera(self, client: OakCameraClient) -> None:
"""This task listens to the camera client's stream and
populates the tabbed panel with all 4 image streams
from the oak camera."""
while self.root is None:
await asyncio.sleep(0.01)

response_stream = None

while True:
# check the state of the service
state = await client.get_state()

if state.value not in [
service_pb2.ServiceState.IDLE,
service_pb2.ServiceState.RUNNING,
]:
# Cancel existing stream, if it exists
if response_stream is not None:
response_stream.cancel()
response_stream = None
print("Camera service is not streaming or ready to stream")
await asyncio.sleep(0.1)
continue

# Create the stream
if response_stream is None:
response_stream = client.stream_frames(every_n=1)

try:
# try/except so app doesn't crash on killed service
response: oak_pb2.StreamFramesReply = await response_stream.read()
assert response and response != grpc.aio.EOF, "End of stream"
except Exception as e:
print(e)
response_stream.cancel()
response_stream = None
continue

# get the sync frame
frame: oak_pb2.OakSyncFrame = response.frame

# get image and show
for view_name in ["rgb", "disparity", "left", "right"]:
# Skip if view_name was not included in frame
try:
# Decode the image and render it in the correct kivy texture
img = self.image_decoder.decode(
getattr(frame, view_name).image_data
)
async def stream_camera(
self,
oak_client: EventClient,
view_name: Literal["rgb", "disparity", "left", "right"] = "rgb",
) -> None:

while self.root is None:
await asyncio.sleep(0.01)

rate = oak_client.config.subscriptions[0].every_n

async for event, payload in oak_client.subscribe(
SubscribeRequest(
uri=Uri(path=f"/{view_name}"), every_n=rate
),
decode=False,
):
if view_name == self.view_name:
message = payload_to_protobuf(event, payload)
try:
img = self.image_decoder.decode(message.image_data)
except Exception as e:
logger.exception(f"Error decoding image: {e}")
continue

In OS 2.0, the method .subscribe() from the EventClient class to subscribe to various services.

Sending CAN Messages

OS 1.0OS 2.0
async def stream_canbus(self, client: CanbusClient) -> None:
"""This task:
- listens to the canbus client's stream
- filters for AmigaTpdo1 messages
- extracts useful values from AmigaTpdo1 messages
"""
while self.root is None:
await asyncio.sleep(0.01)

response_stream = None

while True:
# check the state of the service
state = await client.get_state()

if state.value not in [
service_pb2.ServiceState.IDLE,
service_pb2.ServiceState.RUNNING,
]:
if response_stream is not None:
response_stream.cancel()
response_stream = None

print("Canbus service is not streaming or ready to stream")
await asyncio.sleep(0.1)
continue

if (
response_stream is None
and state.value != service_pb2.ServiceState.UNAVAILABLE
):
# get the streaming object
response_stream = client.stream()

try:
# try/except so app doesn't crash on killed service
response: canbus_pb2.StreamCanbusReply = await response_stream.read()
assert response and response != grpc.aio.EOF, "End of stream"
except Exception as e:
print(e)
response_stream.cancel()
response_stream = None
continue

for proto in response.messages.messages:
amiga_tpdo1: Optional[AmigaTpdo1] = parse_amiga_tpdo1_proto(proto)
if amiga_tpdo1:
# Store the value for possible other uses
self.amiga_tpdo1 = amiga_tpdo1

# Update the Label values as they are received
self.amiga_state = AmigaControlState(amiga_tpdo1.state).name[6:]
self.amiga_speed = str(amiga_tpdo1.meas_speed)
self.amiga_rate = str(amiga_tpdo1.meas_ang_rate)

async def send_can_msgs(self, client: CanbusClient) -> None:
"""This task ensures the canbus client sendCanbusMessage
method has the pose_generator it will use to send
messages on the CAN bus to control the Amiga robot."""
while self.root is None:
await asyncio.sleep(0.01)

response_stream = None
while True:
# check the state of the service
state = await client.get_state()

# Wait for a running CAN bus service
if state.value != service_pb2.ServiceState.RUNNING:
# Cancel existing stream, if it exists
if response_stream is not None:
response_stream.cancel()
response_stream = None
print("Waiting for running canbus service...")
await asyncio.sleep(0.1)
continue

if response_stream is None:
print("Start sending CAN messages")
response_stream = client.stub.sendCanbusMessage(self.pose_generator())

try:
async for response in response_stream:
# Sit in this loop and wait until canbus service reports back it is not sending
assert response.success
except Exception as e:
print(e)
response_stream.cancel()
response_stream = None
continue

await asyncio.sleep(0.1)

async def pose_generator(self, period: float = 0.02):
"""The pose generator yields an AmigaRpdo1
(auto control command) for the canbus client
to send on the bus at the specified period
(recommended 50hz) based on the onscreen joystick
position."""
while self.root is None:
await asyncio.sleep(0.01)

joystick: VirtualJoystickWidget = self.root.ids["joystick"]
while True:
msg: canbus_pb2.RawCanbusMessage = make_amiga_rpdo1_proto(
state_req=AmigaControlState.STATE_AUTO_ACTIVE,
cmd_speed=self.max_speed * joystick.joystick_pose.y,
cmd_ang_rate=self.max_angular_rate * -joystick.joystick_pose.x,
)
yield canbus_pb2.SendCanbusMessageRequest(message=msg)
await asyncio.sleep(period)
async def pose_generator(self, canbus_client: EventClient):
"""The pose generator yields an AmigaRpdo1 (auto control command)
for the canbus client to send on the bus
at the specified period (recommended 50hz)
based on the onscreen joystick position."""
while self.root is None:
await asyncio.sleep(0.01)

twist = Twist2d()

joystick: VirtualJoystickWidget = self.root.ids["joystick"]

rate = canbus_client.config.subscriptions[0].every_n

async for event, payload in canbus_client.subscribe(
SubscribeRequest(uri=Uri(path="/state"), every_n=rate),
decode=False,
):
message = payload_to_protobuf(event, payload)
tpdo1 = AmigaTpdo1.from_proto(message.amiga_tpdo1)

twist.linear_velocity_x = self.max_speed * joystick.joystick_pose.y
twist.angular_velocity = self.max_angular_rate * -joystick.joystick_pose.x

self.amiga_state = tpdo1.state.name
self.amiga_speed = "{:.4f}".format(twist.linear_velocity_x)
self.amiga_rate = "{:.4f}".format(twist.angular_velocity)

await canbus_client.request_reply("/twist", twist)

In OS 2.0, receiving and sending CAN messages might have the biggest simplification. Rather than using the two methods, stream_canbus() and send_can_msgs() in OS 1.0, in OS 2.0, we use request_reply() method to send twist2d() messages. More info about the new canbus service can be found here: canbus service

Running the program

OS 1.0OS 2.0
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="virtual-joystick")
parser.add_argument(
"--address", type=str, default="localhost", help="The server address"
)
parser.add_argument(
"--camera-port",
type=int,
required=True,
help="The grpc port where the camera service is running.",
)
parser.add_argument(
"--canbus-port",
type=int,
required=True,
help="The grpc port where the canbus service is running.",
)
parser.add_argument(
"--stream-every-n",
type=int,
default=1,
help="Streaming frequency (used to skip frames)",
)

args = parser.parse_args()

loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
VirtualJoystickApp(
args.address, args.camera_port, args.canbus_port, args.stream_every_n
).app_func()
)
except asyncio.CancelledError:
pass
loop.close()
if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="template-app")

# Add additional command line arguments here
parser.add_argument("--service-config", type=Path, default="service_config.json")

args = parser.parse_args()

loop = asyncio.get_event_loop()

try:
loop.run_until_complete(KivyVirtualJoystick(args.service_config).app_func())
except asyncio.CancelledError:
pass
loop.close()

Using the service_config.json file allows us to specify all of the service metadata within the .json file rather than from the command line.

service_config.json

By editing the this .json file to include the services required by your application.