-
Notifications
You must be signed in to change notification settings - Fork 0
User/jvollmer/read can with submodule #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 19 commits
a99dc8e
eac8e06
a09c3e7
f8b5624
b6ae409
27ad02f
732b7b2
bfd9dbf
f0ab31d
dd60d39
22c1610
6f6c5eb
8a74ec1
471126e
45d78ce
09a2d82
da22fe1
a6923cc
74d1dfe
3700ca4
2d5d0b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[submodule "sc1-data-format"] | ||
path = sc1-data-format | ||
url = git@github.com:badgerloop-software/sc1-data-format.git |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
Link to docs https://www.waveshare.com/wiki/RS485_CAN_HAT#Install_Library | ||
|
||
Create virtual environment using: | ||
|
||
- Windows: `virtualenv -p python3 .env` | ||
- Linux/MacOS: `python3 -m venv .env` |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
@@ -0,0 +1,143 @@ | ||||||
import can | ||||||
import struct | ||||||
import time | ||||||
import logging | ||||||
from send_messages import transmit_can_message | ||||||
import argparse | ||||||
from typing import List, Dict, Any | ||||||
from dataclasses import dataclass, asdict | ||||||
import json | ||||||
|
||||||
""" | ||||||
Message structure: | ||||||
.arbitration_id: id of the CAN message | ||||||
.data: the body content of the CAN message | ||||||
.timestamp: the timestamp of the CAN message | ||||||
""" | ||||||
|
||||||
|
||||||
@dataclass | ||||||
class SignalInfo: | ||||||
name: str | ||||||
bytes: int | ||||||
type: str | ||||||
units: str | ||||||
nominal_min: int | ||||||
nominal_max: int | ||||||
subsystem: str | ||||||
|
||||||
|
||||||
logging.basicConfig( | ||||||
level=logging.INFO, | ||||||
format="%(asctime)s - %(levelname)s - %(message)s", | ||||||
datefmt="%Y-%m-%d %H:%M:%S", | ||||||
) | ||||||
|
||||||
|
||||||
def preprocess_data_format(format: Dict[str, List[Any]]) -> Dict[str, Dict[int, Any]]: | ||||||
""" | ||||||
Parse data format and return in a more friendly format for CAN consumption: | ||||||
<CAN ID>: { | ||||||
<offset>: { | ||||||
<category 1>: ... | ||||||
... | ||||||
} | ||||||
... | ||||||
}, | ||||||
... | ||||||
""" | ||||||
processed = {} | ||||||
for key, s in format.items(): | ||||||
# Get the arbitration ID and offset | ||||||
can_id = int(s[-2], base=16) | ||||||
offset = s[-1] | ||||||
# Drop the ID and offset | ||||||
s = s[:-2] | ||||||
# Add/Update the message data in the processed data format | ||||||
p = processed.setdefault(can_id, {}) | ||||||
num_bytes = s[0] | ||||||
data_type = s[1] | ||||||
units = s[2] | ||||||
nominal_min = s[3] | ||||||
nominal_max = s[4] | ||||||
subsystem = s[5] | ||||||
signal_info = SignalInfo( | ||||||
key, num_bytes, data_type, units, nominal_min, nominal_max, subsystem | ||||||
) | ||||||
p[offset] = signal_info | ||||||
|
||||||
return processed | ||||||
|
||||||
|
||||||
with open("sc1-data-format/format.json", "r") as file: | ||||||
data = json.load(file) | ||||||
|
||||||
signal_definitions = preprocess_data_format(data) | ||||||
|
||||||
|
||||||
class MyListener(can.Listener): | ||||||
def on_message_received(self, message): | ||||||
message_data = { | ||||||
"id": message.arbitration_id, | ||||||
"data": list(message.data), | ||||||
"timestamp": message.timestamp, | ||||||
} | ||||||
|
||||||
# loop through signal definitions to find can_id | ||||||
can_id = message_data["id"] | ||||||
if can_id not in signal_definitions: | ||||||
logging.error(f"CAN ID {can_id:0x} not found in signal definitions.") | ||||||
return | ||||||
signals = signal_definitions[can_id] | ||||||
byte_array = bytes(message.data) | ||||||
|
||||||
for offset, signals_info in signals.items(): | ||||||
logging.debug(f"Processing signal at offset {offset} for CAN ID {can_id:0x}") | ||||||
print(offset) | ||||||
print(signals_info) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can revert print statements or convert to debug logging statements for committed code There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. removed |
||||||
data_type = signals_info.type | ||||||
signal_name = signals_info.name | ||||||
if data_type == "float": | ||||||
if len(byte_array) < 4: | ||||||
logging.error( | ||||||
f"Insufficient data for float signal in CAN ID {can_id:0x}." | ||||||
) | ||||||
return | ||||||
else: | ||||||
# Unpack the first 4 bytes as a little-endian float. | ||||||
float_value = struct.unpack("<f", byte_array[:4])[0] | ||||||
logging.debug( | ||||||
f"New Message: ID={can_id:0x},Name={signal_name} Value={float_value}, Time Stamp={message_data['timestamp']}" | ||||||
) | ||||||
elif data_type == "boolean": | ||||||
bool_value = bool((byte_array[0] >> offset) & 1) | ||||||
logging.debug( | ||||||
f"New Message: ID={can_id:0x},Name={signal_name} Value={bool_value}, Time Stamp={message_data['timestamp']}" | ||||||
) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we differentiate between all data types in the data format? Also, can we do a more strict check against the expected length in bytes for each value? |
||||||
|
||||||
|
||||||
if __name__ == "__main__": | ||||||
# Create a CAN bus connection | ||||||
bus = can.interface.Bus(channel="can0", bustype="socketcan") | ||||||
|
||||||
# set up listener | ||||||
listener = MyListener() | ||||||
|
||||||
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener. | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's supposed to be on_message_received |
||||||
notifier = can.Notifier(bus, [listener]) | ||||||
|
||||||
try: | ||||||
# create an infinite loop to keep listening to messages. | ||||||
print("Listening for CAN messages... Press Ctrl+C to stop.") | ||||||
parser = argparse.ArgumentParser(description="Specify the CAN channel.") | ||||||
# Define a positional argument for channel | ||||||
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") | ||||||
args = parser.parse_args() | ||||||
transmit_can_message() | ||||||
Comment on lines
+133
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we split out transmitting and reading functionality for the test script (i.e. add another "tx" argument to enable transmitting (a) message(s)) |
||||||
while True: | ||||||
time.sleep(1) | ||||||
# Infinite loop to keep listening | ||||||
except KeyboardInterrupt: | ||||||
print("Stopping CAN receiver.") | ||||||
notifier.stop() | ||||||
bus.shutdown() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
#!/usr/bin/env python3 | ||
import can | ||
import argparse | ||
import logging | ||
|
||
|
||
def transmit_can_message(channel="can0", bustype="socketcan"): | ||
""" | ||
Initializes the CAN bus, prompts the user for message parameters, | ||
and transmits the CAN message. | ||
""" | ||
# Initialize the CAN bus interface | ||
bus = can.interface.Bus(channel=channel, bustype=bustype) | ||
|
||
# Prompt for the arbitration ID (CAN ID), this gets the arbitration id from the user and validates it | ||
arb_id_input = input("Enter 3-digit hex message id (e.g. 0x123): ").strip() | ||
try: | ||
# Accept input as hex if it starts with 0x, otherwise assume hex too | ||
arbitration_id = int(arb_id_input, 16) | ||
except ValueError: | ||
logging.error("Invalid data byte input.") | ||
bus.shutdown() | ||
return | ||
|
||
# Prompt for data bytes input (as hex values) accepts users inputs as data. | ||
data_input = input( | ||
"Enter data bytes as hex values separated by spaces (e.g. '11 22 33 44'): " | ||
).strip() | ||
try: | ||
data_bytes = [] | ||
for byte_str in data_input.split(): | ||
# Convert each input to an integer (assumed hex) | ||
data_byte = int(byte_str, 16) | ||
if not (0 <= data_byte <= 255): | ||
logging.debug(f"Byte out of range (0-255): {data_byte}") | ||
bus.shutdown() | ||
return | ||
data_bytes.append(data_byte) | ||
except ValueError: | ||
logging.error("Invalid data byte input.") | ||
bus.shutdown() | ||
return | ||
|
||
# Create the CAN message | ||
msg = can.Message( | ||
arbitration_id=arbitration_id, data=data_bytes, is_extended_id=False | ||
) | ||
|
||
# Send message using bus.send() | ||
try: | ||
bus.send(msg) | ||
logging.debug( | ||
f"Message sent successfully!\n Message Details: ID={msg.arbitration_id}, Data={msg.data}" | ||
) | ||
except can.CanError as e: | ||
logging.error(f"Message failed to send: {e}") | ||
|
||
# Cleanly shutdown the bus connection | ||
bus.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
# Allow user to input CAN channel | ||
parser = argparse.ArgumentParser(description="Specify the CAN channel.") | ||
# Define a positional argument for channel | ||
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") | ||
args = parser.parse_args() | ||
# add logging | ||
logging.basicConfig( | ||
level=logging.INFO, | ||
format="%(asctime)s - %(levelname)s - %(message)s", | ||
datefmt="%Y-%m-%d %H:%M:%S", | ||
) | ||
transmit_can_message(args.channel) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Not looping through
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed