Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ def __init__(self, robot_description : Union[str, Path]):
# compute main trees of the robot model
self.compute_subtrees()


# array to store all analyzed points
self.analyzed_points = np.array([], dtype=object)

# array to store all configurations for the robot model
self.configurations = np.array([], dtype=object)
Expand Down Expand Up @@ -422,11 +423,17 @@ def compute_all_configurations(self, range : int, resolution : int, end_joint_id

# Create an array to store all configurations
configurations = []

# restore analyzed points array
self.analyzed_points = np.array([], dtype=object)

# Iterate over the range to compute all configurations
for x in np.arange(-range, range , resolution):
for y in np.arange(-range, range , resolution):
for z in np.arange(-range/2, range , resolution):

self.analyzed_points = np.append(self.analyzed_points, {"position" : [x, y, z]})

target_position = pin.SE3(np.eye(3), np.array([x, y, z]))
new_q = self.compute_inverse_kinematics(q, target_position, end_joint_id)

Expand Down Expand Up @@ -873,6 +880,15 @@ def get_zero_acceleration(self) -> np.ndarray:
return a0


def get_analyzed_points(self) -> np.ndarray:
"""
Get the analyzed points during the computation of all configurations.

:return: Array of analyzed points.
"""

return self.analyzed_points

def get_random_configuration(self) -> tuple[np.ndarray, np.ndarray]:
"""
Get a random configuration for configuration and velocity vectors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ def __init__(self):
# Pusblisher for point cloud of maximum payloads in the workspace area
self.publisher_maximum_payloads = self.create_publisher(MarkerArray, '/maximum_payloads', 10)

# Publisher for point cloud of all analyzed points
self.publisher_analyzed_points = self.create_publisher(MarkerArray, '/analyzed_points', 10)

# subscription to joint states
self.joint_states_subscription = self.create_subscription(JointState, '/joint_states', self.joint_states_callback, 10)

Expand Down Expand Up @@ -103,6 +106,9 @@ def __init__(self):
# variable to store if there is a selected configuration from the workspace menu to visualize
self.selected_configuration = None

# variable to store analyzed points of the workspace area
self.marker_analyzed_points = None

# timer to compute the valid workspace area
self.timer_workspace_calculation = self.create_timer(1.0, self.workspace_calculation)
# timer to publish the selected configuration in joint states
Expand All @@ -113,6 +119,8 @@ def __init__(self):
self.timer_publish_force = self.create_timer(1.0, self.publish_payload_force)
# timer to update items in the menu for payload selection
self.timer_update_payload_selection = self.create_timer(0.5, self.update_payload_selection)
# timer to publish all analyzed points in the workspace area
self.timer_publish_analyzed_points = self.create_timer(2, self.publish_analyzed_points)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you publishing with a timer?
I didn't notice the previous one earlier

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in this way I reduce the time comsuming in that function, or do you mean that it is possible to publish it once and then stop to publish it because it is the same for all time ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Check "transient local” here Quality of Service settings — ROS 2 Documentation: Rolling documentation https://share.google/2BqvINadfaCiZ6Qrl

It is similar to the one you are using to subscribe to a robot_description

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so in that way I will publish once and it will not require multiple publishing

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct


self.get_logger().info('Robot description subscriber node started')

Expand Down Expand Up @@ -161,6 +169,53 @@ def update_payload_selection(self):

self.links = links # Update the links to the current ones


def publish_analyzed_points(self):
"""
Publish the analyzed points in the workspace area.
This will publish all the points in the workspace area where the robot can reach with the current configuration.
"""
if self.menu is not None:

if len(self.robot_handler.get_analyzed_points()) == 0:
return
else:
if self.marker_analyzed_points is None:

self.marker_analyzed_points = MarkerArray()

# publish the analyzed points
for i, analyzed_point in enumerate(self.robot_handler.get_analyzed_points()):
point = Marker()
point.header.frame_id = self.robot_handler.get_root_name()
point.header.stamp = Time()
point.ns = f"analyzed_points_ns"
point.id = i
point.type = Marker.SPHERE
point.action = Marker.ADD
point.scale.x = 0.01
point.scale.y = 0.01
point.scale.z = 0.01
point.pose.position.x = analyzed_point['position'][0]
point.pose.position.y = analyzed_point['position'][1]
point.pose.position.z = analyzed_point['position'][2]
point.pose.orientation.w = 1.0
point.color.a = 1.0 # Alpha
point.color.r = 1.0 # Red
point.color.g = 1.0 # Green
point.color.b = 1.0 # Blue

self.marker_analyzed_points.markers.append(point)

self.publisher_analyzed_points.publish(self.marker_analyzed_points)

else:
# update with current time
for marker in self.marker_analyzed_points.markers:
marker.header.stamp = Time()

self.publisher_analyzed_points.publish(self.marker_analyzed_points)

def publish_payload_force(self):
"""
Publish the gravity force on the frame with id `id_force`.
Expand Down