Main Implementation
Kivy
Now that we have the finished our custom widget we need to first add it to our applications kivy string.
Navigating back to src/res/main.kv
we need to import our newly defined widget.
Adding the Widget
To do this, all we need to do is add an instance of our VirtualJoystickWidget to the same level as the images tabbed panel. TabbedPanel:
BoxLayout:
orientation: 'horizontal'
VirtualJoystickWidget:
id: joystick
TabbedPanel:
do_default_tab: False
id: tab_root
TabbedPanelItem:
text: "rgb"
Image:
id: rgb
When we run the code now, we should see:
Adding Text
One additional feature we felt was important was to read information about the dashboard state of the Amiga, this could MANUAL_READY, AUTO_ACIVE, or AUTO_READY, etc.
This will be the final update to our kivy string. Before the VirtualJoystickWidget, we will add a vertical stack of text boxes:
BoxLayout:
orientation: 'horizontal'
BoxLayout:
size_hint_x: 0.3
orientation: 'vertical'
Widget:
size_hint_y: 2.0
Label:
text: "Amiga State:"
font_size: 18
size_hint_y: 0.5
Label:
text: app.amiga_state
font_size: 18
Widget: # Empty placeholder
Label:
text: "Speed [m/s]:"
font_size: 18
size_hint_y: 0.5
Label:
text: app.amiga_speed
font_size: 18
Widget: # Empty placeholder
Label:
text: "Angular Rate [rad/s]:"
font_size: 18
size_hint_y: 0.5
Label:
text: app.amiga_rate
font_size: 18
Widget:
size_hint_y: 2.0
VirtualJoystickWidget:
Please refer to the src/res/main.kv
if the placement is confusing.
In the
src/res/main.kv
and src/main.py
files of the
virtual-joystick
app we define the kivy app and Python implementation of the
VirtualJoystickApp
.
You should open these files for reference as you follow along.
Python Implementation
Imports
You should have already gone through the Camera Streamer Tutorial based on the camera-streamer example app. Understanding these instructions will rely on understanding those!
We will need to add a few more imports to the Camera Streamer Tutorial to send and receive canbus messages.
from farm_ng.canbus.canbus_pb2 import Twist2d
from farm_ng.canbus.packet import AmigaControlState
from farm_ng.canbus.packet import AmigaTpdo1
These imports are a part of the canbus API which will make more sense in application.
Kivy String Variables
To update the kivy string for the amigas, state, linear and angual velocity, we need to add:
amiga_state = StringProperty("???")
amiga_speed = StringProperty("???")
amiga_rate = StringProperty("???")
Class Initialization
In this tutorial, we introduce the service_config.json file.
def __init__(
self,
service_config: EventServiceConfig,
) -> None:
super().__init__()
self.counter: int = 0
self.service_config = service_config
self.async_tasks: list[asyncio.Task] = []
self.image_decoder = ImageDecoder()
self.view_name = "rgb"
self.max_speed: float = 1.0
self.max_angular_rate: float = 1.0
Client Configurations
The app function looks a little different in this example vs the camera streamer. Here, we are using the EventServiceConfigList() function to create a list of services our custom application will subscribe to.
async def app_func(self):
async def run_wrapper() -> None:
# we don't actually need to set asyncio as the lib because it is
# the default, but it doesn't hurt to be explicit
await self.async_run(async_lib="asyncio")
for task in self.async_tasks:
task.cancel()
config_list = proto_from_json_file(
self.service_config, EventServiceConfigList()
)
oak0_client: EventClient | None = None
canbus_client: EventClient | None = None
for config in config_list.configs:
if config.name == "oak0":
oak0_client = EventClient(config)
elif config.name == "canbus":
canbus_client = EventClient(config)
# Confirm that EventClients were created for all required services
if None in [oak0_client,canbus_client]:
raise RuntimeError(
f"No {config} service config in {self.service_config}"
)
# Camera task
self.tasks: list[asyncio.Task] = [
asyncio.create_task(self.stream_camera(oak0_client, view_name))
for view_name in self.STREAM_NAMES
]
self.tasks.append(asyncio.ensure_future(self.pose_generator(canbus_client)))
return await asyncio.gather(run_wrapper(),*self.tasks)
We create two clients in this example, the oak0_client and the canbus_client from the EventClient(). Here is more information about the event_service framework.
Sending CAN Messages
This function has two main purposes, sending can messages and reading messages from the dashboard. The dashboards tpd01 messages contains information about the state of the robot (manual, auto) and velocities. We use the twist2d() structure to send desired velocities to the dashboard which is responsible for converting those to individual wheel velocities.
async def pose_generator(self, canbus_client: EventClient, period: float = 0.02):
"""The pose generator generates twist messages"""
while self.root is None:
await asyncio.sleep(0.01)
twist = Twist2d()
joystick: VirtualJoystickWidget = self.root.ids["joystick"]
rate = canbus_client.config.subscriptions[0].every_n
async for event, payload in canbus_client.subscribe(
SubscribeRequest(uri=Uri(path="/state"), every_n=rate),
decode=False,
):
message = payload_to_protobuf(event, payload)
tpdo1 = AmigaTpdo1.from_proto(message.amiga_tpdo1)
twist.linear_velocity_x = self.max_speed * joystick.joystick_pose.y
twist.angular_velocity = self.max_angular_rate * -joystick.joystick_pose.x
self.amiga_state = tpdo1.state.name
self.amiga_speed = "{:.4f}".format(twist.linear_velocity_x)
self.amiga_rate = "{:.4f}".format(twist.angular_velocity)
await canbus_client.request_reply("/twist", twist)
await asyncio.sleep(period)
The AmigaTpdo1
message comes from the dashboard and contains the:
- state of the Amiga (AmigaControlState)
- measured speed (forward positive)
- measured angular rate (left positive)
This is the information you'll use for closed loop control!
If you're curious to learn more about CAN bus in general, see CSS Electronics - CAN Bus Explained. In this virtual joystick tutorial, we are only teaching you to interact with the canbus client through Amiga state messages.
To display the values in the Label
widgets we use a kivy
StringProperty
for each value.
These are bound to the corresponding Label
widget text fields,
so we only need to update the value of the StringProperty
and
we do not need to update the text field of the Label
explicitly.
Other notes
service_config.json
The service_config is used to store all the services that your custom application will use in one place. Here you can specify which cameras you will be using and at what relative frequency (every_n) and port. We suggest adjusting the every_n of the oak and canbus client and evaluate the performance of the app. We will add the
Local Development
To run while developing, you can run ./entry.sh
to compile and run the code.