From a8c882462aa67168653a0c5f5cb62e2f2bfcfaae Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Mon, 14 Jul 2025 08:19:47 +0200 Subject: [PATCH 1/5] added function to calculate max allowable payload for a given config --- .../dynamic_payload_analysis_core/core.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index cdfa3dd..0ec6f13 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -478,6 +478,36 @@ def get_valid_workspace(self, range : int, resolution : int, end_effector_name_l return valid_configurations + # TODO : test it + def compute_maximum_payloads(self, configs : np.ndarray): + """ + Compute the maximum payload for each provided configuration and return the results with the configs updated with the maximum payload as a new value. + :param configs: Array of configurations , format {"config", "end_effector_pos", "tau", "arm", "max_payload" } + """ + # payload limit + payload_limit = 10 + resolution_payload = 0.01 + + for config in configs: + # iterate over a possible set of payloads + for mass in np.arange(0, payload_limit , resolution_payload): + # Create external forces based on the masses and checked frames TODO change name in arm 7 for example + ext_forces = self.create_ext_force(mass, f"gripper_{config['arm']}_finger_joint", config["config"]) + # calculate the current torques + tau = self.compute_inverse_dynamics(config["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) + + if not self.check_effort_limits(tau,config["arm"]).all(): + # if the current payload is not valid, the maximum payload is the previous one + config["max_payload"] = mass - resolution_payload + break + else: + # if the current payload is valid, update the maximum payload + config["max_payload"] = mass + + + return configs + + def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray: """ From 890e968e7f567f46c859b7e171c60e7879457578 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Mon, 14 Jul 2025 09:30:32 +0200 Subject: [PATCH 2/5] improvements in usage in the rviz panel --- .../menu_visual.py | 54 +++++++++++++++++-- 1 file changed, 51 insertions(+), 3 deletions(-) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index 571703b..e4bcdb5 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -51,7 +51,7 @@ def __init__(self, node): self.selected_configuration = None # payload mass selection array - self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5], dtype=float) + self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5], dtype=float) # variable to store selection between torque limits or max torque in the current configuration for color visualization self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # default is torque limits @@ -62,12 +62,16 @@ def __init__(self, node): # insert the reset payload button self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset) + # insert label for the color menu selection + self.label_color_selection = self.menu_handler.insert('Select torque visualization color :',callback=self.callback_label) + # insert the checker for visualization color choice between torque limits or max torque in the current configuration self.torque_limits_checker = self.menu_handler.insert('Torque limits',command=str(TorqueVisualizationType.TORQUE_LIMITS.value) , callback=self.callback_color_selection) self.max_torque_checker = self.menu_handler.insert('Max torque', command=str(TorqueVisualizationType.MAX_CURRENT_TORQUE.value) , callback=self.callback_color_selection) self.menu_handler.setVisible(self.torque_limits_checker, False) self.menu_handler.setVisible(self.max_torque_checker, False) + self.menu_handler.setVisible(self.label_color_selection, False) self.make_menu_marker('menu_frames') @@ -94,6 +98,15 @@ def insert_frame(self, name): self.menu_handler.reApply(self.server) self.server.applyChanges() + def callback_label(self, feedback): + """ + Callback for the label of color selection. Made only to avoid the error of missing callback. + + Args: + feedback: Feedback from the menu selection. + """ + pass + def callback_color_selection(self, feedback): """ Callback for torque selection type to change the visualization color in Rviz. @@ -151,6 +164,7 @@ def insert_dropdown_configuration(self, configuration : np.ndarray): # set visible true for color selection and put the default check state self.menu_handler.setVisible(self.torque_limits_checker, True) self.menu_handler.setVisible(self.max_torque_checker, True) + self.menu_handler.setVisible(self.label_color_selection, True) self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED) self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED) @@ -223,9 +237,13 @@ def callback_reset(self, feedback): self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED) # check if the item is the reset payloads or compute workspace item and skip the unchanging of the check state - if item.title == "Reset payloads" or item.title == "Compute workspace": + if item.title == self.menu_handler.getTitle(self.reset) or item.title == self.menu_handler.getTitle(self.workspace_button): continue + if item.title == self.menu_handler.getTitle(self.label_color_selection): + self.menu_handler.setVisible(self.label_color_selection, False) + continue + # set the checked of frame to unchecked self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED) @@ -250,6 +268,11 @@ def callback_selection(self, feedback): Args: feedback: Feedback from the menu selection. """ + + # reset the sub-menu configuration if the user change payload selection + self.reset_sub_menu_configuration() + # ------------- + # get name of the frame handle = feedback.menu_entry_id title = self.menu_handler.getTitle(handle) @@ -269,7 +292,29 @@ def callback_selection(self, feedback): # print the current state of the checked frames print(f"{self.get_item_state()} \n") - + + def reset_sub_menu_configuration(self): + """ + Function to reset the sub-menu configuration and related variables. + """ + # reset calculated configurations in the workspace submenu + for i, item in self.menu_handler.entry_contexts_.items(): + # check if the item is the workspace button and has sub entries and remove them from view + if item.sub_entries and item.title == self.menu_handler.getTitle(self.workspace_button): + # if the frame has payloads selection, we need to remove it + for sub_item in item.sub_entries: + self.menu_handler.setVisible(sub_item, False) + self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED) + + # hide the torque limits and max torque checkboxes when there is no configuration selected + self.menu_handler.setVisible(self.torque_limits_checker, False) + self.menu_handler.setVisible(self.max_torque_checker, False) + self.menu_handler.setVisible(self.label_color_selection, False) + + self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # reset to default torque limits + + # reset the selected configuration + self.selected_configuration = None def update_item(self, name, check: bool): @@ -328,6 +373,9 @@ def update_payload_selection(self, feedback): Args: feedback: Feedback from the menu selection. """ + # reset the sub-menu configuration if the user change payload selection + self.reset_sub_menu_configuration() + # get the handle of the selected item (id) handle = feedback.menu_entry_id # get the title of the selected item (it contains number of kg) From 8f73ee4537e5cd3eb9dd0985493cd35aae0428a0 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Mon, 14 Jul 2025 10:28:05 +0200 Subject: [PATCH 3/5] added calculation for max allowable payload in the calculated configurations of workspace --- .../dynamic_payload_analysis_core/core.py | 53 +++---------------- .../menu_visual.py | 3 +- .../rviz_visualization_menu.py | 19 ++++--- 3 files changed, 20 insertions(+), 55 deletions(-) diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py index 0ec6f13..7f70440 100644 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py @@ -25,8 +25,6 @@ from ikpy import chain import os -# TODO : If a workspace calculation is already done, store the results in a file to avoid recalculating it and make only the -# verification of the payload handling in the workspace. @@ -189,45 +187,6 @@ def update_configuration(self, q : np.ndarray): """ pin.forwardKinematics(self.model, self.data, q) pin.updateFramePlacements(self.model, self.data) - - - def compute_maximum_payload(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, frame_name : str) -> float: - """ - Compute the forward dynamics acceleration vector. - - :param q: Joint configuration vector. - :param qdot: Joint velocity vector. - :param tau: Joint torque vector. - :param frame_name: Name of the frame where the force is applied. - :return: Acceleration vector. - """ - # TODO : refactor this method - - # Update the configuration of the robot model - self.update_configuration(q) - - # basic idea for forward dynamics: M(q) * a_q + b = tau(q) --> a_q = (tau(q) - b)/ M(q) - # with external force, the equation becomes: M(q) * a_q + b = tau(q) + J_traspose(q) * f_ext -- > f_ext = (M(q) * a_q + b - tau(q))/ J_traspose(q) - - qddot0 = np.zeros(self.model.nv) # Initialize acceleration vector - - # calculate dynamics drift - b = pin.rnea(self.model, self.data, q, qdot, qddot0) - - #get jacobian of the frame where the payload is applied - J = self.compute_jacobian(q, frame_name) - - tau_r = b - tau - - try: - #a = np.linalg.solve(M, tau - b) - # get F = (J_transpose(q))^-1 X ( tau - b ) with b = M(q)*a_q + b || pinv = pseudo-inverse to prevent singularities - F_max = np.linalg.pinv(J.T) @ tau_r - - except np.linalg.LinAlgError as e: - raise ValueError(f"Failed to solve for acceleration: {e}") - - return F_max[2] # get the force in z axis of the world frame, which is the maximum force payload @@ -471,28 +430,30 @@ def get_valid_workspace(self, range : int, resolution : int, end_effector_name_l self.configurations_l = self.compute_all_configurations(range,resolution, end_effector_name_left) # Compute all configurations within the specified range for the right arm self.configurations_r = self.compute_all_configurations(range,resolution, end_effector_name_right) - + + # TODO make here the calculation for the maximum payload for each configuration in order to increase the speed of the verification + # Verify the configurations to check if they are valid for both arms valid_configurations = self.verify_configurations(self.configurations_l,self.configurations_r, masses, checked_frames) return valid_configurations - # TODO : test it + def compute_maximum_payloads(self, configs : np.ndarray): """ Compute the maximum payload for each provided configuration and return the results with the configs updated with the maximum payload as a new value. :param configs: Array of configurations , format {"config", "end_effector_pos", "tau", "arm", "max_payload" } """ - # payload limit + # payload limits payload_limit = 10 resolution_payload = 0.01 for config in configs: # iterate over a possible set of payloads for mass in np.arange(0, payload_limit , resolution_payload): - # Create external forces based on the masses and checked frames TODO change name in arm 7 for example - ext_forces = self.create_ext_force(mass, f"gripper_{config['arm']}_finger_joint", config["config"]) + # Create external forces based on the masses and checked frames + ext_forces = self.create_ext_force(mass, f"arm_{config['arm']}_7_joint", config["config"]) # calculate the current torques tau = self.compute_inverse_dynamics(config["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index e4bcdb5..b35811a 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -157,7 +157,7 @@ def insert_dropdown_configuration(self, configuration : np.ndarray): for i, item in enumerate(configuration): # insert the parent in the command field to keep track of the parent id - last_entry = self.menu_handler.insert(f"Configuration {i} | arm: " + item["arm"], parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection) + last_entry = self.menu_handler.insert(f"Configuration {i} | arm: " + item["arm"] + " | max payload : " + str(item["max_payload"]) , parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection) self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) self.menu_handler.setVisible(last_entry, True) @@ -165,6 +165,7 @@ def insert_dropdown_configuration(self, configuration : np.ndarray): self.menu_handler.setVisible(self.torque_limits_checker, True) self.menu_handler.setVisible(self.max_torque_checker, True) self.menu_handler.setVisible(self.label_color_selection, True) + self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED) self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py index a6825dd..02f678d 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py @@ -163,7 +163,9 @@ def workspace_calculation(self): """ # if the user choose to compute the workspace area then compute the valid configurations if self.menu.get_workspace_state(): - self.valid_configurations = self.robot.get_valid_workspace(2, 0.20, "gripper_left_finger_joint", "gripper_right_finger_joint", self.masses, self.checked_frames) + self.valid_configurations = self.robot.get_valid_workspace(2, 0.20, "arm_left_7_joint", "arm_right_7_joint", self.masses, self.checked_frames) + + self.valid_configurations = self.robot.compute_maximum_payloads(self.valid_configurations) # insert the valid configurations in the menu self.menu.insert_dropdown_configuration(self.valid_configurations) @@ -177,7 +179,7 @@ def workspace_calculation(self): # if there are valid configurations, publish the workspace area if self.valid_configurations is not None: # publish the workspace area - self.publish_workspace_area(self.valid_configurations) + self.publish_workspace_area() @@ -324,12 +326,10 @@ def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray, self.publisher_rviz_torque.publish(marker_array) - def publish_workspace_area(self, valid_configs: np.ndarray ): + def publish_workspace_area(self): """ Publish the workspace area in RViz using points and labels for the end points. - Args: - valid_configs (np.ndarray): Current valid configurations in the workspace of the robot. """ # Create a MarkerArray to visualize the number of configuration of a specific point in the workspace marker_point_names = MarkerArray() @@ -339,11 +339,14 @@ def publish_workspace_area(self, valid_configs: np.ndarray ): # calculate the maximum torques for each joint in the current valid configurations for each arm only if the user selected the max current torque visualization if self.menu.get_torque_color() == TorqueVisualizationType.MAX_CURRENT_TORQUE: - max_torque_for_joint_left, max_torque_for_joint_right = self.robot.get_maximum_torques(valid_configs) + max_torque_for_joint_left, max_torque_for_joint_right = self.robot.get_maximum_torques(self.valid_configurations) + + + cont = 0 # Iterate through the valid configurations and create markers - for i, valid_config in enumerate(valid_configs): + for i, valid_config in enumerate(self.valid_configurations): # create the label for the end point (end effector position) of the valid configuration marker_point_name = Marker() @@ -414,7 +417,7 @@ def publish_workspace_area(self, valid_configs: np.ndarray ): # get the unified torque for the valid configurations - unified_configurations_torque = self.robot.get_unified_configurations_torque(valid_configs) + unified_configurations_torque = self.robot.get_unified_configurations_torque(self.valid_configurations) # insert points related to the end effector position in the workspace area and with color based on the normalized torque for each configuration # this is used to visualize the workspace area with the unified torques for each configuration From 4f719b5e2a71918c69f919b667affd95021061f5 Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Mon, 14 Jul 2025 14:13:19 +0200 Subject: [PATCH 4/5] fix personal name --- dynamic_payload_analysis_core/package.xml | 2 +- dynamic_payload_analysis_ros/package.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml index 3b5dfae..faf863a 100644 --- a/dynamic_payload_analysis_core/package.xml +++ b/dynamic_payload_analysis_core/package.xml @@ -4,7 +4,7 @@ dynamic_payload_analysis_core 0.0.0 Core package with calculation for torques calculator - morolinux + Moro Enrico Apache License 2.0 ament_copyright diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml index f39adf4..51ba95e 100644 --- a/dynamic_payload_analysis_ros/package.xml +++ b/dynamic_payload_analysis_ros/package.xml @@ -4,7 +4,7 @@ dynamic_payload_analysis_ros 0.0.0 This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling. - morolinux + Moro Enrico Apache License 2.0 ament_copyright From c96daf7c65cbcd00840ac5e94ecb38c370b7f3fd Mon Sep 17 00:00:00 2001 From: Enrico Moro Date: Mon, 14 Jul 2025 15:15:19 +0200 Subject: [PATCH 5/5] improved usage in rviz panel --- doc/README.md | 3 - .../dynamic_payload_analysis_core/__init__.py | 0 .../dynamic_payload_analysis_core/core.py | 1005 ----------------- dynamic_payload_analysis_core/package.xml | 18 - .../resource/dynamic_payload_analysis_core | 0 dynamic_payload_analysis_core/setup.cfg | 4 - dynamic_payload_analysis_core/setup.py | 25 - .../test/test_copyright.py | 25 - .../test/test_flake8.py | 25 - .../test/test_pep257.py | 23 - .../dynamic_payload_analysis_ros/__init__.py | 0 .../menu_visual.py | 3 +- .../rviz_visualization_menu.py | 488 -------- dynamic_payload_analysis_ros/package.xml | 18 - .../resource/dynamic_payload_analysis_ros | 0 dynamic_payload_analysis_ros/setup.cfg | 4 - dynamic_payload_analysis_ros/setup.py | 27 - .../test/test_copyright.py | 25 - .../test/test_flake8.py | 25 - .../test/test_pep257.py | 23 - 20 files changed, 1 insertion(+), 1740 deletions(-) delete mode 100644 doc/README.md delete mode 100644 dynamic_payload_analysis_core/dynamic_payload_analysis_core/__init__.py delete mode 100644 dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py delete mode 100644 dynamic_payload_analysis_core/package.xml delete mode 100644 dynamic_payload_analysis_core/resource/dynamic_payload_analysis_core delete mode 100644 dynamic_payload_analysis_core/setup.cfg delete mode 100644 dynamic_payload_analysis_core/setup.py delete mode 100644 dynamic_payload_analysis_core/test/test_copyright.py delete mode 100644 dynamic_payload_analysis_core/test/test_flake8.py delete mode 100644 dynamic_payload_analysis_core/test/test_pep257.py delete mode 100644 dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py delete mode 100644 dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py delete mode 100644 dynamic_payload_analysis_ros/package.xml delete mode 100644 dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros delete mode 100644 dynamic_payload_analysis_ros/setup.cfg delete mode 100644 dynamic_payload_analysis_ros/setup.py delete mode 100644 dynamic_payload_analysis_ros/test/test_copyright.py delete mode 100644 dynamic_payload_analysis_ros/test/test_flake8.py delete mode 100644 dynamic_payload_analysis_ros/test/test_pep257.py diff --git a/doc/README.md b/doc/README.md deleted file mode 100644 index 66a490b..0000000 --- a/doc/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Documentation - -This file contains the doucmentation for the user setup and testing anf also to server as a tutorials for the users. \ No newline at end of file diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/__init__.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py deleted file mode 100644 index 7f70440..0000000 --- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py +++ /dev/null @@ -1,1005 +0,0 @@ - -# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import pinocchio as pin -import numpy as np -import math -from typing import Union -from pathlib import Path -from numpy.linalg import norm, solve -from optik import Robot, SolverConfig -import tempfile -from ikpy import chain -import os - - - - -class TorqueCalculator: - def __init__(self, robot_description : Union[str, Path]): - """ - Initialize the Torques_calculator with the URDF model or XML format provided by robot_description topic. - - :param urdf_path: Path to the URDF file of the robot. - :param robot_description: Robot description in XML format provided by /robot_description topic or path of URDF file. - """ - - # Load the robot model from path or XML string - if isinstance(robot_description, str): - self.model = pin.buildModelFromXML(robot_description) - - # TODO change parser in general for more unique solution - - # create temporary URDF file from the robot description string - with tempfile.NamedTemporaryFile(mode='w', suffix='.urdf', delete=False) as temp_file: - temp_file.write(robot_description) - temp_urdf_path = temp_file.name - - self.geom_model = pin.buildGeomFromUrdf(self.model,temp_urdf_path,pin.GeometryType.COLLISION) - - # Add collisition pairs - self.geom_model.addAllCollisionPairs() - - os.unlink(temp_urdf_path) - - elif isinstance(robot_description, Path): - self.model = pin.buildModelFromUrdf(str(robot_description.resolve())) - - # create data for the robot model - self.data = self.model.createData() - self.geom_data = pin.GeometryData(self.geom_model) - - # get the default collisions in the robot model to avoid take them into account in the computations - self.default_collisions = self.compute_static_collisions() - - # create array to store all possible configurations for left and right arms - self.configurations_l = None - self.configurations_r = None - - - def compute_static_collisions(self): - """ - Compute the static collisions for the robot model. - This method is used to compute the collisions in the robot model in the zero configuration. - """ - - # array to store the collision pairs - collision_pairs = [] - - # Compute all the collisions - pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, self.get_zero_configuration(), False) - - # Print the status of collision for all collision pairs - for k in range(len(self.geom_model.collisionPairs)): - cr = self.geom_data.collisionResults[k] - cp = self.geom_model.collisionPairs[k] - if cr.isCollision(): - print(f"Collision between {cp.first} and {cp.second} detected.") - collision_pairs.append((cp.first, cp.second, k)) - - return collision_pairs - - - - - def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray: - """ - Compute the inverse dynamics torque vector. - - :param q: Joint configuration vector. - :param qdot: Joint velocity vector. - :param qddot: Joint acceleration vector. - :param extForce: External forces vector applied to the robot model. - :return: Torques vector - """ - - # basic equation for inverse dynamics : M(q) *a + b = tau + J(q)_t * extForce --> tau = M(q) * a + b - J(q)_t * extForce - - if extForce: - tau = pin.rnea(self.model, self.data, q, qdot, qddot, extForce) - else: - tau = pin.rnea(self.model, self.data, q, qdot, qddot) - - if tau is None: - raise ValueError("Failed to compute torques") - - return tau - - - def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]: - """ - Create external forces vector based on the masses and frame ID. - The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint. - - :param masses (float, np.ndarray) : Mass of the object to apply the force to or vector with masses related to frames names. - :param frame_name(str , np.ndarray) : Frame name where the force is applied or vector of frame names where the forces is applied. - :param q: Joint configuration vector. - :return: External force vector. - """ - if isinstance(masses, float): - if masses < 0: - raise ValueError("Mass must be a positive value") - - if frame_name is None: - raise ValueError("Frame name must be provided") - - #if frame_name not in self.model.frames: - # raise ValueError(f"Frame name '{frame_name}' not found in the robot model") - - # assumption made : the force is applied to the joint - - # Initialize external forces array - fext = [pin.Force(np.zeros(6)) for _ in range(self.model.njoints)] - - self.update_configuration(q) - - # Check if frame_name is a single string or an array of strings - if isinstance(frame_name, str): - # Get the frame ID and joint ID from the frame name - frame_id = self.model.getFrameId(frame_name) - joint_id = self.model.frames[frame_id].parentJoint - - # force expressed in the world frame (gravity force in z axis) - ext_force_world = pin.Force(np.array([0.0, 0.0, masses * -9.81]), np.array([0.0, 0.0, 0.0])) - - # placement of the frame in the world frame - #frame_placement = self.data.oMf[frame_id] - #print(f"Frame placement: {frame_placement}") - - # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied - fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) - # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied - fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient - - else: - for mass, frame in zip(masses,frame_name): - frame_id = self.model.getFrameId(frame) - joint_id = self.model.frames[frame_id].parentJoint - - # force expressed in the world frame (gravity force in z axis) - ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0])) - # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied - fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world) - # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied - fext[joint_id].angular = np.zeros(3) - - return fext - - - def update_configuration(self, q : np.ndarray): - """ - Update the robot model configuration with the given joint configuration vector. - - :param q: Joint configuration vector. - """ - pin.forwardKinematics(self.model, self.data, q) - pin.updateFramePlacements(self.model, self.data) - - - - def compute_inverse_kinematics_optik(self, q : np.ndarray, end_effector_position: np.ndarray) -> np.ndarray: - """ - Compute the inverse kinematics for the robot model using the Optik library. - - :param q: current joint configuration vector. - :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector]. - :return: Joint configuration vector that achieves the desired end effector position. - """ - # TODO : It doees not work with the current version of the library - - # Compute the inverse kinematics - sol = self.ik_model.ik(self.ik_config, end_effector_position, q) - - return sol - - - - def compute_inverse_kinematics_ikpy(self, q : np.ndarray, end_effector_position: np.ndarray) -> np.ndarray: - """ - Compute the inverse kinematics for the robot model using the ikpy library. - - :param q: current joint configuration vector. - :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector]. - :return: Joint configuration vector that achieves the desired end effector position. - """ - # TODO : It doees not work with the current version of the library - - # Compute the inverse kinematics - sol = self.ik_model.inverse_kinematics(end_effector_position) - - return sol - - - - def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.ndarray, end_effector_name : str) -> np.ndarray: - """ - Compute the inverse kinematics for the robot model with joint limits consideration. - :param q: current joint configuration vector. - :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector]. - :param end_effector_name: Name of the end effector joint. - :return: Joint configuration vector that achieves the desired end effector position. - """ - - joint_id = self.model.getJointId(end_effector_name) # Get the joint ID of the end effector - - # Set parameters for the inverse kinematics solver - eps = 1e-3 # reduce for more precision - IT_MAX = 500 # Maximum number of iterations - DT = 1e-1 - damp = 1e-12 - - i = 0 - while True: - self.update_configuration(q) - iMd = self.data.oMi[joint_id].actInv(end_effector_position) # Get the transformation from the current end effector pose to the desired pose - - err = pin.log(iMd).vector # compute the error in the end effector position - if norm(err[:3]) < eps: - success = True - break - if i >= IT_MAX: - success = False - break - - J = pin.computeJointJacobian(self.model, self.data, q, joint_id) # compute the Jacobian of current pose of end effector - #J = -np.dot(pin.Jlog6(iMd.inverse()), J) - J = -J[:3, :] - - # compute the inverse kinematics v = -J^T * (J * J^T + damp * I)^-1 * err - v = -J.T.dot(solve(J.dot(J.T) + damp * np.eye(3), err[:3])) - - # Apply joint limits by clamping the resulting configuration - q_new = pin.integrate(self.model, q, v * DT) - # Ensure the new configuration is within joint limits - q_new = np.clip(q_new, self.model.lowerPositionLimit, self.model.upperPositionLimit) - - # Check if we're hitting joint limits and reduce step size if needed - if np.allclose(q_new, q, atol=1e-6): - DT *= 0.5 # Reduce step size if no progress due to joint limits - if DT < 1e-6: # Minimum step size threshold - success = False - break - - q = q_new - - # if not i % 10: - # print(f"{i}: error = {err.T}") - i += 1 - - if success: - print(f"Convergence achieved! in {i} iterations") - return q - else: - print( - "\n" - "Warning: the iterative algorithm has not reached convergence to the desired precision" - ) - return None # Return None if convergence is not achieved - - - - def compute_all_configurations(self, range : int, resolution : int, end_effector_name : str) -> np.ndarray: - """ - Compute all configurations for the robot model within a specified range. - - :param range (int): Range as side of a square where in the center there is the actual position of end effector. - :param resolution (int): Resolution of the grid to compute configurations. - :param end_effector_name (str): Name of the end effector joint. - :return : Array of joint configurations that achieve the desired end effector position. - """ - - if range <= 0: - raise ValueError("Range must be a positive value") - - # Get the current joint configuration - q = self.get_zero_configuration() - - #id_end_effector = self.model.getJointId(end_effector_name) - # Get the current position of the end effector - #end_effector_pos = self.data.oMi[id_end_effector] - - # Create an array to store all configurations - configurations = [] - - # Iterate over the range to compute all configurations - for x in np.arange(-range/2, range/2 , resolution): - for y in np.arange(-range/2, range/2 , resolution): - for z in np.arange(0, range , resolution): - target_position = pin.SE3(np.eye(3), np.array([x, y, z])) - new_q = self.compute_inverse_kinematics(q, target_position, end_effector_name) - - if new_q is not None: - q = new_q - # store the valid configuration and the position of the end effector relative to that configuration - configurations.append({"config" : new_q, "end_effector_pos": target_position.translation}) - - return np.array(configurations, dtype=object) - - - - def verify_configurations(self, configurations_left : np.ndarray, configurations_right : np.ndarray, masses : np.ndarray, checked_frames : np.ndarray) -> np.ndarray: - """ - Verify the configurations to check if they are valid. - - :param configurations_left: Array of joint configurations to verify for the left arm. - :param configurations_right: Array of joint configurations to verify for the right arm. - :param ext_forces: Array of external forces to apply to the robot model. - :return: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}]. - """ - - valid_configurations = [] - - # check valid configurations for left arm - for q in configurations_left: - # Update the configuration of the robot model - self.update_configuration(q["config"]) - - if masses is not None and checked_frames is not None: - # Create external forces based on the masses and checked frames - ext_forces = self.create_ext_force(masses, checked_frames, q["config"]) - # Compute the inverse dynamics for the current configuration - tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) - else: - # Compute the inverse dynamics for the current configuration without external forces - tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration()) - - # Check if the torques are within the effort limits - if self.check_effort_limits(tau,"left").all(): - valid = True - # Compute all the collisions - pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False) - - # Print the status of collision for all collision pairs - for k in range(len(self.geom_model.collisionPairs)): - cr = self.geom_data.collisionResults[k] - cp = self.geom_model.collisionPairs[k] - - if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions: - print(f"Collision detected between {cp.first} and {cp.second} in the left arm configuration.") - valid = False - break - - if valid: - valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "left" }) - - # check valid configurations for right arm - for q in configurations_right: - # Update the configuration of the robot model - self.update_configuration(q["config"]) - - if masses is not None and checked_frames is not None: - # Create external forces based on the masses and checked frames - ext_forces = self.create_ext_force(masses, checked_frames, q["config"]) - # Compute the inverse dynamics for the current configuration - tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) - else: - # Compute the inverse dynamics for the current configuration without external forces - tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration()) - - # Check if the torques are within the effort limits - if self.check_effort_limits(tau,"right").all(): - valid = True - # Compute all the collisions - pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False) - - # Print the status of collision for all collision pairs - for k in range(len(self.geom_model.collisionPairs)): - cr = self.geom_data.collisionResults[k] - cp = self.geom_model.collisionPairs[k] - - if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions: - print(f"Collision detected between {cp.first} and {cp.second} in the right arm configuration.") - valid = False - break - - if valid: - valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "right" }) - - - return np.array(valid_configurations, dtype=object) - - - def get_valid_workspace(self, range : int, resolution : int, end_effector_name_left : str, end_effector_name_right, masses : np.ndarray, checked_frames: np.ndarray) -> np.ndarray: - """ - Get the valid workspace of the robot model by computing all configurations within a specified range. - - :param range (int): Range as side of a square where in the center there is the actual position of end effector. - :param resolution (int): Resolution of the grid to compute configurations. - :param end_effector_name_left (str): Name of the end effector joint to test for the left arm. - :param end_effector_name_right (str): Name of the end effector joint to test for the right arm. - :param masses (np.ndarray): Array of masses to apply to the robot model. - :param checked_frames (np.ndarray): Array of frame names where the external forces are applied. - :return: Array of valid configurations that achieve the desired end effector position in format: [{"config", "end_effector_pos, "tau", "arm"}]. - """ - # compute all configurations for the left and right arms if they are not already computed - if self.configurations_l is None or self.configurations_r is None: - # Compute all configurations within the specified range for the left arm - self.configurations_l = self.compute_all_configurations(range,resolution, end_effector_name_left) - # Compute all configurations within the specified range for the right arm - self.configurations_r = self.compute_all_configurations(range,resolution, end_effector_name_right) - - # TODO make here the calculation for the maximum payload for each configuration in order to increase the speed of the verification - - # Verify the configurations to check if they are valid for both arms - valid_configurations = self.verify_configurations(self.configurations_l,self.configurations_r, masses, checked_frames) - - return valid_configurations - - - - def compute_maximum_payloads(self, configs : np.ndarray): - """ - Compute the maximum payload for each provided configuration and return the results with the configs updated with the maximum payload as a new value. - :param configs: Array of configurations , format {"config", "end_effector_pos", "tau", "arm", "max_payload" } - """ - # payload limits - payload_limit = 10 - resolution_payload = 0.01 - - for config in configs: - # iterate over a possible set of payloads - for mass in np.arange(0, payload_limit , resolution_payload): - # Create external forces based on the masses and checked frames - ext_forces = self.create_ext_force(mass, f"arm_{config['arm']}_7_joint", config["config"]) - # calculate the current torques - tau = self.compute_inverse_dynamics(config["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces) - - if not self.check_effort_limits(tau,config["arm"]).all(): - # if the current payload is not valid, the maximum payload is the previous one - config["max_payload"] = mass - resolution_payload - break - else: - # if the current payload is valid, update the maximum payload - config["max_payload"] = mass - - - return configs - - - - def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray: - """ - Compute the forward dynamics acceleration vector with Articulated-Body algorithm(ABA). - - :param q: Joint configuration vector. - :param qdot: Joint velocity vector. - :param tau: Joint torque vector. - :param extForce: External forces vector applied to the robot model. - :return: Acceleration vector. - """ - - # calculate dynamics drift - if extForce is not None: - ddq = pin.aba(self.model, self.data, q, qdot, tau, extForce) - else: - ddq = pin.aba(self.model, self.data, q, qdot, tau) - - return ddq - - - def compute_jacobian(self, q : np.ndarray, frame_name : str) -> np.ndarray: - """ - Get the Jacobian matrix for a specific frame in the robot model. - - :param q: Joint configuration vector. - :param frame_name: Name of the frame to compute the Jacobian for. - :return: Jacobian matrix. - """ - - # Get the frame ID - frame_id = self.model.getFrameId(frame_name) - joint_id = self.model.frames[frame_id].parentJoint - - # Compute the Jacobian - J = pin.computeJointJacobians(self.model, self.data, q) - - J_frame = pin.getJointJacobian(self.model, self.data, joint_id, pin.ReferenceFrame.LOCAL_WORLD_ALIGNED) - - if J_frame is None: - raise ValueError("Failed to compute Jacobian") - - return J_frame - - - def get_maximum_torques(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray: - """ - Get the maximum torques for each joint in all valid configurations. - - :param valid_configs: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}]. - :return: Arrays of maximum torques for each joint in the current valid configurations for left and right arm. - """ - - # Get the number of joints - num_joints = len(valid_configs[0]["tau"]) - max_torques_left = np.array([], dtype=float) - max_torques_right = np.array([], dtype=float) - - # Find maximum absolute torque for each joint - for i in range(num_joints): - joint_torques_left = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "left"] - joint_torques_right = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "right"] - - max_torques_left = np.append( max_torques_left, max(joint_torques_left)) - max_torques_right = np.append( max_torques_right, max(joint_torques_right)) - - return max_torques_left, max_torques_right - - - - - def get_normalized_torques(self, tau : np.ndarray, target_torque : np.ndarray = None) -> np.ndarray: - """ - Normalize the torques vector to a unified scale. - - :param tau: Torques vector to normalize. - :return: Normalized torques vector. - """ - if tau is None: - raise ValueError("Torques vector is None") - - norm_tau = [] - - # if target_torque is not specified, normalize the torques vector to the effort limits of the robot model - if target_torque is None: - # Normalize the torques vector - for i, torque in enumerate(tau): - norm_tau.append(abs(torque) / self.model.effortLimit[i]) - else: - # Normalize the torques vector to the target torque - for i, torque in enumerate(tau): - if target_torque[i] != 0: - norm_tau.append(abs(torque) / target_torque[i]) - else: - norm_tau.append(0.0) - - - return norm_tau - - - def get_unified_configurations_torque(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray: - """ - Get a unified sum of torques for all possible configurations of the robot model. - - :param q: Joint configuration vector. - :param valid_configs: Array of - """ - torques_sum = np.array([], dtype=float) - norm_torques = np.array([], dtype=float) - sum = 0.0 - - for valid_config in valid_configs: - # get the joint configuration and torques vector from the valid configuration - q = valid_config["config"] - tau = valid_config["tau"] - - # calculate the sum of torques for each joint configuration - for joint, torque in zip(q, tau): - #if abs(torque) < 50: - sum += abs(torque) - - torques_sum = np.append(torques_sum, {"sum" : sum, "end_effector_pos" : valid_config["end_effector_pos"], "arm" : valid_config["arm"]}) - sum = 0.0 # reset the sum for the next configuration - - # get the maximum torque from the sum of torques for both arms - max_torque_l = max([item["sum"] for item in torques_sum if item["arm"] == "left"]) - max_torque_r = max([item["sum"] for item in torques_sum if item["arm"] == "right"]) - - # Normalize the torques vector to a unified scale - for tau in torques_sum: - if tau["arm"] == "left": - norm_tau = tau["sum"] / max_torque_l - else: - norm_tau = tau["sum"] / max_torque_r - - # append the normalized torque to the array - norm_torques = np.append(norm_torques, {"norm_tau" : norm_tau, "end_effector_pos" : tau["end_effector_pos"], "arm" : tau["arm"]}) - - return norm_torques - - - def check_zero(self, vec : np.ndarray) -> bool: - """ - Checks if the vector is zero. - - :param vec: Vector to check. - :return: True if the acceleration vector is zero, False otherwise. - """ - - return np.allclose(vec, np.zeros(self.model.nv), atol=1e-6) - - - def get_zero_configuration(self) -> np.ndarray: - """ - Get the zero configuration of the robot model. - - :return: Zero configuration vector. - """ - q0 = np.zeros(self.model.nq) - if q0 is None: - raise ValueError("Failed to get zero configuration") - - return q0 - - - def get_zero_velocity(self) -> np.ndarray: - """ - Get the zero velocity vector of the robot model. - - :return: Zero velocity vector. - """ - v0 = np.zeros(self.model.nv) - if v0 is None: - raise ValueError("Failed to get zero velocity") - - return v0 - - - def get_zero_acceleration(self) -> np.ndarray: - """ - Get the zero acceleration vector of the robot model. - - :return: Zero acceleration vector. - """ - a0 = np.zeros(self.model.nv) - if a0 is None: - raise ValueError("Failed to get zero acceleration") - - return a0 - - - def get_random_configuration(self) -> tuple[np.ndarray, np.ndarray]: - """ - Get a random configuration for configuration and velocity vectors. - :return: Random configuration vectors. - """ - q_limits_lower = self.model.lowerPositionLimit - q_limits_upper = self.model.upperPositionLimit - - # generate random configuration vector within the limits - q = np.random.uniform(q_limits_lower, q_limits_upper) - - if q is None: - raise ValueError("Failed to get random configuration") - - # Generate random velocity vector within the limits - qdot = np.random.uniform(-self.model.velocityLimit, self.model.velocityLimit) - if qdot is None: - raise ValueError("Failed to get random velocity") - - return q, qdot - - - def check_joint_limits(self, q : np.ndarray) -> np.ndarray: - """ - Check if the joint configuration vector is within the joint limits of the robot model. - - :param q: Joint configuration vector to check. - :return: Array of booleans indicating if each joint is within the limits. - """ - if q is None: - raise ValueError("Joint configuration vector is None") - - # array to store if the joint is within the limits - within_limits = np.zeros(self.model.njoints - 1, dtype=bool) - - # Check if the joint configuration is within the limits - for i in range(self.model.njoints - 1): - if q[i] < self.model.lowerPositionLimit[i] or q[i] > self.model.upperPositionLimit[i]: - within_limits[i] = False - else: - within_limits[i] = True - - return within_limits - - - def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray: - """ - Check if the torques vector is within the effort limits of the robot model. - - :param tau: Torques vector to check. - :param arm: Arm name to filter the torques vector, right or left. - :return: Array of booleans indicating if each joint is within the effort limits. - """ - if tau is None: - raise ValueError("Torques vector is None") - - # array to store if the joint is within the limits - within_limits = np.array([], dtype=bool) - - # if arm is not specified, check all joints - if arm is None: - # Check if the torques are within the limits - for i in range(self.model.njoints -1): - if abs(tau[i]) > self.model.effortLimit[i]: - print(f"\033[91mJoint {i+2} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n") - within_limits = np.append(within_limits, False) - else: - within_limits = np.append(within_limits, True) - - if np.all(within_limits): - print("All joints are within effort limits. \n") - - else: - # Check if the torques are within the limits - for i in range(self.model.njoints -1): - # TODO arm is "left" or right" but the model has gripper with left and right in the same name - if arm in self.model.names[i+1]: - if abs(tau[i]) > self.model.effortLimit[i]: - print(f"\033[91mJoint {i+1} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n") - within_limits = np.append(within_limits, False) - else: - within_limits = np.append(within_limits, True) - - if np.all(within_limits): - print("All joints are within effort limits. \n") - - - return within_limits - - - - def set_position(self, pos_joints : list[float], name_positions : list[str] ) -> np.ndarray: - """ - Convert joint positions provided by jointstate publisher to the right format of the robot model. - - :param pos_joints: List of joint positions provided by jointstate publisher. - :param name_positions: List of joint names in the order provided by jointstate publisher. - """ - - q = np.zeros(self.model.nq) - cont = 0 - - for i in range(1, np.size(pos_joints) + 1): - # find index in name positions list that corrisponds to the joint name - # (REASON: the joint position from joint_state message are not in the same order as the joint indices in model object) - index = name_positions.index(self.model.names[i]) - - if self.model.joints[i].nq == 2: - # for continuous joints (wheels) - q[cont] = math.cos(pos_joints[index]) - q[cont + 1] = math.sin(pos_joints[index]) - else: - # for revolute joints - q[cont] = pos_joints[index] - - cont += self.model.joints[i].nq - - self.update_configuration(q) - - return q - - - def get_position_for_joint_states(self, q : np.ndarray) -> np.ndarray: - """ - Convert configuration in pinocchio format to right format of joint states publisher - Example: continuous joints (wheels) are represented as two values in the configuration vector but - in the joint state publisher they are represented as one value (angle). - - :param q : Joint configuration provided by pinocchio library - :return: Joint positions in the format of joint state publisher. - """ - - config = [] - - current_selected_config = q - - # Use this method to get the single values for joints, for example continuous joints (wheels) are represented as two values in the configuration vector but - # in the joint state publisher they are represented as one value (angle) - # so we need to convert the configuration vector to the right format for joint state publisher - cont = 0 - for i in range(1, self.model.njoints): - if self.model.joints[i].nq == 2: - # for continuous joints (wheels) - config.append({ "q" : math.atan2(current_selected_config[cont+1], current_selected_config[cont]), "joint_name" : self.model.names[i] }) - - elif self.model.joints[i].nq == 1: - # for revolute joints - config.append({"q" : current_selected_config[cont], "joint_name" : self.model.names[i]}) - - cont += self.model.joints[i].nq - - return config - - - def get_joints_placements(self, q : np.ndarray) -> np.ndarray | float: - """ - Get the placements of the joints in the robot model. - - :param q: Joint configuration vector. - :return: Array of joint placements with names of joint, and z offset of base link. - """ - base_link_id = self.model.getFrameId("base_link") - offset_z = self.data.oMf[base_link_id].translation[2] # Get the z offset of the base link - - self.update_configuration(q) - placements = np.array([({"name" : self.model.names[i], - "type" : self.model.joints[i].shortname() , - "x": self.data.oMi[i].translation[0], - "y": self.data.oMi[i].translation[1], - "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object) - - return placements, offset_z - - - def get_joint_placement(self, joint_id : int) -> dict: - """ - Get the placement of a specific joint in the robot model. - - :param joint_id: ID of the joint to get the placement for. - :return: Dictionary with coordinates x , y , z of the joint. - """ - - if joint_id < 0 or joint_id >= self.model.njoints: - raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.") - - placement = self.data.oMi[joint_id].translation - - return {"x" : placement[0], "y": placement[1], "z": placement[2]} - - - def get_mass_matrix(self, q : np.ndarray) -> np.ndarray: - """ - Compute the mass matrix for the robot model. - - :param q: Joint configuration vector. - :return: Mass matrix. - """ - - mass_matrix = pin.crba(self.model, self.data, q) - if mass_matrix is None: - raise ValueError("Failed to compute mass matrix") - - return mass_matrix - - - - def get_joints(self) -> np.ndarray: - """ - Get the array joint names of the robot model. - - :return: array Joint names . - """ - - return np.array(self.model.names[1:], dtype=str) - - - - def get_frames(self) -> np.ndarray: - """ - Get the array of frame names in the robot model. - - :return: array of frame names. - """ - - return np.array([frame.name for frame in self.model.frames if frame.type == pin.FrameType.BODY], dtype=str) - - - - def get_active_frames(self) -> np.ndarray: - """ - Get the array of active joint names in the robot model. - - :return: array of active joint names. - """ - # Get frames where joints are parents - frame_names = [] - for i in range(1, self.model.njoints): - for frame in self.model.frames: - if frame.parentJoint == i and frame.type == pin.FrameType.BODY: - frame_names.append(frame.name) - break - - return np.array(frame_names, dtype=str) - - - - def get_parent_joint_id(self, frame_name : str) -> int: - """ - Get the parent joint ID for a given frame name. - - :param frame_name: Name of the frame. - :return: Joint ID. - """ - - if frame_name is None: - raise ValueError("Frame name must be provided") - - # Get the frame ID from the model - frame_id = self.model.getFrameId(frame_name) - joint_id = self.model.frames[frame_id].parentJoint - - if joint_id == -1: - raise ValueError(f"Joint '{joint_id}' not found in the robot model") - - return joint_id - - - def print_configuration(self, q : np.ndarray = None): - """ - Print the current configuration of the robot model. - """ - - self.update_configuration(q) - - for frame_id, frame in enumerate(self.model.frames): - placement = self.data.oMf[frame_id] - print(f"Frame: {frame.name}") - print(f" Rotation:\n{placement.rotation}") - print(f" Translation:\n{placement.translation}") - - - def print_torques(self, tau : np.ndarray): - """ - Print the torques vector. - - :param tau: Torques vector to print. - """ - if tau is None: - raise ValueError("Torques vector is None") - - print("Torques vector:") - for i, torque in enumerate(tau): - # check if the joint is a prismatic joint - if self.model.joints[i+1].shortname() == "JointModelPZ": - print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} N") - - # for revolute joints - else: - print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm") - print("\n") - - - - def print_frames(self): - """ - Print the frames of the robot model. - """ - print("Body frames available in the robot: ") - for i, frame in enumerate(self.model.frames): - if frame.type == pin.FrameType.BODY: - print(f"ID: {i:2d} | Name: {frame.name:20s}") - print("\n") - - - def print_acceleration(self, qddot : np.ndarray): - """ - Print the acceleration vector. - - :param a: Acceleration vector to print. - """ - - print("Acceleration vector:") - for i, acc in enumerate(qddot): - print(f"Joint {i+2} {self.model.names[i+1]}: {acc:.6f} rad/s^2") - - print("\n") - - - def print_active_joint(self): - """ - Print the active joints of the robot model. - """ - if self.model is None: - raise ValueError("Model is not initialized") - - print("Active joints:") - for i in range(self.model.njoints): - print(f"Joint {i+1}: {self.model.names[i]}") - - print("\n") - # placement of each joint in the kinematic tree - # for name, oMi in zip(self.model.names, self.data.oMi): - # print("{:<24} : {: .2f} {: .2f} {: .2f}".format(name, *oMi.translation.T.flat)) \ No newline at end of file diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml deleted file mode 100644 index faf863a..0000000 --- a/dynamic_payload_analysis_core/package.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - - dynamic_payload_analysis_core - 0.0.0 - Core package with calculation for torques calculator - Moro Enrico - Apache License 2.0 - - ament_copyright - ament_flake8 - ament_pep257 - python3-pytest - - - ament_python - - diff --git a/dynamic_payload_analysis_core/resource/dynamic_payload_analysis_core b/dynamic_payload_analysis_core/resource/dynamic_payload_analysis_core deleted file mode 100644 index e69de29..0000000 diff --git a/dynamic_payload_analysis_core/setup.cfg b/dynamic_payload_analysis_core/setup.cfg deleted file mode 100644 index 62c9e13..0000000 --- a/dynamic_payload_analysis_core/setup.cfg +++ /dev/null @@ -1,4 +0,0 @@ -[develop] -script_dir=$base/lib/dynamic_payload_analysis_core -[install] -install_scripts=$base/lib/dynamic_payload_analysis_core diff --git a/dynamic_payload_analysis_core/setup.py b/dynamic_payload_analysis_core/setup.py deleted file mode 100644 index 70e2a59..0000000 --- a/dynamic_payload_analysis_core/setup.py +++ /dev/null @@ -1,25 +0,0 @@ -from setuptools import find_packages, setup - -package_name = 'dynamic_payload_analysis_core' - -setup( - name=package_name, - version='0.0.0', - packages=find_packages(exclude=['test']), - data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - ('share/' + package_name, ['package.xml']), - ], - install_requires=['setuptools'], - zip_safe=True, - maintainer='Enrico Moro', - maintainer_email='enrimoro003@gmail.com', - description='This package implements core functionalities for dynamic payload analysis in robotics, focusing on torque calculations and external force handling.', - license='Apache License 2.0', - tests_require=['pytest'], - entry_points={ - 'console_scripts': [ - ], - }, -) diff --git a/dynamic_payload_analysis_core/test/test_copyright.py b/dynamic_payload_analysis_core/test/test_copyright.py deleted file mode 100644 index 97a3919..0000000 --- a/dynamic_payload_analysis_core/test/test_copyright.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_copyright.main import main -import pytest - - -# Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') -@pytest.mark.copyright -@pytest.mark.linter -def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' diff --git a/dynamic_payload_analysis_core/test/test_flake8.py b/dynamic_payload_analysis_core/test/test_flake8.py deleted file mode 100644 index 27ee107..0000000 --- a/dynamic_payload_analysis_core/test/test_flake8.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_flake8.main import main_with_errors -import pytest - - -@pytest.mark.flake8 -@pytest.mark.linter -def test_flake8(): - rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) diff --git a/dynamic_payload_analysis_core/test/test_pep257.py b/dynamic_payload_analysis_core/test/test_pep257.py deleted file mode 100644 index b234a38..0000000 --- a/dynamic_payload_analysis_core/test/test_pep257.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_pep257.main import main -import pytest - - -@pytest.mark.linter -@pytest.mark.pep257 -def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings' diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py index b35811a..e4bcdb5 100644 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py +++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py @@ -157,7 +157,7 @@ def insert_dropdown_configuration(self, configuration : np.ndarray): for i, item in enumerate(configuration): # insert the parent in the command field to keep track of the parent id - last_entry = self.menu_handler.insert(f"Configuration {i} | arm: " + item["arm"] + " | max payload : " + str(item["max_payload"]) , parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection) + last_entry = self.menu_handler.insert(f"Configuration {i} | arm: " + item["arm"], parent=self.workspace_button, command=str(self.workspace_button), callback=self.callback_configuration_selection) self.menu_handler.setCheckState(last_entry, MenuHandler.UNCHECKED) self.menu_handler.setVisible(last_entry, True) @@ -165,7 +165,6 @@ def insert_dropdown_configuration(self, configuration : np.ndarray): self.menu_handler.setVisible(self.torque_limits_checker, True) self.menu_handler.setVisible(self.max_torque_checker, True) self.menu_handler.setVisible(self.label_color_selection, True) - self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED) self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED) diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py deleted file mode 100644 index 02f678d..0000000 --- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py +++ /dev/null @@ -1,488 +0,0 @@ -#!/usr/bin/env python3 - -# Copyright (c) 2025 PAL Robotics S.L. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import rclpy -from rclpy.node import Node -from std_msgs.msg import String, ColorRGBA -from geometry_msgs.msg import WrenchStamped, Point -from sensor_msgs.msg import JointState -from dynamic_payload_analysis_core.core import TorqueCalculator -import numpy as np -from visualization_msgs.msg import Marker, MarkerArray -from dynamic_payload_analysis_ros.menu_visual import MenuPayload -from dynamic_payload_analysis_ros.menu_visual import TorqueVisualizationType - - - - - -class RobotDescriptionSubscriber(Node): - def __init__(self): - super().__init__('node_robot_description_subscriber') - self.subscription = self.create_subscription( - String, - '/robot_description', - self.robot_description_callback, - qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1) - ) - - # menu for selecting frames to apply payload - self.menu = MenuPayload(self) - - # Publisher for external force - self.publisher_force = self.create_publisher(MarkerArray, '/external_forces', 10) - - # Publisher for RViz visualization of torques - self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10) - - # Pusblisher for point cloud workspace area - self.publisher_workspace_area = self.create_publisher(MarkerArray, '/workspace_area', 10) - - # Publisher for point names in the workspace area - self.publisher_workspace_area_names = self.create_publisher(MarkerArray, '/workspace_area_names', 10) - - # subscription to joint states - self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10) - - # publisher for the joint states - self.publisher_joint_states = self.create_publisher(JointState, '/joint_states', 10) - - # variable to store the object of the TorqueCalculator - self.robot = None - - # variable to store external force applied on the robot - self.external_force = None - - # variable to store checked frames in the menu where a payload is applied - self.checked_frames = [] - - # variable to store masses applied on the frames - self.masses = None - - # variable to store the currente selected configuration from the workspace menu - self.valid_configurations = None - - # variable to store if there is a selected configuration from the workspace menu to visualize - self.selected_configuration = None - - # timer to compute the valid workspace area - self.timer_workspace_calculation = self.create_timer(1, self.workspace_calculation) - # timer to publish the selected configuration in joint states - self.timer_publish_configuration = self.create_timer(2, self.publish_selected_configuration) - # timer to calculate the external forces based on the checked frames in the menu - self.timer_update_checked_frames = self.create_timer(0.5, self.update_checked_frames) - # timer to publish the external forces as arrows in RViz - self.timer_publish_force = self.create_timer(1, self.publish_payload_force) - - self.get_logger().info('Robot description subscriber node started') - - - def robot_description_callback(self, msg): - """ - Callback function for the robot description topic. - This function initializes the TorqueCalculator with the robot description received from the topic. - """ - - self.get_logger().info('Received robot description') - - self.robot = TorqueCalculator(robot_description = msg.data) - self.robot.print_active_joint() - self.robot.print_frames() - - # Add the frame to the menu for payload selection - for frame in self.robot.get_active_frames(): - self.menu.insert_frame(frame) - - # self.robot.print_configuration() - - - def publish_payload_force(self): - """ - Publish the gravity force on the frame with id `id_force`. - """ - external_force_array = MarkerArray() - - for frame in self.menu.get_item_state(): - - id_force = self.robot.get_parent_joint_id(frame["name"]) - joint_position = self.robot.get_joint_placement(id_force) - arrow_force = Marker() - - arrow_force.header.frame_id = "base_link" - arrow_force.header.stamp = self.get_clock().now().to_msg() - arrow_force.ns = "external_force" - arrow_force.id = id_force - arrow_force.type = Marker.ARROW - - # add the arrow if the frame is checked or delete it if not - if frame["checked"]: - arrow_force.action = Marker.ADD - else: - arrow_force.action = Marker.DELETE - - arrow_force.scale.x = 0.20 # Length of the arrow - arrow_force.scale.y = 0.05 # Width of the arrow - arrow_force.scale.z = 0.05 # Height of the arrow - arrow_force.color.a = 1.0 # Alpha - arrow_force.color.r = 0.0 - arrow_force.color.g = 0.0 # Green - arrow_force.color.b = 1.0 - - # Set the position of the arrow at the joint placement - arrow_force.pose.position.x = joint_position["x"] - arrow_force.pose.position.y = joint_position["y"] - arrow_force.pose.position.z = joint_position["z"] - # Set the direction of the arrow downwards - arrow_force.pose.orientation.x = 0.0 - arrow_force.pose.orientation.y = 0.7071 - arrow_force.pose.orientation.z = 0.0 - arrow_force.pose.orientation.w = 0.7071 - - external_force_array.markers.append(arrow_force) - - self.publisher_force.publish(external_force_array) - - - - def workspace_calculation(self): - """ - Callback for timer to compute the valid workspace area. - """ - # if the user choose to compute the workspace area then compute the valid configurations - if self.menu.get_workspace_state(): - self.valid_configurations = self.robot.get_valid_workspace(2, 0.20, "arm_left_7_joint", "arm_right_7_joint", self.masses, self.checked_frames) - - self.valid_configurations = self.robot.compute_maximum_payloads(self.valid_configurations) - - # insert the valid configurations in the menu - self.menu.insert_dropdown_configuration(self.valid_configurations) - - # clear all the workspace area markers - self.clear_workspace_area_markers() - - # set the workspace state to False to stop the computation - self.menu.set_workspace_state(False) - - # if there are valid configurations, publish the workspace area - if self.valid_configurations is not None: - # publish the workspace area - self.publish_workspace_area() - - - - def publish_selected_configuration(self): - """ - Timer to publish the selected configuration. - This will publish the joint states of the selected configuration in the menu. - """ - # get the selected configuration from the menu - self.selected_configuration = self.menu.get_selected_configuration() - - # if there is a selected configuration, publish the joint states based on the valid configurations calculated previously - if self.selected_configuration is not None: - configs = self.robot.get_position_for_joint_states(self.valid_configurations[self.selected_configuration]["config"]) - joint_state = JointState() - joint_state.header.stamp = self.get_clock().now().to_msg() - - joint_state.name = [joint["joint_name"] for joint in configs] - joint_state.position = [joint["q"] for joint in configs] - #joint_state.position = - self.publisher_joint_states.publish(joint_state) - - else: - if self.robot is not None: - # if there is no selected configuration, publish the joint states with zero positions - joint_state = JointState() - joint_state.header.stamp = self.get_clock().now().to_msg() - - joint_state.name = self.robot.get_joints().tolist() - zero_config = self.robot.get_position_for_joint_states(self.robot.get_zero_configuration()) - joint_state.position = [joint["q"] for joint in zero_config] - - self.publisher_joint_states.publish(joint_state) - - - def update_checked_frames(self): - """ - Function to update the external forces based on the checked frames in the menu. - """ - # create the array with only the checked frames (with external force applied) - self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) - - if len(self.checked_frames) != 0: - # create the array with the masses of the checked frames - self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']]) - else: - # if there are no checked frames, set the masses to None - self.masses = None - - - def joint_states_callback(self, msg): - """ - Callback function for the joint states topic. - """ - if self.robot is not None: - # if you are not using the calculated configuration from workspace, you can use the joint states to compute the torques because you don't have already the computed torques - if self.selected_configuration is None: - self.positions = list(msg.position) - self.name_positions = list(msg.name) - v = msg.velocity if msg.velocity else self.robot.get_zero_velocity() - - # set the positions based on the joint states - q = self.robot.set_position(self.positions, self.name_positions) - a = self.robot.get_zero_acceleration() - - # if there are no checked frames, set the external force to None - if len(self.checked_frames) != 0: - # create the external force with the masses and the checked frames - self.external_force = self.robot.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q) - else: - self.external_force = None - - # compute the inverse dynamics - tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force) - - # check the effort limits - status_efforts = self.robot.check_effort_limits(tau) - - # print the torques - self.robot.print_torques(tau) - - # get the positions of the joints where the torques are applied based on - joints_position, offset_z = self.robot.get_joints_placements(q) - - # Publish the torques in RViz - self.publish_label_torques(tau, status_torques=status_efforts ,joints_position=joints_position) - - else: - # if you are using the calculated configuration from workspace, you can use the valid configurations to visualize the torques labels - - # get the positions of the joints where the torques are applied based on - joints_position, offset_z = self.robot.get_joints_placements(self.valid_configurations[self.selected_configuration]["config"]) - # get the torques status (if the torques are within the limits) - status_efforts = self.robot.check_effort_limits(self.valid_configurations[self.selected_configuration]["tau"]) - - self.publish_label_torques(self.valid_configurations[self.selected_configuration]["tau"], status_torques=status_efforts ,joints_position=joints_position) - - - - def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray): - """ - Publish the torque with labels on the robot in RViz. - - Args: - torque (np.ndarray): The torque to be published - status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise - joints_position (np.ndarray): The positions of the joints where the torques are applied - """ - marker_array = MarkerArray() - for i, (t, joint) in enumerate(zip(torque, joints_position)): - # remove the gripper joints from the visualization TODO: make it more general (with MIMIC joints) - if "gripper" not in joint['name']: - marker = Marker() - marker.header.frame_id = "base_link" - marker.header.stamp = self.get_clock().now().to_msg() - marker.ns = "torque_visualization" - marker.id = i - marker.type = Marker.TEXT_VIEW_FACING - # Set the text based on the joint type - if joint['type'] == 'JointModelPZ': - marker.text = f"{joint['name']}: {t:.2f} N" - else: - marker.text = f"{joint['name']}: {t:.2f} Nm" - - marker.action = Marker.ADD - marker.pose.position.x = joint['x'] - marker.pose.position.y = joint['y'] - marker.pose.position.z = joint['z'] - marker.pose.orientation.w = 1.0 - marker.scale.x = 0.03 - marker.scale.y = 0.03 - marker.scale.z = 0.03 - marker.color.a = 1.0 # Alpha - if status_torques[i]: - marker.color.r = 0.0 # Red - marker.color.g = 1.0 # Green - marker.color.b = 0.0 # Blue - else: - marker.color.r = 1.0 # Red - marker.color.g = 0.0 # Green - marker.color.b = 0.0 # Blue - marker_array.markers.append(marker) - - self.publisher_rviz_torque.publish(marker_array) - - - def publish_workspace_area(self): - """ - Publish the workspace area in RViz using points and labels for the end points. - - """ - # Create a MarkerArray to visualize the number of configuration of a specific point in the workspace - marker_point_names = MarkerArray() - - # Create a Marker for the workspace area using points - marker_points = MarkerArray() - - # calculate the maximum torques for each joint in the current valid configurations for each arm only if the user selected the max current torque visualization - if self.menu.get_torque_color() == TorqueVisualizationType.MAX_CURRENT_TORQUE: - max_torque_for_joint_left, max_torque_for_joint_right = self.robot.get_maximum_torques(self.valid_configurations) - - - - - cont = 0 - # Iterate through the valid configurations and create markers - for i, valid_config in enumerate(self.valid_configurations): - - # create the label for the end point (end effector position) of the valid configuration - marker_point_name = Marker() - marker_point_name.header.frame_id = "base_link" - marker_point_name.header.stamp = self.get_clock().now().to_msg() - - marker_point_name.ns = f"workspace_area_{valid_config['arm']}_arm" - marker_point_name.id = i + 1 - marker_point_name.type = Marker.TEXT_VIEW_FACING - marker_point_name.text = f"Config {i}" - marker_point_name.action = Marker.ADD - marker_point_name.pose.position.x = valid_config["end_effector_pos"][0] - marker_point_name.pose.position.y = valid_config["end_effector_pos"][1] - marker_point_name.pose.position.z = valid_config["end_effector_pos"][2] + 0.05 # Offset to avoid overlap with the sphere - marker_point_name.pose.orientation.w = 1.0 - marker_point_name.scale.x = 0.02 - marker_point_name.scale.y = 0.02 - marker_point_name.scale.z = 0.02 - marker_point_name.color.a = 1.0 # Alpha - marker_point_name.color.r = 1.0 # Red - marker_point_name.color.g = 1.0 # Green - marker_point_name.color.b = 1.0 # Blue - - # get the joint positions for the valid configuration - joint_positions, offset_z = self.robot.get_joints_placements(valid_config["config"]) - - # get the normalized torques for the valid configuration with current target limits for color visualization - if self.menu.get_torque_color() == TorqueVisualizationType.TORQUE_LIMITS: - norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"]) - else: - # if the user selected the max torque, use the maximum torques for the joint - if valid_config["arm"] == "left": - norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_left) - else: - norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_right) - - # insert points related to the end effector position in the workspace area and with color based on the normalized torques for each joint - for joint_pos,tau in zip(joint_positions,norm_joints_torques): - # print only the points for the corrisponding arm, in this way we can visualize the workspace area for each arm separately and avoid confusion - if valid_config["arm"] in joint_pos["name"] : - point = Marker() - point.header.frame_id = "base_link" - point.header.stamp = self.get_clock().now().to_msg() - point.ns = joint_pos["name"] - point.id = cont - point.type = Marker.SPHERE - point.action = Marker.ADD - point.scale.x = 0.03 # Size of the sphere - point.scale.y = 0.03 - point.scale.z = 0.03 - - point.pose.position.x = valid_config["end_effector_pos"][0] - point.pose.position.y = valid_config["end_effector_pos"][1] - point.pose.position.z = valid_config["end_effector_pos"][2] - offset_z - point.pose.orientation.w = 1.0 - point.color.a = 1.0 # Alpha - point.color.r = tau # Red - point.color.g = 1 - tau # Green - point.color.b = 0.0 # Blue - - cont += 1 - - # Add the point to the points array - marker_points.markers.append(point) - - # Add the marker point name to the marker point names array - marker_point_names.markers.append(marker_point_name) - - - # get the unified torque for the valid configurations - unified_configurations_torque = self.robot.get_unified_configurations_torque(self.valid_configurations) - - # insert points related to the end effector position in the workspace area and with color based on the normalized torque for each configuration - # this is used to visualize the workspace area with the unified torques for each configuration - for i, unified_config in enumerate(unified_configurations_torque): - marker_point = Marker() - marker_point.header.frame_id = "base_link" - marker_point.header.stamp = self.get_clock().now().to_msg() - marker_point.ns = f"unified_torque_workspace_{unified_config['arm']}_arm" - marker_point.id = i - marker_point.type = Marker.SPHERE - marker_point.action = Marker.ADD - marker_point.scale.x = 0.03 # Size of the sphere - marker_point.scale.y = 0.03 - marker_point.scale.z = 0.03 - marker_point.pose.position.x = unified_config["end_effector_pos"][0] - marker_point.pose.position.y = unified_config["end_effector_pos"][1] - marker_point.pose.position.z = unified_config["end_effector_pos"][2] - offset_z # Offset to avoid overlap with the sphere - marker_point.pose.orientation.w = 1.0 - marker_point.color.a = 1.0 # Alpha - - # Color based on the normalized torques - marker_point.color.r = unified_config["norm_tau"] - marker_point.color.g = 1 - unified_config["norm_tau"] - marker_point.color.b = 0.0 # Blue - - # Add the marker point to the points array - marker_points.markers.append(marker_point) - - - # Publish the marker points and names - self.publisher_workspace_area.publish(marker_points) - self.publisher_workspace_area_names.publish(marker_point_names) - - - def clear_workspace_area_markers(self): - """ - Clear the workspace area markers in RViz. - This will publish an empty MarkerArray to remove the previous markers. - """ - # create an empty MarkerArray to clear the markers with a DELETEALL action - marker_array_msg = MarkerArray() - marker = Marker() - marker.id = 0 - marker.action = Marker.DELETEALL - marker_array_msg.markers.append(marker) - - # Publish the empty MarkerArray to clear the markers - self.publisher_workspace_area.publish(marker_array_msg) - self.publisher_workspace_area_names.publish(marker_array_msg) - - - - -def main(args=None): - rclpy.init(args=args) - node = RobotDescriptionSubscriber() - - try: - rclpy.spin(node) - except KeyboardInterrupt: - pass - finally: - node.destroy_node() - rclpy.shutdown() - - -if __name__ == '__main__': - main() \ No newline at end of file diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml deleted file mode 100644 index 51ba95e..0000000 --- a/dynamic_payload_analysis_ros/package.xml +++ /dev/null @@ -1,18 +0,0 @@ - - - - dynamic_payload_analysis_ros - 0.0.0 - This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling. - Moro Enrico - Apache License 2.0 - - ament_copyright - ament_flake8 - ament_pep257 - python3-pytest - - - ament_python - - diff --git a/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros b/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros deleted file mode 100644 index e69de29..0000000 diff --git a/dynamic_payload_analysis_ros/setup.cfg b/dynamic_payload_analysis_ros/setup.cfg deleted file mode 100644 index 5a64333..0000000 --- a/dynamic_payload_analysis_ros/setup.cfg +++ /dev/null @@ -1,4 +0,0 @@ -[develop] -script_dir=$base/lib/dynamic_payload_analysis_ros -[install] -install_scripts=$base/lib/dynamic_payload_analysis_ros diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py deleted file mode 100644 index 901e1da..0000000 --- a/dynamic_payload_analysis_ros/setup.py +++ /dev/null @@ -1,27 +0,0 @@ -from setuptools import find_packages, setup - -package_name = 'dynamic_payload_analysis_ros' - -setup( - name=package_name, - version='0.0.0', - packages=find_packages(exclude=['test']), - data_files=[ - ('share/ament_index/resource_index/packages', - ['resource/' + package_name]), - ('share/' + package_name, ['package.xml']), - ], - install_requires=['setuptools'], - zip_safe=True, - maintainer='Enrico Moro', - maintainer_email='enrimoro003@gmail.com', - description='This package provides graphics tools in Rviz for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.', - license='Apache License 2.0', - tests_require=['pytest'], - entry_points={ - 'console_scripts': [ - 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main', - 'node_rviz_visualization_menu = dynamic_payload_analysis_ros.rviz_visualization_menu:main', - ], - }, -) diff --git a/dynamic_payload_analysis_ros/test/test_copyright.py b/dynamic_payload_analysis_ros/test/test_copyright.py deleted file mode 100644 index 97a3919..0000000 --- a/dynamic_payload_analysis_ros/test/test_copyright.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_copyright.main import main -import pytest - - -# Remove the `skip` decorator once the source file(s) have a copyright header -@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.') -@pytest.mark.copyright -@pytest.mark.linter -def test_copyright(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found errors' diff --git a/dynamic_payload_analysis_ros/test/test_flake8.py b/dynamic_payload_analysis_ros/test/test_flake8.py deleted file mode 100644 index 27ee107..0000000 --- a/dynamic_payload_analysis_ros/test/test_flake8.py +++ /dev/null @@ -1,25 +0,0 @@ -# Copyright 2017 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_flake8.main import main_with_errors -import pytest - - -@pytest.mark.flake8 -@pytest.mark.linter -def test_flake8(): - rc, errors = main_with_errors(argv=[]) - assert rc == 0, \ - 'Found %d code style errors / warnings:\n' % len(errors) + \ - '\n'.join(errors) diff --git a/dynamic_payload_analysis_ros/test/test_pep257.py b/dynamic_payload_analysis_ros/test/test_pep257.py deleted file mode 100644 index b234a38..0000000 --- a/dynamic_payload_analysis_ros/test/test_pep257.py +++ /dev/null @@ -1,23 +0,0 @@ -# Copyright 2015 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from ament_pep257.main import main -import pytest - - -@pytest.mark.linter -@pytest.mark.pep257 -def test_pep257(): - rc = main(argv=['.', 'test']) - assert rc == 0, 'Found code style errors / warnings'