diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..89dcfa8 --- /dev/null +++ b/.gitmodules @@ -0,0 +1,3 @@ +[submodule "sc1-data-format"] + path = sc1-data-format + url = git@github.com:badgerloop-software/sc1-data-format.git diff --git a/README.md b/README.md new file mode 100644 index 0000000..eec6aad --- /dev/null +++ b/README.md @@ -0,0 +1,6 @@ +Link to docs https://www.waveshare.com/wiki/RS485_CAN_HAT#Install_Library + +Create virtual environment using: + +- Windows: `virtualenv -p python3 .env` +- Linux/MacOS: `python3 -m venv .env` diff --git a/data_classes.py b/data_classes.py new file mode 100644 index 0000000..828d4e1 --- /dev/null +++ b/data_classes.py @@ -0,0 +1,20 @@ +from dataclasses import dataclass + + +@dataclass +class SignalInfo: + name: str + bytes: int + type: str + units: str + nominal_min: int + nominal_max: int + subsystem: str + + +@dataclass +class ParsedData: + can_id: int + signal_name: str + value: float | bool + timestamp: float diff --git a/read_can_messages.py b/read_can_messages.py new file mode 100644 index 0000000..835c706 --- /dev/null +++ b/read_can_messages.py @@ -0,0 +1,142 @@ +import can +import struct +import time +import logging +from send_messages import transmit_can_message +import argparse +from typing import List, Dict, Any +import json +from dataclasses import SignalInfo, ParsedData + +""" +Message structure: +.arbitration_id: id of the CAN message +.data: the body content of the CAN message +.timestamp: the timestamp of the CAN message +""" + + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", +) + + +def preprocess_data_format(format: Dict[str, List[Any]]) -> Dict[str, Dict[int, Any]]: + """ + Parse data format and return in a more friendly format for CAN consumption: + : { + : { + : ... + ... + } + ... + }, + ... + """ + processed = {} + for key, s in format.items(): + # Get the arbitration ID and offset + can_id = int(s[-2], base=16) + offset = s[-1] + # Drop the ID and offset + s = s[:-2] + # Add/Update the message data in the processed data format + p = processed.setdefault(can_id, {}) + num_bytes = s[0] + data_type = s[1] + units = s[2] + nominal_min = s[3] + nominal_max = s[4] + subsystem = s[5] + signal_info = SignalInfo( + key, num_bytes, data_type, units, nominal_min, nominal_max, subsystem + ) + p[offset] = signal_info + + return processed + + +with open("sc1-data-format/format.json", "r") as file: + data = json.load(file) + +signal_definitions = preprocess_data_format(data) + + +class MyListener(can.Listener): + def on_message_received(self, message): + message_data = { + "id": message.arbitration_id, + "data": list(message.data), + "timestamp": message.timestamp, + } + self.parse_data(message_data) + + def parse_data(self, message_data): + # get can_id + can_id = message_data["id"] + # loop to find can_id + if can_id not in signal_definitions: + logging.error(f"CAN ID {can_id:0x} not found in signal definitions.") + return None + signals = signal_definitions[can_id] + byte_array = bytes(message_data["data"]) + + for offset, signals_info in signals.items(): + logging.debug( + f"Processing signal at offset {offset} for CAN ID {can_id:0x}" + ) + data_type = signals_info.type + signal_name = signals_info.name + if data_type == "float": + if len(byte_array) < 4: + logging.error( + f"Insufficient data for float signal in CAN ID {can_id:0x}." + ) + return + else: + # Unpack the first 4 bytes as a little-endian float. + float_value = struct.unpack("> offset) & 1) + logging.debug( + f"New Message: ID={can_id:0x},Name={signal_name} Value={bool_value}, Time Stamp={message_data['timestamp']}" + ) + return ParsedData( + can_id, signal_name, bool_value, message_data["timestamp"] + ) + + +if __name__ == "__main__": + # Create a CAN bus connection + bus = can.interface.Bus(channel="can0", bustype="socketcan") + + # set up listener + listener = MyListener() + + # A Notifier runs in the background and listens for messages. When a new message arrives, it calls on_message in MyListener. + notifier = can.Notifier(bus, [listener]) + + try: + # create an infinite loop to keep listening to messages. + logging.debug("Listening for CAN messages... Press Ctrl+C to stop.") + parser = argparse.ArgumentParser(description="Specify the CAN channel.") + # Define a positional argument for channel + parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") + args = parser.parse_args() + transmit_can_message() + while True: + time.sleep(1) + # Infinite loop to keep listening + except KeyboardInterrupt: + logging.debug("Stopping CAN receiver.") + notifier.stop() + bus.shutdown() diff --git a/sc1-data-format b/sc1-data-format new file mode 160000 index 0000000..f4fefb0 --- /dev/null +++ b/sc1-data-format @@ -0,0 +1 @@ +Subproject commit f4fefb0c3785ddf4cd14c90d0035174d4a505e8e diff --git a/send_messages.py b/send_messages.py new file mode 100644 index 0000000..2aba67a --- /dev/null +++ b/send_messages.py @@ -0,0 +1,74 @@ +#!/usr/bin/env python3 +import can +import argparse +import logging + + +def transmit_can_message(channel="can0", bustype="socketcan"): + """ + Initializes the CAN bus, prompts the user for message parameters, + and transmits the CAN message. + """ + # Initialize the CAN bus interface + bus = can.interface.Bus(channel=channel, bustype=bustype) + + # Prompt for the arbitration ID (CAN ID), this gets the arbitration id from the user and validates it + arb_id_input = input("Enter 3-digit hex message id (e.g. 0x123): ").strip() + try: + # Accept input as hex if it starts with 0x, otherwise assume hex too + arbitration_id = int(arb_id_input, 16) + except ValueError: + logging.error("Invalid data byte input.") + bus.shutdown() + return + + # Prompt for data bytes input (as hex values) accepts users inputs as data. + data_input = input( + "Enter data bytes as hex values separated by spaces (e.g. '11 22 33 44'): " + ).strip() + try: + data_bytes = [] + for byte_str in data_input.split(): + # Convert each input to an integer (assumed hex) + data_byte = int(byte_str, 16) + if not (0 <= data_byte <= 255): + logging.debug(f"Byte out of range (0-255): {data_byte}") + bus.shutdown() + return + data_bytes.append(data_byte) + except ValueError: + logging.error("Invalid data byte input.") + bus.shutdown() + return + + # Create the CAN message + msg = can.Message( + arbitration_id=arbitration_id, data=data_bytes, is_extended_id=False + ) + + # Send message using bus.send() + try: + bus.send(msg) + logging.debug( + f"Message sent successfully!\n Message Details: ID={msg.arbitration_id}, Data={msg.data}" + ) + except can.CanError as e: + logging.error(f"Message failed to send: {e}") + + # Cleanly shutdown the bus connection + bus.shutdown() + + +if __name__ == "__main__": + # Allow user to input CAN channel + parser = argparse.ArgumentParser(description="Specify the CAN channel.") + # Define a positional argument for channel + parser.add_argument("channel", type=str, help="CAN channel (e.g., can0, vcan0)") + args = parser.parse_args() + # add logging + logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + datefmt="%Y-%m-%d %H:%M:%S", + ) + transmit_can_message(args.channel)