Skip to main content

Amiga Tool Control example

Basic Knowledge Requirements

Before diving into this code, here's a quick heads-up on what you'll need to be familiar with:

  1. Python Programming: It's important to have a good grasp of Python, especially with concepts like functions, loops, and classes, since the example utilizes these fundamentals.
  2. Asynchronous Programming with asyncio: Familiarity with Python's asyncio for writing concurrent code using the async/await syntax.
  3. farm-ng Canbus Service Overview: This overview provides a base understanding of the gRPC service the client you create will connect to.
  4. Vehicle Twist example: It is recommended to have used the Vehicle Twist example for motion control of the Amiga before controlling the tools.
Warning

The tool control example will cause any tools connected to the Amiga to actuate when the dashboard is in auto mode. This includes H-bridges and PTOs. Make sure the area is clear before running examples.

The Tool Control Example operates as a standalone Python script, in which an EventClient to the farm-ng Canbus service running on an Amiga brain is created.

This script commands actuators (H-bridges & PTOs) based on keyboard inputs from the computer on which you run the example.

You should run this example on your local PC, connected to the same local network as the brain or linked to it through tailscale.

Ensure that a farm-ng brain, attached to an Amiga with at least one H-bridge or PTO device, is actively running the canbus service.

1. Install the farm-ng Brain ADK package

2. Install the example's dependencies

Setup

cd farm-ng-amiga/
Recommended

Create a virtual environment

python3 -m venv venv
source venv/bin/activate

Install

cd py/examples/tool_control
pip install -r requirements.txt

3. Execute the Python script

info

Since this example must be run from your local PC, you will need update the service_config.json by modifying the host field with your Amiga brain name.

Please check out Amiga Development 101 for more details.

python main.py --service-config service_config.json

If everything worked correctly you should now see a stream of the statuses of all connected tools come up in your terminal!

4. Using the example

Reminder

To control tools using this example, you must activate the auto control mode on your Amiga via the dashboard.

Key Combinations for Tool Control

Let's explore the key combinations used in the tool_control_from_key_presses function for controlling various devices:

tip

It is recommended to double check the example code to confirm that the following key mappings have not changed before you run the example.

All devices passive

  • Spacebar: Pressing this key sets all devices to passive mode and overrides any additional key presses.

H-Bridge Control

The H-Bridges are controlled using the numbers 0, 1, 2, & 3, corresponding to H-Bridges 0, 1, 2, & 3, along with the Up and Down arrow keys:

  • Up Arrow + [0/1/2/3]: Moves the corresponding H-Bridges forward.
  • Down Arrow + [0/1/2/3]: Moves the corresponding H-Bridges in reverse.
  • Up + Down Arrows + [0/1/2/3]: Stops the corresponding H-Bridges.

PTO Control

PTOs are controlled with the keys a, b, c, & d, corresponding to PTOs 0, 1, 2, & 3, combined with the Left and Right arrow keys:

  • Left Arrow + [a/b/c/d]: Moves the PTO forward.
  • Right Arrow + [a/b/c/d]: Moves the PTO in reverse.
  • Left + Right Arrows + [a/b/c/d]: Stops the PTO.

These key mappings allow for simultaneous control of multiple devices through the Canbus Service /control_tools API.

Running with Twist control

You can also run this example alongside the Vehicle Twist example to control the tools and drive the robot at the same time. The Canbus Service will synchronize the Twist2d commands with the ActuatorCommands for simultaneous driving and tool control!

5. Code Overview

KeyboardListener

We create a KeyboardListener class that wraps the pynput.keyboard for receiving and assembling a set of simultaneous key presses.

class KeyboardListener:
def __init__(self):
self.pressed_keys = set()
self.listener = keyboard.Listener(on_press=self.on_press, on_release=self.on_release)

def on_press(self, key):
try:
key_name = key.char
except AttributeError:
key_name = key.name # For special keys
self.pressed_keys.add(key_name)

def on_release(self, key):
try:
key_name = key.char
except AttributeError:
key_name = key.name # For special keys
self.pressed_keys.discard(key_name)

def start(self):
self.listener.start()

def stop(self):
self.listener.stop()

tool_control_from_key_presses

This function maps the pressed set of pressed_keys to an ActuatorCommands proto message that we can use to command the tools on the Amiga.

def tool_control_from_key_presses(pressed_keys: set) -> ActuatorCommands:
if 'space' in pressed_keys:
print("Set all to passive with empty command")
return ActuatorCommands()

commands: ActuatorCommands = ActuatorCommands()

# H-bridges controlled with 0, 1, 2, 3 & up / down arrows
# up = forward, down = reverse, both = stop, neither / not pressed => omitted => passive
if 'up' in pressed_keys and 'down' in pressed_keys:
for hbridge_id in pressed_keys & {'0', '1', '2', '3'}:
commands.hbridges.append(HBridgeCommand(id=int(hbridge_id), command=HBridgeCommandType.HBRIDGE_STOPPED))
elif 'up' in pressed_keys:
for hbridge_id in pressed_keys & {'0', '1', '2', '3'}:
commands.hbridges.append(HBridgeCommand(id=int(hbridge_id), command=HBridgeCommandType.HBRIDGE_FORWARD))
elif 'down' in pressed_keys:
for hbridge_id in pressed_keys & {'0', '1', '2', '3'}:
commands.hbridges.append(HBridgeCommand(id=int(hbridge_id), command=HBridgeCommandType.HBRIDGE_REVERSE))

# PTOs controlled with a, b, c, d & left / right arrows
# left = forward, right = reverse, both = stop, neither / not pressed => omitted => passive
pto_id_mapping = {'a': 0x0, 'b': 0x1, 'c': 0x2, 'd': 0x3}
pto_rpm: float = 20.0
if 'left' in pressed_keys and 'right' in pressed_keys:
for pto_char in pressed_keys & {'a', 'b', 'c', 'd'}:
pto_id = pto_id_mapping[pto_char]
commands.ptos.append(PtoCommand(id=pto_id, command=PtoCommandType.PTO_STOPPED, rpm=pto_rpm))
elif 'left' in pressed_keys:
for pto_char in pressed_keys & {'a', 'b', 'c', 'd'}:
pto_id = pto_id_mapping[pto_char]
commands.ptos.append(PtoCommand(id=pto_id, command=PtoCommandType.PTO_FORWARD, rpm=pto_rpm))
elif 'right' in pressed_keys:
for pto_char in pressed_keys & {'a', 'b', 'c', 'd'}:
pto_id = pto_id_mapping[pto_char]
commands.ptos.append(PtoCommand(id=pto_id, command=PtoCommandType.PTO_REVERSE, rpm=pto_rpm))

return commands

control_tools

In this example we use the EventClient with the request_reply method to send an ActuatorCommands message on the /control_tools path at a regular interval.

async def control_tools(service_config_path: Path, keyboard_listener: KeyboardListener) -> None:
"""Control the tools / actuators on your Amiga.

Args:
service_config_path (Path): The path to the canbus service config.
keyboard_listener (KeyboardListener): The keyboard listener.
"""
config: EventServiceConfig = proto_from_json_file(service_config_path, EventServiceConfig())
client: EventClient = EventClient(config)

while True:
# Send the tool control command
commands: ActuatorCommands = tool_control_from_key_presses(keyboard_listener.pressed_keys)
await client.request_reply("/control_tools", commands, decode=True)

# Sleep for a bit
await asyncio.sleep(0.1)

stream_tool_statuses

We asynchronously stream the ToolStatuses message from the canbus service on the /tool_statuses topic.

async def stream_tool_statuses(service_config_path: Path) -> None:
"""Stream the tool statuses.

Args:
service_config_path (Path): The path to the canbus service config.
"""

config: EventServiceConfig = proto_from_json_file(service_config_path, EventServiceConfig())

message: ToolStatuses
async for event, message in EventClient(config).subscribe(config.subscriptions[0], decode=True):
print("###################")
print(message)

Run the tasks

We use the asyncio.gather method to allow running the two tasks, (1) controlling the tools and (2) streaming the tool statuses, simultaneously and asynchronously.

async def run(service_config_path: Path, keyboard_listener: KeyboardListener):
# Create tasks for both functions
tasks: list[asyncio.Task] = [
asyncio.create_task(control_tools(service_config_path, keyboard_listener)),
asyncio.create_task(stream_tool_statuses(service_config_path)),
]
await asyncio.gather(*tasks)


if __name__ == "__main__":
parser = argparse.ArgumentParser(prog="Command and monitor tools with the canbus service.")
parser.add_argument("--service-config", type=Path, required=True, help="The canbus service config.")
args = parser.parse_args()

keyboard_listener = KeyboardListener()
keyboard_listener.start()

try:
asyncio.run(run(args.service_config, keyboard_listener))
except KeyboardInterrupt:
pass
finally:
keyboard_listener.stop()

Congrats you are done!