-
Notifications
You must be signed in to change notification settings - Fork 0
User/jvollmer/read can with submodule #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
BeckWangthumboon
wants to merge
21
commits into
main
Choose a base branch
from
user/jvollmer/read-can-with-submodule
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
Show all changes
21 commits
Select commit
Hold shift + click to select a range
a99dc8e
setup + created listener to recieve messages
BeckWangthumboon eac8e06
added loop to display messages
BeckWangthumboon a09c3e7
fix bugs
BeckWangthumboon f8b5624
transmit messages (GPT)
BeckWangthumboon b6ae409
work in progress
BeckWangthumboon 27ad02f
added parsing for boolean and float values in messages.
BeckWangthumboon 732b7b2
fixed bug
BeckWangthumboon bfd9dbf
test can hat
BeckWangthumboon f0ab31d
Add data format submodule
BeckWangthumboon dd60d39
TODO - add starter CAN info
BeckWangthumboon 22c1610
Uppercase README.md
BeckWangthumboon 6f6c5eb
TODO - Untested - Parse data format into a more parseable format
BeckWangthumboon 8a74ec1
fixed parsing and add data class
BeckWangthumboon 471126e
tested preprocess data format function
BeckWangthumboon 45d78ce
fixed parsing
BeckWangthumboon 09a2d82
just moved stuff aroun, deleted signal def and replaced it with data …
BeckWangthumboon da22fe1
changed hex to int in on message_received
BeckWangthumboon a6923cc
added hex to print message
BeckWangthumboon 74d1dfe
fixed parsing again
BeckWangthumboon 3700ca4
created parse data method, moved dataclasses to seperate file
BeckWangthumboon 2d5d0b7
comments + logging change
BeckWangthumboon File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
[submodule "sc1-data-format"] | ||
path = sc1-data-format | ||
url = git@github.com:badgerloop-software/sc1-data-format.git |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,6 @@ | ||
Link to docs https://www.waveshare.com/wiki/RS485_CAN_HAT#Install_Library | ||
|
||
Create virtual environment using: | ||
|
||
- Windows: `virtualenv -p python3 .env` | ||
- Linux/MacOS: `python3 -m venv .env` |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
from dataclasses import dataclass | ||
|
||
|
||
@dataclass | ||
class SignalInfo: | ||
name: str | ||
bytes: int | ||
type: str | ||
units: str | ||
nominal_min: int | ||
nominal_max: int | ||
subsystem: str | ||
|
||
|
||
@dataclass | ||
class ParsedData: | ||
can_id: int | ||
signal_name: str | ||
value: float | bool | ||
timestamp: float |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,142 @@ | ||
import can | ||
import struct | ||
import time | ||
import logging | ||
from send_messages import transmit_can_message | ||
import argparse | ||
from typing import List, Dict, Any | ||
import json | ||
from dataclasses import SignalInfo, ParsedData | ||
|
||
""" | ||
Message structure: | ||
.arbitration_id: id of the CAN message | ||
.data: the body content of the CAN message | ||
.timestamp: the timestamp of the CAN message | ||
""" | ||
|
||
|
||
logging.basicConfig( | ||
level=logging.INFO, | ||
format="%(asctime)s - %(levelname)s - %(message)s", | ||
datefmt="%Y-%m-%d %H:%M:%S", | ||
) | ||
|
||
|
||
def preprocess_data_format(format: Dict[str, List[Any]]) -> Dict[str, Dict[int, Any]]: | ||
""" | ||
Parse data format and return in a more friendly format for CAN consumption: | ||
<CAN ID>: { | ||
<offset>: { | ||
<category 1>: ... | ||
... | ||
} | ||
... | ||
}, | ||
... | ||
""" | ||
processed = {} | ||
for key, s in format.items(): | ||
# Get the arbitration ID and offset | ||
can_id = int(s[-2], base=16) | ||
offset = s[-1] | ||
# Drop the ID and offset | ||
s = s[:-2] | ||
# Add/Update the message data in the processed data format | ||
p = processed.setdefault(can_id, {}) | ||
num_bytes = s[0] | ||
data_type = s[1] | ||
units = s[2] | ||
nominal_min = s[3] | ||
nominal_max = s[4] | ||
subsystem = s[5] | ||
signal_info = SignalInfo( | ||
key, num_bytes, data_type, units, nominal_min, nominal_max, subsystem | ||
) | ||
p[offset] = signal_info | ||
|
||
return processed | ||
|
||
|
||
with open("sc1-data-format/format.json", "r") as file: | ||
data = json.load(file) | ||
|
||
signal_definitions = preprocess_data_format(data) | ||
|
||
|
||
class MyListener(can.Listener): | ||
def on_message_received(self, message): | ||
message_data = { | ||
"id": message.arbitration_id, | ||
"data": list(message.data), | ||
"timestamp": message.timestamp, | ||
} | ||
self.parse_data(message_data) | ||
|
||
def parse_data(self, message_data): | ||
# get can_id | ||
can_id = message_data["id"] | ||
# loop to find can_id | ||
if can_id not in signal_definitions: | ||
logging.error(f"CAN ID {can_id:0x} not found in signal definitions.") | ||
return None | ||
signals = signal_definitions[can_id] | ||
byte_array = bytes(message_data["data"]) | ||
|
||
for offset, signals_info in signals.items(): | ||
logging.debug( | ||
f"Processing signal at offset {offset} for CAN ID {can_id:0x}" | ||
) | ||
data_type = signals_info.type | ||
signal_name = signals_info.name | ||
if data_type == "float": | ||
if len(byte_array) < 4: | ||
logging.error( | ||
f"Insufficient data for float signal in CAN ID {can_id:0x}." | ||
) | ||
return | ||
else: | ||
# Unpack the first 4 bytes as a little-endian float. | ||
float_value = struct.unpack("<f", byte_array[:4])[0] | ||
logging.debug( | ||
f"New Message: ID={can_id:0x},Name={signal_name} Value={float_value}, Time Stamp={message_data['timestamp']}" | ||
) | ||
return ParsedData( | ||
can_id, signal_name, float_value, message_data["timestamp"] | ||
) | ||
|
||
elif data_type == "boolean": | ||
bool_value = bool((byte_array[0] >> offset) & 1) | ||
logging.debug( | ||
f"New Message: ID={can_id:0x},Name={signal_name} Value={bool_value}, Time Stamp={message_data['timestamp']}" | ||
) | ||
return ParsedData( | ||
can_id, signal_name, bool_value, message_data["timestamp"] | ||
) | ||
|
||
|
||
if __name__ == "__main__": | ||
# Create a CAN bus connection | ||
bus = can.interface.Bus(channel="can0", bustype="socketcan") | ||
|
||
# set up listener | ||
listener = MyListener() | ||
|
||
# A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener. | ||
notifier = can.Notifier(bus, [listener]) | ||
|
||
try: | ||
# create an infinite loop to keep listening to messages. | ||
logging.debug("Listening for CAN messages... Press Ctrl+C to stop.") | ||
parser = argparse.ArgumentParser(description="Specify the CAN channel.") | ||
# Define a positional argument for channel | ||
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") | ||
args = parser.parse_args() | ||
transmit_can_message() | ||
Comment on lines
+133
to
+135
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we split out transmitting and reading functionality for the test script (i.e. add another "tx" argument to enable transmitting (a) message(s)) |
||
while True: | ||
time.sleep(1) | ||
# Infinite loop to keep listening | ||
except KeyboardInterrupt: | ||
logging.debug("Stopping CAN receiver.") | ||
notifier.stop() | ||
bus.shutdown() |
Submodule sc1-data-format
added at
f4fefb
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,74 @@ | ||
#!/usr/bin/env python3 | ||
import can | ||
import argparse | ||
import logging | ||
|
||
|
||
def transmit_can_message(channel="can0", bustype="socketcan"): | ||
""" | ||
Initializes the CAN bus, prompts the user for message parameters, | ||
and transmits the CAN message. | ||
""" | ||
# Initialize the CAN bus interface | ||
bus = can.interface.Bus(channel=channel, bustype=bustype) | ||
|
||
# Prompt for the arbitration ID (CAN ID), this gets the arbitration id from the user and validates it | ||
arb_id_input = input("Enter 3-digit hex message id (e.g. 0x123): ").strip() | ||
try: | ||
# Accept input as hex if it starts with 0x, otherwise assume hex too | ||
arbitration_id = int(arb_id_input, 16) | ||
except ValueError: | ||
logging.error("Invalid data byte input.") | ||
bus.shutdown() | ||
return | ||
|
||
# Prompt for data bytes input (as hex values) accepts users inputs as data. | ||
data_input = input( | ||
"Enter data bytes as hex values separated by spaces (e.g. '11 22 33 44'): " | ||
).strip() | ||
try: | ||
data_bytes = [] | ||
for byte_str in data_input.split(): | ||
# Convert each input to an integer (assumed hex) | ||
data_byte = int(byte_str, 16) | ||
if not (0 <= data_byte <= 255): | ||
logging.debug(f"Byte out of range (0-255): {data_byte}") | ||
bus.shutdown() | ||
return | ||
data_bytes.append(data_byte) | ||
except ValueError: | ||
logging.error("Invalid data byte input.") | ||
bus.shutdown() | ||
return | ||
|
||
# Create the CAN message | ||
msg = can.Message( | ||
arbitration_id=arbitration_id, data=data_bytes, is_extended_id=False | ||
) | ||
|
||
# Send message using bus.send() | ||
try: | ||
bus.send(msg) | ||
logging.debug( | ||
f"Message sent successfully!\n Message Details: ID={msg.arbitration_id}, Data={msg.data}" | ||
) | ||
except can.CanError as e: | ||
logging.error(f"Message failed to send: {e}") | ||
|
||
# Cleanly shutdown the bus connection | ||
bus.shutdown() | ||
|
||
|
||
if __name__ == "__main__": | ||
# Allow user to input CAN channel | ||
parser = argparse.ArgumentParser(description="Specify the CAN channel.") | ||
# Define a positional argument for channel | ||
parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") | ||
args = parser.parse_args() | ||
# add logging | ||
logging.basicConfig( | ||
level=logging.INFO, | ||
format="%(asctime)s - %(levelname)s - %(message)s", | ||
datefmt="%Y-%m-%d %H:%M:%S", | ||
) | ||
transmit_can_message(args.channel) |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's supposed to be on_message_received