diff --git a/doc/README.md b/doc/README.md
deleted file mode 100644
index 66a490b..0000000
--- a/doc/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-# Documentation
-
-This file contains the doucmentation for the user setup and testing anf also to server as a tutorials for the users.
\ No newline at end of file
diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/__init__.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py b/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
deleted file mode 100644
index cdfa3dd..0000000
--- a/dynamic_payload_analysis_core/dynamic_payload_analysis_core/core.py
+++ /dev/null
@@ -1,1014 +0,0 @@
-
-# Copyright (c) 2025 PAL Robotics S.L. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-import pinocchio as pin
-import numpy as np
-import math
-from typing import Union
-from pathlib import Path
-from numpy.linalg import norm, solve
-from optik import Robot, SolverConfig
-import tempfile
-from ikpy import chain
-import os
-
-# TODO : If a workspace calculation is already done, store the results in a file to avoid recalculating it and make only the
-# verification of the payload handling in the workspace.
-
-
-
-class TorqueCalculator:
- def __init__(self, robot_description : Union[str, Path]):
- """
- Initialize the Torques_calculator with the URDF model or XML format provided by robot_description topic.
-
- :param urdf_path: Path to the URDF file of the robot.
- :param robot_description: Robot description in XML format provided by /robot_description topic or path of URDF file.
- """
-
- # Load the robot model from path or XML string
- if isinstance(robot_description, str):
- self.model = pin.buildModelFromXML(robot_description)
-
- # TODO change parser in general for more unique solution
-
- # create temporary URDF file from the robot description string
- with tempfile.NamedTemporaryFile(mode='w', suffix='.urdf', delete=False) as temp_file:
- temp_file.write(robot_description)
- temp_urdf_path = temp_file.name
-
- self.geom_model = pin.buildGeomFromUrdf(self.model,temp_urdf_path,pin.GeometryType.COLLISION)
-
- # Add collisition pairs
- self.geom_model.addAllCollisionPairs()
-
- os.unlink(temp_urdf_path)
-
- elif isinstance(robot_description, Path):
- self.model = pin.buildModelFromUrdf(str(robot_description.resolve()))
-
- # create data for the robot model
- self.data = self.model.createData()
- self.geom_data = pin.GeometryData(self.geom_model)
-
- # get the default collisions in the robot model to avoid take them into account in the computations
- self.default_collisions = self.compute_static_collisions()
-
- # create array to store all possible configurations for left and right arms
- self.configurations_l = None
- self.configurations_r = None
-
-
- def compute_static_collisions(self):
- """
- Compute the static collisions for the robot model.
- This method is used to compute the collisions in the robot model in the zero configuration.
- """
-
- # array to store the collision pairs
- collision_pairs = []
-
- # Compute all the collisions
- pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, self.get_zero_configuration(), False)
-
- # Print the status of collision for all collision pairs
- for k in range(len(self.geom_model.collisionPairs)):
- cr = self.geom_data.collisionResults[k]
- cp = self.geom_model.collisionPairs[k]
- if cr.isCollision():
- print(f"Collision between {cp.first} and {cp.second} detected.")
- collision_pairs.append((cp.first, cp.second, k))
-
- return collision_pairs
-
-
-
-
- def compute_inverse_dynamics(self, q : np.ndarray , qdot : np.ndarray, qddot : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray:
- """
- Compute the inverse dynamics torque vector.
-
- :param q: Joint configuration vector.
- :param qdot: Joint velocity vector.
- :param qddot: Joint acceleration vector.
- :param extForce: External forces vector applied to the robot model.
- :return: Torques vector
- """
-
- # basic equation for inverse dynamics : M(q) *a + b = tau + J(q)_t * extForce --> tau = M(q) * a + b - J(q)_t * extForce
-
- if extForce:
- tau = pin.rnea(self.model, self.data, q, qdot, qddot, extForce)
- else:
- tau = pin.rnea(self.model, self.data, q, qdot, qddot)
-
- if tau is None:
- raise ValueError("Failed to compute torques")
-
- return tau
-
-
- def create_ext_force(self, masses : Union[float, np.ndarray] , frame_name : Union[str | np.ndarray], q : np.ndarray) -> np.ndarray[pin.Force]:
- """
- Create external forces vector based on the masses and frame ID.
- The resulting vector will contain the force applied to the specified frame and with the local orientation of the parent joint.
-
- :param masses (float, np.ndarray) : Mass of the object to apply the force to or vector with masses related to frames names.
- :param frame_name(str , np.ndarray) : Frame name where the force is applied or vector of frame names where the forces is applied.
- :param q: Joint configuration vector.
- :return: External force vector.
- """
- if isinstance(masses, float):
- if masses < 0:
- raise ValueError("Mass must be a positive value")
-
- if frame_name is None:
- raise ValueError("Frame name must be provided")
-
- #if frame_name not in self.model.frames:
- # raise ValueError(f"Frame name '{frame_name}' not found in the robot model")
-
- # assumption made : the force is applied to the joint
-
- # Initialize external forces array
- fext = [pin.Force(np.zeros(6)) for _ in range(self.model.njoints)]
-
- self.update_configuration(q)
-
- # Check if frame_name is a single string or an array of strings
- if isinstance(frame_name, str):
- # Get the frame ID and joint ID from the frame name
- frame_id = self.model.getFrameId(frame_name)
- joint_id = self.model.frames[frame_id].parentJoint
-
- # force expressed in the world frame (gravity force in z axis)
- ext_force_world = pin.Force(np.array([0.0, 0.0, masses * -9.81]), np.array([0.0, 0.0, 0.0]))
-
- # placement of the frame in the world frame
- #frame_placement = self.data.oMf[frame_id]
- #print(f"Frame placement: {frame_placement}")
-
- # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied
- fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world)
- # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied
- fext[joint_id].angular = np.zeros(3) # TODO : make it more efficient
-
- else:
- for mass, frame in zip(masses,frame_name):
- frame_id = self.model.getFrameId(frame)
- joint_id = self.model.frames[frame_id].parentJoint
-
- # force expressed in the world frame (gravity force in z axis)
- ext_force_world = pin.Force(np.array([0.0, 0.0, mass * -9.81]), np.array([0.0, 0.0, 0.0]))
- # Convert the external force expressed in the world frame to the orientation of the joint frame where the force is applied
- fext[joint_id] = self.data.oMi[joint_id].actInv(ext_force_world)
- # Zero out the last 3 components (torques) of the force to ensure only the force in z axis (gravity force) is applied
- fext[joint_id].angular = np.zeros(3)
-
- return fext
-
-
- def update_configuration(self, q : np.ndarray):
- """
- Update the robot model configuration with the given joint configuration vector.
-
- :param q: Joint configuration vector.
- """
- pin.forwardKinematics(self.model, self.data, q)
- pin.updateFramePlacements(self.model, self.data)
-
-
- def compute_maximum_payload(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, frame_name : str) -> float:
- """
- Compute the forward dynamics acceleration vector.
-
- :param q: Joint configuration vector.
- :param qdot: Joint velocity vector.
- :param tau: Joint torque vector.
- :param frame_name: Name of the frame where the force is applied.
- :return: Acceleration vector.
- """
- # TODO : refactor this method
-
- # Update the configuration of the robot model
- self.update_configuration(q)
-
- # basic idea for forward dynamics: M(q) * a_q + b = tau(q) --> a_q = (tau(q) - b)/ M(q)
- # with external force, the equation becomes: M(q) * a_q + b = tau(q) + J_traspose(q) * f_ext -- > f_ext = (M(q) * a_q + b - tau(q))/ J_traspose(q)
-
- qddot0 = np.zeros(self.model.nv) # Initialize acceleration vector
-
- # calculate dynamics drift
- b = pin.rnea(self.model, self.data, q, qdot, qddot0)
-
- #get jacobian of the frame where the payload is applied
- J = self.compute_jacobian(q, frame_name)
-
- tau_r = b - tau
-
- try:
- #a = np.linalg.solve(M, tau - b)
- # get F = (J_transpose(q))^-1 X ( tau - b ) with b = M(q)*a_q + b || pinv = pseudo-inverse to prevent singularities
- F_max = np.linalg.pinv(J.T) @ tau_r
-
- except np.linalg.LinAlgError as e:
- raise ValueError(f"Failed to solve for acceleration: {e}")
-
- return F_max[2] # get the force in z axis of the world frame, which is the maximum force payload
-
-
-
- def compute_inverse_kinematics_optik(self, q : np.ndarray, end_effector_position: np.ndarray) -> np.ndarray:
- """
- Compute the inverse kinematics for the robot model using the Optik library.
-
- :param q: current joint configuration vector.
- :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector].
- :return: Joint configuration vector that achieves the desired end effector position.
- """
- # TODO : It doees not work with the current version of the library
-
- # Compute the inverse kinematics
- sol = self.ik_model.ik(self.ik_config, end_effector_position, q)
-
- return sol
-
-
-
- def compute_inverse_kinematics_ikpy(self, q : np.ndarray, end_effector_position: np.ndarray) -> np.ndarray:
- """
- Compute the inverse kinematics for the robot model using the ikpy library.
-
- :param q: current joint configuration vector.
- :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector].
- :return: Joint configuration vector that achieves the desired end effector position.
- """
- # TODO : It doees not work with the current version of the library
-
- # Compute the inverse kinematics
- sol = self.ik_model.inverse_kinematics(end_effector_position)
-
- return sol
-
-
-
- def compute_inverse_kinematics(self, q : np.ndarray, end_effector_position: np.ndarray, end_effector_name : str) -> np.ndarray:
- """
- Compute the inverse kinematics for the robot model with joint limits consideration.
- :param q: current joint configuration vector.
- :param end_effector_position: Position of the end effector in the world frame [rotation matrix , translation vector].
- :param end_effector_name: Name of the end effector joint.
- :return: Joint configuration vector that achieves the desired end effector position.
- """
-
- joint_id = self.model.getJointId(end_effector_name) # Get the joint ID of the end effector
-
- # Set parameters for the inverse kinematics solver
- eps = 1e-3 # reduce for more precision
- IT_MAX = 500 # Maximum number of iterations
- DT = 1e-1
- damp = 1e-12
-
- i = 0
- while True:
- self.update_configuration(q)
- iMd = self.data.oMi[joint_id].actInv(end_effector_position) # Get the transformation from the current end effector pose to the desired pose
-
- err = pin.log(iMd).vector # compute the error in the end effector position
- if norm(err[:3]) < eps:
- success = True
- break
- if i >= IT_MAX:
- success = False
- break
-
- J = pin.computeJointJacobian(self.model, self.data, q, joint_id) # compute the Jacobian of current pose of end effector
- #J = -np.dot(pin.Jlog6(iMd.inverse()), J)
- J = -J[:3, :]
-
- # compute the inverse kinematics v = -J^T * (J * J^T + damp * I)^-1 * err
- v = -J.T.dot(solve(J.dot(J.T) + damp * np.eye(3), err[:3]))
-
- # Apply joint limits by clamping the resulting configuration
- q_new = pin.integrate(self.model, q, v * DT)
- # Ensure the new configuration is within joint limits
- q_new = np.clip(q_new, self.model.lowerPositionLimit, self.model.upperPositionLimit)
-
- # Check if we're hitting joint limits and reduce step size if needed
- if np.allclose(q_new, q, atol=1e-6):
- DT *= 0.5 # Reduce step size if no progress due to joint limits
- if DT < 1e-6: # Minimum step size threshold
- success = False
- break
-
- q = q_new
-
- # if not i % 10:
- # print(f"{i}: error = {err.T}")
- i += 1
-
- if success:
- print(f"Convergence achieved! in {i} iterations")
- return q
- else:
- print(
- "\n"
- "Warning: the iterative algorithm has not reached convergence to the desired precision"
- )
- return None # Return None if convergence is not achieved
-
-
-
- def compute_all_configurations(self, range : int, resolution : int, end_effector_name : str) -> np.ndarray:
- """
- Compute all configurations for the robot model within a specified range.
-
- :param range (int): Range as side of a square where in the center there is the actual position of end effector.
- :param resolution (int): Resolution of the grid to compute configurations.
- :param end_effector_name (str): Name of the end effector joint.
- :return : Array of joint configurations that achieve the desired end effector position.
- """
-
- if range <= 0:
- raise ValueError("Range must be a positive value")
-
- # Get the current joint configuration
- q = self.get_zero_configuration()
-
- #id_end_effector = self.model.getJointId(end_effector_name)
- # Get the current position of the end effector
- #end_effector_pos = self.data.oMi[id_end_effector]
-
- # Create an array to store all configurations
- configurations = []
-
- # Iterate over the range to compute all configurations
- for x in np.arange(-range/2, range/2 , resolution):
- for y in np.arange(-range/2, range/2 , resolution):
- for z in np.arange(0, range , resolution):
- target_position = pin.SE3(np.eye(3), np.array([x, y, z]))
- new_q = self.compute_inverse_kinematics(q, target_position, end_effector_name)
-
- if new_q is not None:
- q = new_q
- # store the valid configuration and the position of the end effector relative to that configuration
- configurations.append({"config" : new_q, "end_effector_pos": target_position.translation})
-
- return np.array(configurations, dtype=object)
-
-
-
- def verify_configurations(self, configurations_left : np.ndarray, configurations_right : np.ndarray, masses : np.ndarray, checked_frames : np.ndarray) -> np.ndarray:
- """
- Verify the configurations to check if they are valid.
-
- :param configurations_left: Array of joint configurations to verify for the left arm.
- :param configurations_right: Array of joint configurations to verify for the right arm.
- :param ext_forces: Array of external forces to apply to the robot model.
- :return: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}].
- """
-
- valid_configurations = []
-
- # check valid configurations for left arm
- for q in configurations_left:
- # Update the configuration of the robot model
- self.update_configuration(q["config"])
-
- if masses is not None and checked_frames is not None:
- # Create external forces based on the masses and checked frames
- ext_forces = self.create_ext_force(masses, checked_frames, q["config"])
- # Compute the inverse dynamics for the current configuration
- tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces)
- else:
- # Compute the inverse dynamics for the current configuration without external forces
- tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration())
-
- # Check if the torques are within the effort limits
- if self.check_effort_limits(tau,"left").all():
- valid = True
- # Compute all the collisions
- pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False)
-
- # Print the status of collision for all collision pairs
- for k in range(len(self.geom_model.collisionPairs)):
- cr = self.geom_data.collisionResults[k]
- cp = self.geom_model.collisionPairs[k]
-
- if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions:
- print(f"Collision detected between {cp.first} and {cp.second} in the left arm configuration.")
- valid = False
- break
-
- if valid:
- valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "left" })
-
- # check valid configurations for right arm
- for q in configurations_right:
- # Update the configuration of the robot model
- self.update_configuration(q["config"])
-
- if masses is not None and checked_frames is not None:
- # Create external forces based on the masses and checked frames
- ext_forces = self.create_ext_force(masses, checked_frames, q["config"])
- # Compute the inverse dynamics for the current configuration
- tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration(),extForce=ext_forces)
- else:
- # Compute the inverse dynamics for the current configuration without external forces
- tau = self.compute_inverse_dynamics(q["config"], self.get_zero_velocity(), self.get_zero_acceleration())
-
- # Check if the torques are within the effort limits
- if self.check_effort_limits(tau,"right").all():
- valid = True
- # Compute all the collisions
- pin.computeCollisions(self.model, self.data, self.geom_model, self.geom_data, q["config"], False)
-
- # Print the status of collision for all collision pairs
- for k in range(len(self.geom_model.collisionPairs)):
- cr = self.geom_data.collisionResults[k]
- cp = self.geom_model.collisionPairs[k]
-
- if cr.isCollision() and (cp.first, cp.second, k) not in self.default_collisions:
- print(f"Collision detected between {cp.first} and {cp.second} in the right arm configuration.")
- valid = False
- break
-
- if valid:
- valid_configurations.append({"config" : q["config"], "end_effector_pos" : q["end_effector_pos"], "tau" : tau, "arm" : "right" })
-
-
- return np.array(valid_configurations, dtype=object)
-
-
- def get_valid_workspace(self, range : int, resolution : int, end_effector_name_left : str, end_effector_name_right, masses : np.ndarray, checked_frames: np.ndarray) -> np.ndarray:
- """
- Get the valid workspace of the robot model by computing all configurations within a specified range.
-
- :param range (int): Range as side of a square where in the center there is the actual position of end effector.
- :param resolution (int): Resolution of the grid to compute configurations.
- :param end_effector_name_left (str): Name of the end effector joint to test for the left arm.
- :param end_effector_name_right (str): Name of the end effector joint to test for the right arm.
- :param masses (np.ndarray): Array of masses to apply to the robot model.
- :param checked_frames (np.ndarray): Array of frame names where the external forces are applied.
- :return: Array of valid configurations that achieve the desired end effector position in format: [{"config", "end_effector_pos, "tau", "arm"}].
- """
- # compute all configurations for the left and right arms if they are not already computed
- if self.configurations_l is None or self.configurations_r is None:
- # Compute all configurations within the specified range for the left arm
- self.configurations_l = self.compute_all_configurations(range,resolution, end_effector_name_left)
- # Compute all configurations within the specified range for the right arm
- self.configurations_r = self.compute_all_configurations(range,resolution, end_effector_name_right)
-
- # Verify the configurations to check if they are valid for both arms
- valid_configurations = self.verify_configurations(self.configurations_l,self.configurations_r, masses, checked_frames)
-
- return valid_configurations
-
-
-
- def compute_forward_dynamics_aba_method(self, q : np.ndarray, qdot : np.ndarray, tau : np.ndarray, extForce : np.ndarray[pin.Force] = None) -> np.ndarray:
- """
- Compute the forward dynamics acceleration vector with Articulated-Body algorithm(ABA).
-
- :param q: Joint configuration vector.
- :param qdot: Joint velocity vector.
- :param tau: Joint torque vector.
- :param extForce: External forces vector applied to the robot model.
- :return: Acceleration vector.
- """
-
- # calculate dynamics drift
- if extForce is not None:
- ddq = pin.aba(self.model, self.data, q, qdot, tau, extForce)
- else:
- ddq = pin.aba(self.model, self.data, q, qdot, tau)
-
- return ddq
-
-
- def compute_jacobian(self, q : np.ndarray, frame_name : str) -> np.ndarray:
- """
- Get the Jacobian matrix for a specific frame in the robot model.
-
- :param q: Joint configuration vector.
- :param frame_name: Name of the frame to compute the Jacobian for.
- :return: Jacobian matrix.
- """
-
- # Get the frame ID
- frame_id = self.model.getFrameId(frame_name)
- joint_id = self.model.frames[frame_id].parentJoint
-
- # Compute the Jacobian
- J = pin.computeJointJacobians(self.model, self.data, q)
-
- J_frame = pin.getJointJacobian(self.model, self.data, joint_id, pin.ReferenceFrame.LOCAL_WORLD_ALIGNED)
-
- if J_frame is None:
- raise ValueError("Failed to compute Jacobian")
-
- return J_frame
-
-
- def get_maximum_torques(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray:
- """
- Get the maximum torques for each joint in all valid configurations.
-
- :param valid_configs: Array of valid configurations with related torques in format: [{"config", "end_effector_pos, "tau"}].
- :return: Arrays of maximum torques for each joint in the current valid configurations for left and right arm.
- """
-
- # Get the number of joints
- num_joints = len(valid_configs[0]["tau"])
- max_torques_left = np.array([], dtype=float)
- max_torques_right = np.array([], dtype=float)
-
- # Find maximum absolute torque for each joint
- for i in range(num_joints):
- joint_torques_left = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "left"]
- joint_torques_right = [abs(config["tau"][i]) for config in valid_configs if config["arm"] == "right"]
-
- max_torques_left = np.append( max_torques_left, max(joint_torques_left))
- max_torques_right = np.append( max_torques_right, max(joint_torques_right))
-
- return max_torques_left, max_torques_right
-
-
-
-
- def get_normalized_torques(self, tau : np.ndarray, target_torque : np.ndarray = None) -> np.ndarray:
- """
- Normalize the torques vector to a unified scale.
-
- :param tau: Torques vector to normalize.
- :return: Normalized torques vector.
- """
- if tau is None:
- raise ValueError("Torques vector is None")
-
- norm_tau = []
-
- # if target_torque is not specified, normalize the torques vector to the effort limits of the robot model
- if target_torque is None:
- # Normalize the torques vector
- for i, torque in enumerate(tau):
- norm_tau.append(abs(torque) / self.model.effortLimit[i])
- else:
- # Normalize the torques vector to the target torque
- for i, torque in enumerate(tau):
- if target_torque[i] != 0:
- norm_tau.append(abs(torque) / target_torque[i])
- else:
- norm_tau.append(0.0)
-
-
- return norm_tau
-
-
- def get_unified_configurations_torque(self, valid_configs : np.ndarray) -> np.ndarray | np.ndarray:
- """
- Get a unified sum of torques for all possible configurations of the robot model.
-
- :param q: Joint configuration vector.
- :param valid_configs: Array of
- """
- torques_sum = np.array([], dtype=float)
- norm_torques = np.array([], dtype=float)
- sum = 0.0
-
- for valid_config in valid_configs:
- # get the joint configuration and torques vector from the valid configuration
- q = valid_config["config"]
- tau = valid_config["tau"]
-
- # calculate the sum of torques for each joint configuration
- for joint, torque in zip(q, tau):
- #if abs(torque) < 50:
- sum += abs(torque)
-
- torques_sum = np.append(torques_sum, {"sum" : sum, "end_effector_pos" : valid_config["end_effector_pos"], "arm" : valid_config["arm"]})
- sum = 0.0 # reset the sum for the next configuration
-
- # get the maximum torque from the sum of torques for both arms
- max_torque_l = max([item["sum"] for item in torques_sum if item["arm"] == "left"])
- max_torque_r = max([item["sum"] for item in torques_sum if item["arm"] == "right"])
-
- # Normalize the torques vector to a unified scale
- for tau in torques_sum:
- if tau["arm"] == "left":
- norm_tau = tau["sum"] / max_torque_l
- else:
- norm_tau = tau["sum"] / max_torque_r
-
- # append the normalized torque to the array
- norm_torques = np.append(norm_torques, {"norm_tau" : norm_tau, "end_effector_pos" : tau["end_effector_pos"], "arm" : tau["arm"]})
-
- return norm_torques
-
-
- def check_zero(self, vec : np.ndarray) -> bool:
- """
- Checks if the vector is zero.
-
- :param vec: Vector to check.
- :return: True if the acceleration vector is zero, False otherwise.
- """
-
- return np.allclose(vec, np.zeros(self.model.nv), atol=1e-6)
-
-
- def get_zero_configuration(self) -> np.ndarray:
- """
- Get the zero configuration of the robot model.
-
- :return: Zero configuration vector.
- """
- q0 = np.zeros(self.model.nq)
- if q0 is None:
- raise ValueError("Failed to get zero configuration")
-
- return q0
-
-
- def get_zero_velocity(self) -> np.ndarray:
- """
- Get the zero velocity vector of the robot model.
-
- :return: Zero velocity vector.
- """
- v0 = np.zeros(self.model.nv)
- if v0 is None:
- raise ValueError("Failed to get zero velocity")
-
- return v0
-
-
- def get_zero_acceleration(self) -> np.ndarray:
- """
- Get the zero acceleration vector of the robot model.
-
- :return: Zero acceleration vector.
- """
- a0 = np.zeros(self.model.nv)
- if a0 is None:
- raise ValueError("Failed to get zero acceleration")
-
- return a0
-
-
- def get_random_configuration(self) -> tuple[np.ndarray, np.ndarray]:
- """
- Get a random configuration for configuration and velocity vectors.
- :return: Random configuration vectors.
- """
- q_limits_lower = self.model.lowerPositionLimit
- q_limits_upper = self.model.upperPositionLimit
-
- # generate random configuration vector within the limits
- q = np.random.uniform(q_limits_lower, q_limits_upper)
-
- if q is None:
- raise ValueError("Failed to get random configuration")
-
- # Generate random velocity vector within the limits
- qdot = np.random.uniform(-self.model.velocityLimit, self.model.velocityLimit)
- if qdot is None:
- raise ValueError("Failed to get random velocity")
-
- return q, qdot
-
-
- def check_joint_limits(self, q : np.ndarray) -> np.ndarray:
- """
- Check if the joint configuration vector is within the joint limits of the robot model.
-
- :param q: Joint configuration vector to check.
- :return: Array of booleans indicating if each joint is within the limits.
- """
- if q is None:
- raise ValueError("Joint configuration vector is None")
-
- # array to store if the joint is within the limits
- within_limits = np.zeros(self.model.njoints - 1, dtype=bool)
-
- # Check if the joint configuration is within the limits
- for i in range(self.model.njoints - 1):
- if q[i] < self.model.lowerPositionLimit[i] or q[i] > self.model.upperPositionLimit[i]:
- within_limits[i] = False
- else:
- within_limits[i] = True
-
- return within_limits
-
-
- def check_effort_limits(self, tau : np.ndarray, arm : str = None) -> np.ndarray:
- """
- Check if the torques vector is within the effort limits of the robot model.
-
- :param tau: Torques vector to check.
- :param arm: Arm name to filter the torques vector, right or left.
- :return: Array of booleans indicating if each joint is within the effort limits.
- """
- if tau is None:
- raise ValueError("Torques vector is None")
-
- # array to store if the joint is within the limits
- within_limits = np.array([], dtype=bool)
-
- # if arm is not specified, check all joints
- if arm is None:
- # Check if the torques are within the limits
- for i in range(self.model.njoints -1):
- if abs(tau[i]) > self.model.effortLimit[i]:
- print(f"\033[91mJoint {i+2} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n")
- within_limits = np.append(within_limits, False)
- else:
- within_limits = np.append(within_limits, True)
-
- if np.all(within_limits):
- print("All joints are within effort limits. \n")
-
- else:
- # Check if the torques are within the limits
- for i in range(self.model.njoints -1):
- # TODO arm is "left" or right" but the model has gripper with left and right in the same name
- if arm in self.model.names[i+1]:
- if abs(tau[i]) > self.model.effortLimit[i]:
- print(f"\033[91mJoint {i+1} exceeds effort limit: {tau[i]} > {self.model.effortLimit[i]} \033[0m\n")
- within_limits = np.append(within_limits, False)
- else:
- within_limits = np.append(within_limits, True)
-
- if np.all(within_limits):
- print("All joints are within effort limits. \n")
-
-
- return within_limits
-
-
-
- def set_position(self, pos_joints : list[float], name_positions : list[str] ) -> np.ndarray:
- """
- Convert joint positions provided by jointstate publisher to the right format of the robot model.
-
- :param pos_joints: List of joint positions provided by jointstate publisher.
- :param name_positions: List of joint names in the order provided by jointstate publisher.
- """
-
- q = np.zeros(self.model.nq)
- cont = 0
-
- for i in range(1, np.size(pos_joints) + 1):
- # find index in name positions list that corrisponds to the joint name
- # (REASON: the joint position from joint_state message are not in the same order as the joint indices in model object)
- index = name_positions.index(self.model.names[i])
-
- if self.model.joints[i].nq == 2:
- # for continuous joints (wheels)
- q[cont] = math.cos(pos_joints[index])
- q[cont + 1] = math.sin(pos_joints[index])
- else:
- # for revolute joints
- q[cont] = pos_joints[index]
-
- cont += self.model.joints[i].nq
-
- self.update_configuration(q)
-
- return q
-
-
- def get_position_for_joint_states(self, q : np.ndarray) -> np.ndarray:
- """
- Convert configuration in pinocchio format to right format of joint states publisher
- Example: continuous joints (wheels) are represented as two values in the configuration vector but
- in the joint state publisher they are represented as one value (angle).
-
- :param q : Joint configuration provided by pinocchio library
- :return: Joint positions in the format of joint state publisher.
- """
-
- config = []
-
- current_selected_config = q
-
- # Use this method to get the single values for joints, for example continuous joints (wheels) are represented as two values in the configuration vector but
- # in the joint state publisher they are represented as one value (angle)
- # so we need to convert the configuration vector to the right format for joint state publisher
- cont = 0
- for i in range(1, self.model.njoints):
- if self.model.joints[i].nq == 2:
- # for continuous joints (wheels)
- config.append({ "q" : math.atan2(current_selected_config[cont+1], current_selected_config[cont]), "joint_name" : self.model.names[i] })
-
- elif self.model.joints[i].nq == 1:
- # for revolute joints
- config.append({"q" : current_selected_config[cont], "joint_name" : self.model.names[i]})
-
- cont += self.model.joints[i].nq
-
- return config
-
-
- def get_joints_placements(self, q : np.ndarray) -> np.ndarray | float:
- """
- Get the placements of the joints in the robot model.
-
- :param q: Joint configuration vector.
- :return: Array of joint placements with names of joint, and z offset of base link.
- """
- base_link_id = self.model.getFrameId("base_link")
- offset_z = self.data.oMf[base_link_id].translation[2] # Get the z offset of the base link
-
- self.update_configuration(q)
- placements = np.array([({"name" : self.model.names[i],
- "type" : self.model.joints[i].shortname() ,
- "x": self.data.oMi[i].translation[0],
- "y": self.data.oMi[i].translation[1],
- "z": self.data.oMi[i].translation[2]}) for i in range(1, self.model.njoints)], dtype=object)
-
- return placements, offset_z
-
-
- def get_joint_placement(self, joint_id : int) -> dict:
- """
- Get the placement of a specific joint in the robot model.
-
- :param joint_id: ID of the joint to get the placement for.
- :return: Dictionary with coordinates x , y , z of the joint.
- """
-
- if joint_id < 0 or joint_id >= self.model.njoints:
- raise ValueError(f"Joint ID {joint_id} is out of bounds for the robot model with {self.model.njoints} joints.")
-
- placement = self.data.oMi[joint_id].translation
-
- return {"x" : placement[0], "y": placement[1], "z": placement[2]}
-
-
- def get_mass_matrix(self, q : np.ndarray) -> np.ndarray:
- """
- Compute the mass matrix for the robot model.
-
- :param q: Joint configuration vector.
- :return: Mass matrix.
- """
-
- mass_matrix = pin.crba(self.model, self.data, q)
- if mass_matrix is None:
- raise ValueError("Failed to compute mass matrix")
-
- return mass_matrix
-
-
-
- def get_joints(self) -> np.ndarray:
- """
- Get the array joint names of the robot model.
-
- :return: array Joint names .
- """
-
- return np.array(self.model.names[1:], dtype=str)
-
-
-
- def get_frames(self) -> np.ndarray:
- """
- Get the array of frame names in the robot model.
-
- :return: array of frame names.
- """
-
- return np.array([frame.name for frame in self.model.frames if frame.type == pin.FrameType.BODY], dtype=str)
-
-
-
- def get_active_frames(self) -> np.ndarray:
- """
- Get the array of active joint names in the robot model.
-
- :return: array of active joint names.
- """
- # Get frames where joints are parents
- frame_names = []
- for i in range(1, self.model.njoints):
- for frame in self.model.frames:
- if frame.parentJoint == i and frame.type == pin.FrameType.BODY:
- frame_names.append(frame.name)
- break
-
- return np.array(frame_names, dtype=str)
-
-
-
- def get_parent_joint_id(self, frame_name : str) -> int:
- """
- Get the parent joint ID for a given frame name.
-
- :param frame_name: Name of the frame.
- :return: Joint ID.
- """
-
- if frame_name is None:
- raise ValueError("Frame name must be provided")
-
- # Get the frame ID from the model
- frame_id = self.model.getFrameId(frame_name)
- joint_id = self.model.frames[frame_id].parentJoint
-
- if joint_id == -1:
- raise ValueError(f"Joint '{joint_id}' not found in the robot model")
-
- return joint_id
-
-
- def print_configuration(self, q : np.ndarray = None):
- """
- Print the current configuration of the robot model.
- """
-
- self.update_configuration(q)
-
- for frame_id, frame in enumerate(self.model.frames):
- placement = self.data.oMf[frame_id]
- print(f"Frame: {frame.name}")
- print(f" Rotation:\n{placement.rotation}")
- print(f" Translation:\n{placement.translation}")
-
-
- def print_torques(self, tau : np.ndarray):
- """
- Print the torques vector.
-
- :param tau: Torques vector to print.
- """
- if tau is None:
- raise ValueError("Torques vector is None")
-
- print("Torques vector:")
- for i, torque in enumerate(tau):
- # check if the joint is a prismatic joint
- if self.model.joints[i+1].shortname() == "JointModelPZ":
- print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} N")
-
- # for revolute joints
- else:
- print(f"Joint {i+2} {self.model.names[i+1]}: {torque:.4f} Nm")
- print("\n")
-
-
-
- def print_frames(self):
- """
- Print the frames of the robot model.
- """
- print("Body frames available in the robot: ")
- for i, frame in enumerate(self.model.frames):
- if frame.type == pin.FrameType.BODY:
- print(f"ID: {i:2d} | Name: {frame.name:20s}")
- print("\n")
-
-
- def print_acceleration(self, qddot : np.ndarray):
- """
- Print the acceleration vector.
-
- :param a: Acceleration vector to print.
- """
-
- print("Acceleration vector:")
- for i, acc in enumerate(qddot):
- print(f"Joint {i+2} {self.model.names[i+1]}: {acc:.6f} rad/s^2")
-
- print("\n")
-
-
- def print_active_joint(self):
- """
- Print the active joints of the robot model.
- """
- if self.model is None:
- raise ValueError("Model is not initialized")
-
- print("Active joints:")
- for i in range(self.model.njoints):
- print(f"Joint {i+1}: {self.model.names[i]}")
-
- print("\n")
- # placement of each joint in the kinematic tree
- # for name, oMi in zip(self.model.names, self.data.oMi):
- # print("{:<24} : {: .2f} {: .2f} {: .2f}".format(name, *oMi.translation.T.flat))
\ No newline at end of file
diff --git a/dynamic_payload_analysis_core/package.xml b/dynamic_payload_analysis_core/package.xml
deleted file mode 100644
index 3b5dfae..0000000
--- a/dynamic_payload_analysis_core/package.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-
-
-
- dynamic_payload_analysis_core
- 0.0.0
- Core package with calculation for torques calculator
- morolinux
- Apache License 2.0
-
- ament_copyright
- ament_flake8
- ament_pep257
- python3-pytest
-
-
- ament_python
-
-
diff --git a/dynamic_payload_analysis_core/resource/dynamic_payload_analysis_core b/dynamic_payload_analysis_core/resource/dynamic_payload_analysis_core
deleted file mode 100644
index e69de29..0000000
diff --git a/dynamic_payload_analysis_core/setup.cfg b/dynamic_payload_analysis_core/setup.cfg
deleted file mode 100644
index 62c9e13..0000000
--- a/dynamic_payload_analysis_core/setup.cfg
+++ /dev/null
@@ -1,4 +0,0 @@
-[develop]
-script_dir=$base/lib/dynamic_payload_analysis_core
-[install]
-install_scripts=$base/lib/dynamic_payload_analysis_core
diff --git a/dynamic_payload_analysis_core/setup.py b/dynamic_payload_analysis_core/setup.py
deleted file mode 100644
index 70e2a59..0000000
--- a/dynamic_payload_analysis_core/setup.py
+++ /dev/null
@@ -1,25 +0,0 @@
-from setuptools import find_packages, setup
-
-package_name = 'dynamic_payload_analysis_core'
-
-setup(
- name=package_name,
- version='0.0.0',
- packages=find_packages(exclude=['test']),
- data_files=[
- ('share/ament_index/resource_index/packages',
- ['resource/' + package_name]),
- ('share/' + package_name, ['package.xml']),
- ],
- install_requires=['setuptools'],
- zip_safe=True,
- maintainer='Enrico Moro',
- maintainer_email='enrimoro003@gmail.com',
- description='This package implements core functionalities for dynamic payload analysis in robotics, focusing on torque calculations and external force handling.',
- license='Apache License 2.0',
- tests_require=['pytest'],
- entry_points={
- 'console_scripts': [
- ],
- },
-)
diff --git a/dynamic_payload_analysis_core/test/test_copyright.py b/dynamic_payload_analysis_core/test/test_copyright.py
deleted file mode 100644
index 97a3919..0000000
--- a/dynamic_payload_analysis_core/test/test_copyright.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_copyright.main import main
-import pytest
-
-
-# Remove the `skip` decorator once the source file(s) have a copyright header
-@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
-@pytest.mark.copyright
-@pytest.mark.linter
-def test_copyright():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found errors'
diff --git a/dynamic_payload_analysis_core/test/test_flake8.py b/dynamic_payload_analysis_core/test/test_flake8.py
deleted file mode 100644
index 27ee107..0000000
--- a/dynamic_payload_analysis_core/test/test_flake8.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_flake8.main import main_with_errors
-import pytest
-
-
-@pytest.mark.flake8
-@pytest.mark.linter
-def test_flake8():
- rc, errors = main_with_errors(argv=[])
- assert rc == 0, \
- 'Found %d code style errors / warnings:\n' % len(errors) + \
- '\n'.join(errors)
diff --git a/dynamic_payload_analysis_core/test/test_pep257.py b/dynamic_payload_analysis_core/test/test_pep257.py
deleted file mode 100644
index b234a38..0000000
--- a/dynamic_payload_analysis_core/test/test_pep257.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_pep257.main import main
-import pytest
-
-
-@pytest.mark.linter
-@pytest.mark.pep257
-def test_pep257():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found code style errors / warnings'
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/__init__.py
deleted file mode 100644
index e69de29..0000000
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
index 571703b..e4bcdb5 100644
--- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
+++ b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/menu_visual.py
@@ -51,7 +51,7 @@ def __init__(self, node):
self.selected_configuration = None
# payload mass selection array
- self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5], dtype=float)
+ self.payload_selection = np.array([0.1, 0.2, 0.3, 0.4, 0.5, 1, 1.5, 1.8, 2.0, 2.5, 3.0, 3.5, 4.0, 4.5, 5.0, 5.5], dtype=float)
# variable to store selection between torque limits or max torque in the current configuration for color visualization
self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # default is torque limits
@@ -62,12 +62,16 @@ def __init__(self, node):
# insert the reset payload button
self.reset = self.menu_handler.insert('Reset payloads', parent=self.root_frames, callback=self.callback_reset)
+ # insert label for the color menu selection
+ self.label_color_selection = self.menu_handler.insert('Select torque visualization color :',callback=self.callback_label)
+
# insert the checker for visualization color choice between torque limits or max torque in the current configuration
self.torque_limits_checker = self.menu_handler.insert('Torque limits',command=str(TorqueVisualizationType.TORQUE_LIMITS.value) , callback=self.callback_color_selection)
self.max_torque_checker = self.menu_handler.insert('Max torque', command=str(TorqueVisualizationType.MAX_CURRENT_TORQUE.value) , callback=self.callback_color_selection)
self.menu_handler.setVisible(self.torque_limits_checker, False)
self.menu_handler.setVisible(self.max_torque_checker, False)
+ self.menu_handler.setVisible(self.label_color_selection, False)
self.make_menu_marker('menu_frames')
@@ -94,6 +98,15 @@ def insert_frame(self, name):
self.menu_handler.reApply(self.server)
self.server.applyChanges()
+ def callback_label(self, feedback):
+ """
+ Callback for the label of color selection. Made only to avoid the error of missing callback.
+
+ Args:
+ feedback: Feedback from the menu selection.
+ """
+ pass
+
def callback_color_selection(self, feedback):
"""
Callback for torque selection type to change the visualization color in Rviz.
@@ -151,6 +164,7 @@ def insert_dropdown_configuration(self, configuration : np.ndarray):
# set visible true for color selection and put the default check state
self.menu_handler.setVisible(self.torque_limits_checker, True)
self.menu_handler.setVisible(self.max_torque_checker, True)
+ self.menu_handler.setVisible(self.label_color_selection, True)
self.menu_handler.setCheckState(self.torque_limits_checker, MenuHandler.CHECKED)
self.menu_handler.setCheckState(self.max_torque_checker, MenuHandler.UNCHECKED)
@@ -223,9 +237,13 @@ def callback_reset(self, feedback):
self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED)
# check if the item is the reset payloads or compute workspace item and skip the unchanging of the check state
- if item.title == "Reset payloads" or item.title == "Compute workspace":
+ if item.title == self.menu_handler.getTitle(self.reset) or item.title == self.menu_handler.getTitle(self.workspace_button):
continue
+ if item.title == self.menu_handler.getTitle(self.label_color_selection):
+ self.menu_handler.setVisible(self.label_color_selection, False)
+ continue
+
# set the checked of frame to unchecked
self.menu_handler.setCheckState(i,MenuHandler.UNCHECKED)
@@ -250,6 +268,11 @@ def callback_selection(self, feedback):
Args:
feedback: Feedback from the menu selection.
"""
+
+ # reset the sub-menu configuration if the user change payload selection
+ self.reset_sub_menu_configuration()
+ # -------------
+
# get name of the frame
handle = feedback.menu_entry_id
title = self.menu_handler.getTitle(handle)
@@ -269,7 +292,29 @@ def callback_selection(self, feedback):
# print the current state of the checked frames
print(f"{self.get_item_state()} \n")
-
+
+ def reset_sub_menu_configuration(self):
+ """
+ Function to reset the sub-menu configuration and related variables.
+ """
+ # reset calculated configurations in the workspace submenu
+ for i, item in self.menu_handler.entry_contexts_.items():
+ # check if the item is the workspace button and has sub entries and remove them from view
+ if item.sub_entries and item.title == self.menu_handler.getTitle(self.workspace_button):
+ # if the frame has payloads selection, we need to remove it
+ for sub_item in item.sub_entries:
+ self.menu_handler.setVisible(sub_item, False)
+ self.menu_handler.setCheckState(sub_item, MenuHandler.UNCHECKED)
+
+ # hide the torque limits and max torque checkboxes when there is no configuration selected
+ self.menu_handler.setVisible(self.torque_limits_checker, False)
+ self.menu_handler.setVisible(self.max_torque_checker, False)
+ self.menu_handler.setVisible(self.label_color_selection, False)
+
+ self.torque_color = TorqueVisualizationType.TORQUE_LIMITS # reset to default torque limits
+
+ # reset the selected configuration
+ self.selected_configuration = None
def update_item(self, name, check: bool):
@@ -328,6 +373,9 @@ def update_payload_selection(self, feedback):
Args:
feedback: Feedback from the menu selection.
"""
+ # reset the sub-menu configuration if the user change payload selection
+ self.reset_sub_menu_configuration()
+
# get the handle of the selected item (id)
handle = feedback.menu_entry_id
# get the title of the selected item (it contains number of kg)
diff --git a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py b/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
deleted file mode 100644
index a6825dd..0000000
--- a/dynamic_payload_analysis_ros/dynamic_payload_analysis_ros/rviz_visualization_menu.py
+++ /dev/null
@@ -1,485 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright (c) 2025 PAL Robotics S.L. All rights reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import rclpy
-from rclpy.node import Node
-from std_msgs.msg import String, ColorRGBA
-from geometry_msgs.msg import WrenchStamped, Point
-from sensor_msgs.msg import JointState
-from dynamic_payload_analysis_core.core import TorqueCalculator
-import numpy as np
-from visualization_msgs.msg import Marker, MarkerArray
-from dynamic_payload_analysis_ros.menu_visual import MenuPayload
-from dynamic_payload_analysis_ros.menu_visual import TorqueVisualizationType
-
-
-
-
-
-class RobotDescriptionSubscriber(Node):
- def __init__(self):
- super().__init__('node_robot_description_subscriber')
- self.subscription = self.create_subscription(
- String,
- '/robot_description',
- self.robot_description_callback,
- qos_profile=rclpy.qos.QoSProfile( durability=rclpy.qos.DurabilityPolicy.TRANSIENT_LOCAL, depth = 1)
- )
-
- # menu for selecting frames to apply payload
- self.menu = MenuPayload(self)
-
- # Publisher for external force
- self.publisher_force = self.create_publisher(MarkerArray, '/external_forces', 10)
-
- # Publisher for RViz visualization of torques
- self.publisher_rviz_torque = self.create_publisher(MarkerArray, '/torque_visualization', 10)
-
- # Pusblisher for point cloud workspace area
- self.publisher_workspace_area = self.create_publisher(MarkerArray, '/workspace_area', 10)
-
- # Publisher for point names in the workspace area
- self.publisher_workspace_area_names = self.create_publisher(MarkerArray, '/workspace_area_names', 10)
-
- # subscription to joint states
- self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10)
-
- # publisher for the joint states
- self.publisher_joint_states = self.create_publisher(JointState, '/joint_states', 10)
-
- # variable to store the object of the TorqueCalculator
- self.robot = None
-
- # variable to store external force applied on the robot
- self.external_force = None
-
- # variable to store checked frames in the menu where a payload is applied
- self.checked_frames = []
-
- # variable to store masses applied on the frames
- self.masses = None
-
- # variable to store the currente selected configuration from the workspace menu
- self.valid_configurations = None
-
- # variable to store if there is a selected configuration from the workspace menu to visualize
- self.selected_configuration = None
-
- # timer to compute the valid workspace area
- self.timer_workspace_calculation = self.create_timer(1, self.workspace_calculation)
- # timer to publish the selected configuration in joint states
- self.timer_publish_configuration = self.create_timer(2, self.publish_selected_configuration)
- # timer to calculate the external forces based on the checked frames in the menu
- self.timer_update_checked_frames = self.create_timer(0.5, self.update_checked_frames)
- # timer to publish the external forces as arrows in RViz
- self.timer_publish_force = self.create_timer(1, self.publish_payload_force)
-
- self.get_logger().info('Robot description subscriber node started')
-
-
- def robot_description_callback(self, msg):
- """
- Callback function for the robot description topic.
- This function initializes the TorqueCalculator with the robot description received from the topic.
- """
-
- self.get_logger().info('Received robot description')
-
- self.robot = TorqueCalculator(robot_description = msg.data)
- self.robot.print_active_joint()
- self.robot.print_frames()
-
- # Add the frame to the menu for payload selection
- for frame in self.robot.get_active_frames():
- self.menu.insert_frame(frame)
-
- # self.robot.print_configuration()
-
-
- def publish_payload_force(self):
- """
- Publish the gravity force on the frame with id `id_force`.
- """
- external_force_array = MarkerArray()
-
- for frame in self.menu.get_item_state():
-
- id_force = self.robot.get_parent_joint_id(frame["name"])
- joint_position = self.robot.get_joint_placement(id_force)
- arrow_force = Marker()
-
- arrow_force.header.frame_id = "base_link"
- arrow_force.header.stamp = self.get_clock().now().to_msg()
- arrow_force.ns = "external_force"
- arrow_force.id = id_force
- arrow_force.type = Marker.ARROW
-
- # add the arrow if the frame is checked or delete it if not
- if frame["checked"]:
- arrow_force.action = Marker.ADD
- else:
- arrow_force.action = Marker.DELETE
-
- arrow_force.scale.x = 0.20 # Length of the arrow
- arrow_force.scale.y = 0.05 # Width of the arrow
- arrow_force.scale.z = 0.05 # Height of the arrow
- arrow_force.color.a = 1.0 # Alpha
- arrow_force.color.r = 0.0
- arrow_force.color.g = 0.0 # Green
- arrow_force.color.b = 1.0
-
- # Set the position of the arrow at the joint placement
- arrow_force.pose.position.x = joint_position["x"]
- arrow_force.pose.position.y = joint_position["y"]
- arrow_force.pose.position.z = joint_position["z"]
- # Set the direction of the arrow downwards
- arrow_force.pose.orientation.x = 0.0
- arrow_force.pose.orientation.y = 0.7071
- arrow_force.pose.orientation.z = 0.0
- arrow_force.pose.orientation.w = 0.7071
-
- external_force_array.markers.append(arrow_force)
-
- self.publisher_force.publish(external_force_array)
-
-
-
- def workspace_calculation(self):
- """
- Callback for timer to compute the valid workspace area.
- """
- # if the user choose to compute the workspace area then compute the valid configurations
- if self.menu.get_workspace_state():
- self.valid_configurations = self.robot.get_valid_workspace(2, 0.20, "gripper_left_finger_joint", "gripper_right_finger_joint", self.masses, self.checked_frames)
-
- # insert the valid configurations in the menu
- self.menu.insert_dropdown_configuration(self.valid_configurations)
-
- # clear all the workspace area markers
- self.clear_workspace_area_markers()
-
- # set the workspace state to False to stop the computation
- self.menu.set_workspace_state(False)
-
- # if there are valid configurations, publish the workspace area
- if self.valid_configurations is not None:
- # publish the workspace area
- self.publish_workspace_area(self.valid_configurations)
-
-
-
- def publish_selected_configuration(self):
- """
- Timer to publish the selected configuration.
- This will publish the joint states of the selected configuration in the menu.
- """
- # get the selected configuration from the menu
- self.selected_configuration = self.menu.get_selected_configuration()
-
- # if there is a selected configuration, publish the joint states based on the valid configurations calculated previously
- if self.selected_configuration is not None:
- configs = self.robot.get_position_for_joint_states(self.valid_configurations[self.selected_configuration]["config"])
- joint_state = JointState()
- joint_state.header.stamp = self.get_clock().now().to_msg()
-
- joint_state.name = [joint["joint_name"] for joint in configs]
- joint_state.position = [joint["q"] for joint in configs]
- #joint_state.position =
- self.publisher_joint_states.publish(joint_state)
-
- else:
- if self.robot is not None:
- # if there is no selected configuration, publish the joint states with zero positions
- joint_state = JointState()
- joint_state.header.stamp = self.get_clock().now().to_msg()
-
- joint_state.name = self.robot.get_joints().tolist()
- zero_config = self.robot.get_position_for_joint_states(self.robot.get_zero_configuration())
- joint_state.position = [joint["q"] for joint in zero_config]
-
- self.publisher_joint_states.publish(joint_state)
-
-
- def update_checked_frames(self):
- """
- Function to update the external forces based on the checked frames in the menu.
- """
- # create the array with only the checked frames (with external force applied)
- self.checked_frames = np.array([check_frame["name"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
-
- if len(self.checked_frames) != 0:
- # create the array with the masses of the checked frames
- self.masses = np.array([check_frame["payload"] for check_frame in self.menu.get_item_state() if check_frame['checked']])
- else:
- # if there are no checked frames, set the masses to None
- self.masses = None
-
-
- def joint_states_callback(self, msg):
- """
- Callback function for the joint states topic.
- """
- if self.robot is not None:
- # if you are not using the calculated configuration from workspace, you can use the joint states to compute the torques because you don't have already the computed torques
- if self.selected_configuration is None:
- self.positions = list(msg.position)
- self.name_positions = list(msg.name)
- v = msg.velocity if msg.velocity else self.robot.get_zero_velocity()
-
- # set the positions based on the joint states
- q = self.robot.set_position(self.positions, self.name_positions)
- a = self.robot.get_zero_acceleration()
-
- # if there are no checked frames, set the external force to None
- if len(self.checked_frames) != 0:
- # create the external force with the masses and the checked frames
- self.external_force = self.robot.create_ext_force(masses=self.masses, frame_name=self.checked_frames, q=q)
- else:
- self.external_force = None
-
- # compute the inverse dynamics
- tau = self.robot.compute_inverse_dynamics(q, v, a, extForce=self.external_force)
-
- # check the effort limits
- status_efforts = self.robot.check_effort_limits(tau)
-
- # print the torques
- self.robot.print_torques(tau)
-
- # get the positions of the joints where the torques are applied based on
- joints_position, offset_z = self.robot.get_joints_placements(q)
-
- # Publish the torques in RViz
- self.publish_label_torques(tau, status_torques=status_efforts ,joints_position=joints_position)
-
- else:
- # if you are using the calculated configuration from workspace, you can use the valid configurations to visualize the torques labels
-
- # get the positions of the joints where the torques are applied based on
- joints_position, offset_z = self.robot.get_joints_placements(self.valid_configurations[self.selected_configuration]["config"])
- # get the torques status (if the torques are within the limits)
- status_efforts = self.robot.check_effort_limits(self.valid_configurations[self.selected_configuration]["tau"])
-
- self.publish_label_torques(self.valid_configurations[self.selected_configuration]["tau"], status_torques=status_efforts ,joints_position=joints_position)
-
-
-
- def publish_label_torques(self, torque: np.ndarray, status_torques : np.ndarray, joints_position: np.ndarray):
- """
- Publish the torque with labels on the robot in RViz.
-
- Args:
- torque (np.ndarray): The torque to be published
- status_torques (np.ndarray): The status of the torques, True if the torque is within the limits, False otherwise
- joints_position (np.ndarray): The positions of the joints where the torques are applied
- """
- marker_array = MarkerArray()
- for i, (t, joint) in enumerate(zip(torque, joints_position)):
- # remove the gripper joints from the visualization TODO: make it more general (with MIMIC joints)
- if "gripper" not in joint['name']:
- marker = Marker()
- marker.header.frame_id = "base_link"
- marker.header.stamp = self.get_clock().now().to_msg()
- marker.ns = "torque_visualization"
- marker.id = i
- marker.type = Marker.TEXT_VIEW_FACING
- # Set the text based on the joint type
- if joint['type'] == 'JointModelPZ':
- marker.text = f"{joint['name']}: {t:.2f} N"
- else:
- marker.text = f"{joint['name']}: {t:.2f} Nm"
-
- marker.action = Marker.ADD
- marker.pose.position.x = joint['x']
- marker.pose.position.y = joint['y']
- marker.pose.position.z = joint['z']
- marker.pose.orientation.w = 1.0
- marker.scale.x = 0.03
- marker.scale.y = 0.03
- marker.scale.z = 0.03
- marker.color.a = 1.0 # Alpha
- if status_torques[i]:
- marker.color.r = 0.0 # Red
- marker.color.g = 1.0 # Green
- marker.color.b = 0.0 # Blue
- else:
- marker.color.r = 1.0 # Red
- marker.color.g = 0.0 # Green
- marker.color.b = 0.0 # Blue
- marker_array.markers.append(marker)
-
- self.publisher_rviz_torque.publish(marker_array)
-
-
- def publish_workspace_area(self, valid_configs: np.ndarray ):
- """
- Publish the workspace area in RViz using points and labels for the end points.
-
- Args:
- valid_configs (np.ndarray): Current valid configurations in the workspace of the robot.
- """
- # Create a MarkerArray to visualize the number of configuration of a specific point in the workspace
- marker_point_names = MarkerArray()
-
- # Create a Marker for the workspace area using points
- marker_points = MarkerArray()
-
- # calculate the maximum torques for each joint in the current valid configurations for each arm only if the user selected the max current torque visualization
- if self.menu.get_torque_color() == TorqueVisualizationType.MAX_CURRENT_TORQUE:
- max_torque_for_joint_left, max_torque_for_joint_right = self.robot.get_maximum_torques(valid_configs)
-
- cont = 0
- # Iterate through the valid configurations and create markers
- for i, valid_config in enumerate(valid_configs):
-
- # create the label for the end point (end effector position) of the valid configuration
- marker_point_name = Marker()
- marker_point_name.header.frame_id = "base_link"
- marker_point_name.header.stamp = self.get_clock().now().to_msg()
-
- marker_point_name.ns = f"workspace_area_{valid_config['arm']}_arm"
- marker_point_name.id = i + 1
- marker_point_name.type = Marker.TEXT_VIEW_FACING
- marker_point_name.text = f"Config {i}"
- marker_point_name.action = Marker.ADD
- marker_point_name.pose.position.x = valid_config["end_effector_pos"][0]
- marker_point_name.pose.position.y = valid_config["end_effector_pos"][1]
- marker_point_name.pose.position.z = valid_config["end_effector_pos"][2] + 0.05 # Offset to avoid overlap with the sphere
- marker_point_name.pose.orientation.w = 1.0
- marker_point_name.scale.x = 0.02
- marker_point_name.scale.y = 0.02
- marker_point_name.scale.z = 0.02
- marker_point_name.color.a = 1.0 # Alpha
- marker_point_name.color.r = 1.0 # Red
- marker_point_name.color.g = 1.0 # Green
- marker_point_name.color.b = 1.0 # Blue
-
- # get the joint positions for the valid configuration
- joint_positions, offset_z = self.robot.get_joints_placements(valid_config["config"])
-
- # get the normalized torques for the valid configuration with current target limits for color visualization
- if self.menu.get_torque_color() == TorqueVisualizationType.TORQUE_LIMITS:
- norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"])
- else:
- # if the user selected the max torque, use the maximum torques for the joint
- if valid_config["arm"] == "left":
- norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_left)
- else:
- norm_joints_torques = self.robot.get_normalized_torques(valid_config["tau"],max_torque_for_joint_right)
-
- # insert points related to the end effector position in the workspace area and with color based on the normalized torques for each joint
- for joint_pos,tau in zip(joint_positions,norm_joints_torques):
- # print only the points for the corrisponding arm, in this way we can visualize the workspace area for each arm separately and avoid confusion
- if valid_config["arm"] in joint_pos["name"] :
- point = Marker()
- point.header.frame_id = "base_link"
- point.header.stamp = self.get_clock().now().to_msg()
- point.ns = joint_pos["name"]
- point.id = cont
- point.type = Marker.SPHERE
- point.action = Marker.ADD
- point.scale.x = 0.03 # Size of the sphere
- point.scale.y = 0.03
- point.scale.z = 0.03
-
- point.pose.position.x = valid_config["end_effector_pos"][0]
- point.pose.position.y = valid_config["end_effector_pos"][1]
- point.pose.position.z = valid_config["end_effector_pos"][2] - offset_z
- point.pose.orientation.w = 1.0
- point.color.a = 1.0 # Alpha
- point.color.r = tau # Red
- point.color.g = 1 - tau # Green
- point.color.b = 0.0 # Blue
-
- cont += 1
-
- # Add the point to the points array
- marker_points.markers.append(point)
-
- # Add the marker point name to the marker point names array
- marker_point_names.markers.append(marker_point_name)
-
-
- # get the unified torque for the valid configurations
- unified_configurations_torque = self.robot.get_unified_configurations_torque(valid_configs)
-
- # insert points related to the end effector position in the workspace area and with color based on the normalized torque for each configuration
- # this is used to visualize the workspace area with the unified torques for each configuration
- for i, unified_config in enumerate(unified_configurations_torque):
- marker_point = Marker()
- marker_point.header.frame_id = "base_link"
- marker_point.header.stamp = self.get_clock().now().to_msg()
- marker_point.ns = f"unified_torque_workspace_{unified_config['arm']}_arm"
- marker_point.id = i
- marker_point.type = Marker.SPHERE
- marker_point.action = Marker.ADD
- marker_point.scale.x = 0.03 # Size of the sphere
- marker_point.scale.y = 0.03
- marker_point.scale.z = 0.03
- marker_point.pose.position.x = unified_config["end_effector_pos"][0]
- marker_point.pose.position.y = unified_config["end_effector_pos"][1]
- marker_point.pose.position.z = unified_config["end_effector_pos"][2] - offset_z # Offset to avoid overlap with the sphere
- marker_point.pose.orientation.w = 1.0
- marker_point.color.a = 1.0 # Alpha
-
- # Color based on the normalized torques
- marker_point.color.r = unified_config["norm_tau"]
- marker_point.color.g = 1 - unified_config["norm_tau"]
- marker_point.color.b = 0.0 # Blue
-
- # Add the marker point to the points array
- marker_points.markers.append(marker_point)
-
-
- # Publish the marker points and names
- self.publisher_workspace_area.publish(marker_points)
- self.publisher_workspace_area_names.publish(marker_point_names)
-
-
- def clear_workspace_area_markers(self):
- """
- Clear the workspace area markers in RViz.
- This will publish an empty MarkerArray to remove the previous markers.
- """
- # create an empty MarkerArray to clear the markers with a DELETEALL action
- marker_array_msg = MarkerArray()
- marker = Marker()
- marker.id = 0
- marker.action = Marker.DELETEALL
- marker_array_msg.markers.append(marker)
-
- # Publish the empty MarkerArray to clear the markers
- self.publisher_workspace_area.publish(marker_array_msg)
- self.publisher_workspace_area_names.publish(marker_array_msg)
-
-
-
-
-def main(args=None):
- rclpy.init(args=args)
- node = RobotDescriptionSubscriber()
-
- try:
- rclpy.spin(node)
- except KeyboardInterrupt:
- pass
- finally:
- node.destroy_node()
- rclpy.shutdown()
-
-
-if __name__ == '__main__':
- main()
\ No newline at end of file
diff --git a/dynamic_payload_analysis_ros/package.xml b/dynamic_payload_analysis_ros/package.xml
deleted file mode 100644
index f39adf4..0000000
--- a/dynamic_payload_analysis_ros/package.xml
+++ /dev/null
@@ -1,18 +0,0 @@
-
-
-
- dynamic_payload_analysis_ros
- 0.0.0
- This package provides tools for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.
- morolinux
- Apache License 2.0
-
- ament_copyright
- ament_flake8
- ament_pep257
- python3-pytest
-
-
- ament_python
-
-
diff --git a/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros b/dynamic_payload_analysis_ros/resource/dynamic_payload_analysis_ros
deleted file mode 100644
index e69de29..0000000
diff --git a/dynamic_payload_analysis_ros/setup.cfg b/dynamic_payload_analysis_ros/setup.cfg
deleted file mode 100644
index 5a64333..0000000
--- a/dynamic_payload_analysis_ros/setup.cfg
+++ /dev/null
@@ -1,4 +0,0 @@
-[develop]
-script_dir=$base/lib/dynamic_payload_analysis_ros
-[install]
-install_scripts=$base/lib/dynamic_payload_analysis_ros
diff --git a/dynamic_payload_analysis_ros/setup.py b/dynamic_payload_analysis_ros/setup.py
deleted file mode 100644
index 901e1da..0000000
--- a/dynamic_payload_analysis_ros/setup.py
+++ /dev/null
@@ -1,27 +0,0 @@
-from setuptools import find_packages, setup
-
-package_name = 'dynamic_payload_analysis_ros'
-
-setup(
- name=package_name,
- version='0.0.0',
- packages=find_packages(exclude=['test']),
- data_files=[
- ('share/ament_index/resource_index/packages',
- ['resource/' + package_name]),
- ('share/' + package_name, ['package.xml']),
- ],
- install_requires=['setuptools'],
- zip_safe=True,
- maintainer='Enrico Moro',
- maintainer_email='enrimoro003@gmail.com',
- description='This package provides graphics tools in Rviz for dynamic payload analysis in robotics with a focus on torque calculations and external force handling.',
- license='Apache License 2.0',
- tests_require=['pytest'],
- entry_points={
- 'console_scripts': [
- 'node_rviz_visualization = dynamic_payload_analysis_ros.rviz_visualization:main',
- 'node_rviz_visualization_menu = dynamic_payload_analysis_ros.rviz_visualization_menu:main',
- ],
- },
-)
diff --git a/dynamic_payload_analysis_ros/test/test_copyright.py b/dynamic_payload_analysis_ros/test/test_copyright.py
deleted file mode 100644
index 97a3919..0000000
--- a/dynamic_payload_analysis_ros/test/test_copyright.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_copyright.main import main
-import pytest
-
-
-# Remove the `skip` decorator once the source file(s) have a copyright header
-@pytest.mark.skip(reason='No copyright header has been placed in the generated source file.')
-@pytest.mark.copyright
-@pytest.mark.linter
-def test_copyright():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found errors'
diff --git a/dynamic_payload_analysis_ros/test/test_flake8.py b/dynamic_payload_analysis_ros/test/test_flake8.py
deleted file mode 100644
index 27ee107..0000000
--- a/dynamic_payload_analysis_ros/test/test_flake8.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_flake8.main import main_with_errors
-import pytest
-
-
-@pytest.mark.flake8
-@pytest.mark.linter
-def test_flake8():
- rc, errors = main_with_errors(argv=[])
- assert rc == 0, \
- 'Found %d code style errors / warnings:\n' % len(errors) + \
- '\n'.join(errors)
diff --git a/dynamic_payload_analysis_ros/test/test_pep257.py b/dynamic_payload_analysis_ros/test/test_pep257.py
deleted file mode 100644
index b234a38..0000000
--- a/dynamic_payload_analysis_ros/test/test_pep257.py
+++ /dev/null
@@ -1,23 +0,0 @@
-# Copyright 2015 Open Source Robotics Foundation, Inc.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ament_pep257.main import main
-import pytest
-
-
-@pytest.mark.linter
-@pytest.mark.pep257
-def test_pep257():
- rc = main(argv=['.', 'test'])
- assert rc == 0, 'Found code style errors / warnings'