Skip to content
Open
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[submodule "sc1-data-format"]
path = sc1-data-format
url = git@github.com:badgerloop-software/sc1-data-format.git
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Link to docs https://www.waveshare.com/wiki/RS485_CAN_HAT#Install_Library

Create virtual environment using:

- Windows: `virtualenv -p python3 .env`
- Linux/MacOS: `python3 -m venv .env`
143 changes: 143 additions & 0 deletions read_can_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import can
import struct
import time
import logging
from send_messages import transmit_can_message
import argparse
from typing import List, Dict, Any
from dataclasses import dataclass, asdict
import json

"""
Message structure:
.arbitration_id: id of the CAN message
.data: the body content of the CAN message
.timestamp: the timestamp of the CAN message
"""


@dataclass
class SignalInfo:
name: str
bytes: int
type: str
units: str
nominal_min: int
nominal_max: int
subsystem: str


logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)


def preprocess_data_format(format: Dict[str, List[Any]]) -> Dict[str, Dict[int, Any]]:
"""
Parse data format and return in a more friendly format for CAN consumption:
<CAN ID>: {
<offset>: {
<category 1>: ...
...
}
...
},
...
"""
processed = {}
for key, s in format.items():
# Get the arbitration ID and offset
can_id = int(s[-2], base=16)
offset = s[-1]
# Drop the ID and offset
s = s[:-2]
# Add/Update the message data in the processed data format
p = processed.setdefault(can_id, {})
num_bytes = s[0]
data_type = s[1]
units = s[2]
nominal_min = s[3]
nominal_max = s[4]
subsystem = s[5]
signal_info = SignalInfo(
key, num_bytes, data_type, units, nominal_min, nominal_max, subsystem
)
p[offset] = signal_info

return processed


with open("sc1-data-format/format.json", "r") as file:
data = json.load(file)

signal_definitions = preprocess_data_format(data)


class MyListener(can.Listener):
def on_message_received(self, message):
message_data = {
"id": message.arbitration_id,
"data": list(message.data),
"timestamp": message.timestamp,
}

# loop through signal definitions to find can_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Not looping through

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

can_id = message_data["id"]
if can_id not in signal_definitions:
logging.error(f"CAN ID {can_id:0x} not found in signal definitions.")
return
signals = signal_definitions[can_id]
byte_array = bytes(message.data)

for offset, signals_info in signals.items():
logging.debug(f"Processing signal at offset {offset} for CAN ID {can_id:0x}")
print(offset)
print(signals_info)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can revert print statements or convert to debug logging statements for committed code

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed

data_type = signals_info.type
signal_name = signals_info.name
if data_type == "float":
if len(byte_array) < 4:
logging.error(
f"Insufficient data for float signal in CAN ID {can_id:0x}."
)
return
else:
# Unpack the first 4 bytes as a little-endian float.
float_value = struct.unpack("<f", byte_array[:4])[0]
logging.debug(
f"New Message: ID={can_id:0x},Name={signal_name} Value={float_value}, Time Stamp={message_data['timestamp']}"
)
elif data_type == "boolean":
bool_value = bool((byte_array[0] >> offset) & 1)
logging.debug(
f"New Message: ID={can_id:0x},Name={signal_name} Value={bool_value}, Time Stamp={message_data['timestamp']}"
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we differentiate between all data types in the data format? Also, can we do a more strict check against the expected length in bytes for each value?



if __name__ == "__main__":
# Create a CAN bus connection
bus = can.interface.Bus(channel="can0", bustype="socketcan")

# set up listener
listener = MyListener()

# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener.
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message_received in MyListener.

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's supposed to be on_message_received

notifier = can.Notifier(bus, [listener])

try:
# create an infinite loop to keep listening to messages.
print("Listening for CAN messages... Press Ctrl+C to stop.")
parser = argparse.ArgumentParser(description="Specify the CAN channel.")
# Define a positional argument for channel
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)")
args = parser.parse_args()
transmit_can_message()
Comment on lines +133 to +135
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we split out transmitting and reading functionality for the test script (i.e. add another "tx" argument to enable transmitting (a) message(s))

while True:
time.sleep(1)
# Infinite loop to keep listening
except KeyboardInterrupt:
print("Stopping CAN receiver.")
notifier.stop()
bus.shutdown()
1 change: 1 addition & 0 deletions sc1-data-format
Submodule sc1-data-format added at f4fefb
74 changes: 74 additions & 0 deletions send_messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
#!/usr/bin/env python3
import can
import argparse
import logging


def transmit_can_message(channel="can0", bustype="socketcan"):
"""
Initializes the CAN bus, prompts the user for message parameters,
and transmits the CAN message.
"""
# Initialize the CAN bus interface
bus = can.interface.Bus(channel=channel, bustype=bustype)

# Prompt for the arbitration ID (CAN ID), this gets the arbitration id from the user and validates it
arb_id_input = input("Enter 3-digit hex message id (e.g. 0x123): ").strip()
try:
# Accept input as hex if it starts with 0x, otherwise assume hex too
arbitration_id = int(arb_id_input, 16)
except ValueError:
logging.error("Invalid data byte input.")
bus.shutdown()
return

# Prompt for data bytes input (as hex values) accepts users inputs as data.
data_input = input(
"Enter data bytes as hex values separated by spaces (e.g. '11 22 33 44'): "
).strip()
try:
data_bytes = []
for byte_str in data_input.split():
# Convert each input to an integer (assumed hex)
data_byte = int(byte_str, 16)
if not (0 <= data_byte <= 255):
logging.debug(f"Byte out of range (0-255): {data_byte}")
bus.shutdown()
return
data_bytes.append(data_byte)
except ValueError:
logging.error("Invalid data byte input.")
bus.shutdown()
return

# Create the CAN message
msg = can.Message(
arbitration_id=arbitration_id, data=data_bytes, is_extended_id=False
)

# Send message using bus.send()
try:
bus.send(msg)
logging.debug(
f"Message sent successfully!\n Message Details: ID={msg.arbitration_id}, Data={msg.data}"
)
except can.CanError as e:
logging.error(f"Message failed to send: {e}")

# Cleanly shutdown the bus connection
bus.shutdown()


if __name__ == "__main__":
# Allow user to input CAN channel
parser = argparse.ArgumentParser(description="Specify the CAN channel.")
# Define a positional argument for channel
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)")
args = parser.parse_args()
# add logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
transmit_can_message(args.channel)