Seed the simulation
This commit is contained in:
@@ -1,10 +1,9 @@
|
||||
import numpy as np
|
||||
import scipy.interpolate
|
||||
from robot_utils.spatial import Transform
|
||||
import rospy
|
||||
|
||||
from active_grasp.policy import BasePolicy
|
||||
from robot_utils.ros import tf
|
||||
from robot_helpers.ros import tf
|
||||
from vgn.utils import look_at
|
||||
|
||||
|
||||
@@ -14,9 +13,8 @@ class SingleView(BasePolicy):
|
||||
"""
|
||||
|
||||
def update(self):
|
||||
self.integrate_latest_image()
|
||||
self.draw_scene_cloud()
|
||||
self.best_grasp = self.predict_best_grasp()
|
||||
self._integrate_latest_image()
|
||||
self.best_grasp = self._predict_best_grasp()
|
||||
self.done = True
|
||||
|
||||
|
||||
@@ -37,12 +35,10 @@ class TopView(BasePolicy):
|
||||
error = current.translation - self.target.translation
|
||||
|
||||
if np.linalg.norm(error) < 0.01:
|
||||
self.best_grasp = self.predict_best_grasp()
|
||||
self.best_grasp = self._predict_best_grasp()
|
||||
self.done = True
|
||||
else:
|
||||
self.integrate_latest_image()
|
||||
self.draw_scene_cloud()
|
||||
self.draw_camera_path()
|
||||
self._integrate_latest_image()
|
||||
return self.target
|
||||
|
||||
|
||||
@@ -71,12 +67,10 @@ class RandomView(BasePolicy):
|
||||
error = current.translation - self.target.translation
|
||||
|
||||
if np.linalg.norm(error) < 0.01:
|
||||
self.best_grasp = self.predict_best_grasp()
|
||||
self.best_grasp = self._predict_best_grasp()
|
||||
self.done = True
|
||||
else:
|
||||
self.integrate_latest_image()
|
||||
self.draw_scene_cloud()
|
||||
self.draw_camera_path()
|
||||
self._integrate_latest_image()
|
||||
return self.target
|
||||
|
||||
|
||||
@@ -101,17 +95,15 @@ class FixedTrajectory(BasePolicy):
|
||||
def update(self):
|
||||
elapsed_time = (rospy.Time.now() - self.tic).to_sec()
|
||||
if elapsed_time > self.duration:
|
||||
self.best_grasp = self.predict_best_grasp()
|
||||
self.best_grasp = self._predict_best_grasp()
|
||||
self.done = True
|
||||
else:
|
||||
self.integrate_latest_image()
|
||||
self._integrate_latest_image()
|
||||
t = self.m(elapsed_time)
|
||||
eye = self.circle_center + np.r_[self.r * np.cos(t), self.r * np.sin(t), 0]
|
||||
center = (self.bbox.min + self.bbox.max) / 2.0
|
||||
up = np.r_[1.0, 0.0, 0.0]
|
||||
target = self.T_B_task * (self.T_EE_cam * look_at(eye, center, up)).inv()
|
||||
self.draw_scene_cloud()
|
||||
self.draw_camera_path()
|
||||
return target
|
||||
|
||||
|
||||
@@ -122,9 +114,8 @@ class AlignmentView(BasePolicy):
|
||||
|
||||
def activate(self, bbox):
|
||||
super().activate(bbox)
|
||||
self.integrate_latest_image()
|
||||
self.draw_scene_cloud()
|
||||
self.best_grasp = self.predict_best_grasp()
|
||||
self._integrate_latest_image()
|
||||
self.best_grasp = self._predict_best_grasp()
|
||||
if self.best_grasp:
|
||||
R, t = self.best_grasp.rotation, self.best_grasp.translation
|
||||
center = t
|
||||
@@ -139,10 +130,8 @@ class AlignmentView(BasePolicy):
|
||||
error = current.translation - self.target.translation
|
||||
|
||||
if np.linalg.norm(error) < 0.01:
|
||||
self.best_grasp = self.predict_best_grasp()
|
||||
self.best_grasp = self._predict_best_grasp()
|
||||
self.done = True
|
||||
else:
|
||||
self.integrate_latest_image()
|
||||
self.draw_scene_cloud()
|
||||
self.draw_camera_path()
|
||||
self._integrate_latest_image()
|
||||
return self.target
|
||||
|
26
active_grasp/bbox.py
Normal file
26
active_grasp/bbox.py
Normal file
@@ -0,0 +1,26 @@
|
||||
import numpy as np
|
||||
|
||||
import active_grasp.msg
|
||||
from robot_helpers.ros.conversions import to_point_msg, from_point_msg
|
||||
|
||||
|
||||
class AABBox:
|
||||
def __init__(self, bbox_min, bbox_max):
|
||||
self.min = bbox_min
|
||||
self.max = bbox_max
|
||||
|
||||
def is_inside(self, p):
|
||||
return np.all(p > self.min) and np.all(p < self.max)
|
||||
|
||||
|
||||
def from_bbox_msg(msg):
|
||||
aabb_min = from_point_msg(msg.min)
|
||||
aabb_max = from_point_msg(msg.max)
|
||||
return AABBox(aabb_min, aabb_max)
|
||||
|
||||
|
||||
def to_bbox_msg(bbox):
|
||||
msg = active_grasp.msg.AABBox()
|
||||
msg.min = to_point_msg(bbox.min)
|
||||
msg.max = to_point_msg(bbox.max)
|
||||
return msg
|
@@ -1,64 +1,69 @@
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
import numpy as np
|
||||
import rospy
|
||||
import time
|
||||
|
||||
from active_grasp.bbox import from_bbox_msg
|
||||
from active_grasp.srv import Reset, ResetRequest
|
||||
from active_grasp.utils import *
|
||||
from robot_utils.ros.panda import PandaGripperClient
|
||||
from robot_utils.spatial import Rotation, Transform
|
||||
from robot_helpers.ros.conversions import to_pose_stamped_msg
|
||||
from robot_helpers.ros.panda import PandaGripperClient
|
||||
from robot_helpers.spatial import Rotation, Transform
|
||||
|
||||
|
||||
class GraspController:
|
||||
def __init__(self, policy):
|
||||
self.policy = policy
|
||||
self.controller = CartesianPoseControllerClient()
|
||||
self.gripper = PandaGripperClient()
|
||||
self.reset_env = rospy.ServiceProxy("reset", Reset)
|
||||
self.load_parameters()
|
||||
self._reset_env = rospy.ServiceProxy("reset", Reset)
|
||||
self._load_parameters()
|
||||
self._init_robot_control()
|
||||
|
||||
def load_parameters(self):
|
||||
def _load_parameters(self):
|
||||
self.T_G_EE = Transform.from_list(rospy.get_param("~ee_grasp_offset")).inv()
|
||||
|
||||
def run(self):
|
||||
bbox = self.reset()
|
||||
with Timer("exploration_time"):
|
||||
grasp = self.explore(bbox)
|
||||
with Timer("execution_time"):
|
||||
res = self.execute_grasp(grasp)
|
||||
return self.collect_info(res)
|
||||
def _init_robot_control(self):
|
||||
self.target_pose_pub = rospy.Publisher("command", PoseStamped, queue_size=10)
|
||||
self.gripper = PandaGripperClient()
|
||||
|
||||
def reset(self):
|
||||
req = ResetRequest()
|
||||
res = self.reset_env(req)
|
||||
def _send_cmd(self, pose):
|
||||
msg = to_pose_stamped_msg(pose, "panda_link0")
|
||||
self.target_pose_pub.publish(msg)
|
||||
|
||||
def run(self):
|
||||
bbox = self._reset()
|
||||
with Timer("search_time"):
|
||||
grasp = self._search_grasp(bbox)
|
||||
res = self._execute_grasp(grasp)
|
||||
return self._collect_info(res)
|
||||
|
||||
def _reset(self):
|
||||
res = self._reset_env(ResetRequest())
|
||||
rospy.sleep(1.0) # wait for states to be updated
|
||||
return from_bbox_msg(res.bbox)
|
||||
|
||||
def explore(self, bbox):
|
||||
def _search_grasp(self, bbox):
|
||||
self.policy.activate(bbox)
|
||||
r = rospy.Rate(self.policy.rate)
|
||||
while not self.policy.done:
|
||||
while True:
|
||||
cmd = self.policy.update()
|
||||
if self.policy.done:
|
||||
break
|
||||
self.controller.send_target(cmd)
|
||||
self._send_cmd(cmd)
|
||||
r.sleep()
|
||||
return self.policy.best_grasp
|
||||
|
||||
def execute_grasp(self, grasp):
|
||||
def _execute_grasp(self, grasp):
|
||||
if not grasp:
|
||||
return "aborted"
|
||||
|
||||
T_B_G = self.postprocess(grasp)
|
||||
|
||||
T_B_G = self._postprocess(grasp)
|
||||
self.gripper.move(0.08)
|
||||
|
||||
# Move to an initial pose offset.
|
||||
self.controller.send_target(
|
||||
T_B_G * Transform.translation([0, 0, -0.05]) * self.T_G_EE
|
||||
)
|
||||
self._send_cmd(T_B_G * Transform.translation([0, 0, -0.05]) * self.T_G_EE)
|
||||
rospy.sleep(3.0)
|
||||
|
||||
# Approach grasp pose.
|
||||
self.controller.send_target(T_B_G * self.T_G_EE)
|
||||
self._send_cmd(T_B_G * self.T_G_EE)
|
||||
rospy.sleep(2.0)
|
||||
|
||||
# Close the fingers.
|
||||
@@ -66,7 +71,7 @@ class GraspController:
|
||||
|
||||
# Lift the object.
|
||||
target = Transform.translation([0, 0, 0.2]) * T_B_G * self.T_G_EE
|
||||
self.controller.send_target(target)
|
||||
self._send_cmd(target)
|
||||
rospy.sleep(2.0)
|
||||
|
||||
# Check whether the object remains in the hand
|
||||
@@ -74,22 +79,41 @@ class GraspController:
|
||||
|
||||
return "succeeded" if success else "failed"
|
||||
|
||||
def postprocess(self, T_B_G):
|
||||
def _postprocess(self, T_B_G):
|
||||
# Ensure that the camera is pointing forward.
|
||||
rot = T_B_G.rotation
|
||||
if rot.as_matrix()[:, 0][0] < 0:
|
||||
T_B_G.rotation = rot * Rotation.from_euler("z", np.pi)
|
||||
return T_B_G
|
||||
|
||||
def collect_info(self, result):
|
||||
def _collect_info(self, result):
|
||||
points = [p.translation for p in self.policy.viewpoints]
|
||||
d = np.sum([np.linalg.norm(p2 - p1) for p1, p2 in zip(points, points[1:])])
|
||||
|
||||
info = {
|
||||
"result": result,
|
||||
"viewpoint_count": len(points),
|
||||
"distance_travelled": d,
|
||||
"distance": d,
|
||||
}
|
||||
info.update(self.policy.info)
|
||||
info.update(Timer.timers)
|
||||
return info
|
||||
|
||||
|
||||
class Timer:
|
||||
timers = dict()
|
||||
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def __enter__(self):
|
||||
self.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc_info):
|
||||
self.stop()
|
||||
|
||||
def start(self):
|
||||
self.tic = time.perf_counter()
|
||||
|
||||
def stop(self):
|
||||
elapsed_time = time.perf_counter() - self.tic
|
||||
self.timers[self.name] = elapsed_time
|
||||
|
@@ -2,35 +2,37 @@ import cv_bridge
|
||||
import numpy as np
|
||||
from pathlib import Path
|
||||
import rospy
|
||||
from rospy import Publisher
|
||||
import sensor_msgs.msg
|
||||
from visualization_msgs.msg import Marker, MarkerArray
|
||||
from sensor_msgs.msg import CameraInfo, Image, PointCloud2
|
||||
|
||||
from robot_utils.perception import Image
|
||||
from robot_utils.ros import tf
|
||||
from robot_utils.ros.conversions import *
|
||||
from robot_utils.ros.rviz import *
|
||||
from robot_utils.spatial import Transform
|
||||
from .visualization import Visualizer
|
||||
from robot_helpers.ros import tf
|
||||
from robot_helpers.ros.conversions import *
|
||||
from vgn.detection import VGN, compute_grasps
|
||||
from vgn.perception import UniformTSDFVolume
|
||||
from vgn.utils import *
|
||||
|
||||
|
||||
class BasePolicy:
|
||||
class Policy:
|
||||
def activate(self, bbox):
|
||||
raise NotImplementedError
|
||||
|
||||
def update(self):
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class BasePolicy(Policy):
|
||||
def __init__(self):
|
||||
self.cv_bridge = cv_bridge.CvBridge()
|
||||
self.vgn = VGN(Path(rospy.get_param("vgn/model")))
|
||||
self.finger_depth = 0.05
|
||||
|
||||
self.load_parameters()
|
||||
self.lookup_transforms()
|
||||
self.connect_to_camera()
|
||||
self.connect_to_rviz()
|
||||
|
||||
self.rate = 5
|
||||
self.info = {}
|
||||
self._load_parameters()
|
||||
self._lookup_transforms()
|
||||
self._init_camera_stream()
|
||||
self._init_publishers()
|
||||
self._init_visualizer()
|
||||
|
||||
def load_parameters(self):
|
||||
def _load_parameters(self):
|
||||
self.task_frame = rospy.get_param("~frame_id")
|
||||
self.base_frame = rospy.get_param("~base_frame_id")
|
||||
self.ee_frame = rospy.get_param("~ee_frame_id")
|
||||
@@ -38,114 +40,64 @@ class BasePolicy:
|
||||
self.info_topic = rospy.get_param("~camera/info_topic")
|
||||
self.depth_topic = rospy.get_param("~camera/depth_topic")
|
||||
|
||||
def lookup_transforms(self):
|
||||
tf._init_listener()
|
||||
rospy.sleep(1.0) # wait to receive transforms
|
||||
def _lookup_transforms(self):
|
||||
self.T_B_task = tf.lookup(self.base_frame, self.task_frame)
|
||||
self.T_EE_cam = tf.lookup(self.ee_frame, self.cam_frame)
|
||||
|
||||
def connect_to_camera(self):
|
||||
msg = rospy.wait_for_message(
|
||||
self.info_topic, sensor_msgs.msg.CameraInfo, rospy.Duration(2.0)
|
||||
)
|
||||
def _init_camera_stream(self):
|
||||
msg = rospy.wait_for_message(self.info_topic, CameraInfo, rospy.Duration(2.0))
|
||||
self.intrinsic = from_camera_info_msg(msg)
|
||||
rospy.Subscriber(
|
||||
self.depth_topic, sensor_msgs.msg.Image, self.sensor_cb, queue_size=1
|
||||
)
|
||||
rospy.Subscriber(self.depth_topic, Image, self._sensor_cb, queue_size=1)
|
||||
|
||||
def sensor_cb(self, msg):
|
||||
self.img = Image(depth=self.cv_bridge.imgmsg_to_cv2(msg).astype(np.float32))
|
||||
self.extrinsic = tf.lookup(
|
||||
self.cam_frame,
|
||||
self.task_frame,
|
||||
msg.header.stamp,
|
||||
rospy.Duration(0.2),
|
||||
)
|
||||
def _sensor_cb(self, msg):
|
||||
self.img = self.cv_bridge.imgmsg_to_cv2(msg).astype(np.float32)
|
||||
self.extrinsic = tf.lookup(self.cam_frame, self.task_frame, msg.header.stamp)
|
||||
|
||||
def connect_to_rviz(self):
|
||||
self.bbox_pub = Publisher("bbox", Marker, queue_size=1, latch=True)
|
||||
self.cloud_pub = Publisher("cloud", PointCloud2, queue_size=1, latch=True)
|
||||
self.path_pub = Publisher("path", MarkerArray, queue_size=1, latch=True)
|
||||
self.grasps_pub = Publisher("grasps", MarkerArray, queue_size=1, latch=True)
|
||||
def _init_publishers(self):
|
||||
self.scene_cloud_pub = rospy.Publisher("scene_cloud", PointCloud2, queue_size=1)
|
||||
|
||||
def _init_visualizer(self):
|
||||
self.visualizer = Visualizer(self.task_frame)
|
||||
|
||||
def activate(self, bbox):
|
||||
self.clear_grasps()
|
||||
self.bbox = bbox
|
||||
self.draw_bbox()
|
||||
self.tsdf = UniformTSDFVolume(0.3, 40)
|
||||
self.viewpoints = []
|
||||
self.done = False
|
||||
self.best_grasp = None # grasp pose defined w.r.t. the robot's base frame
|
||||
self.visualizer.clear()
|
||||
self.visualizer.bbox(bbox)
|
||||
|
||||
def update(self):
|
||||
raise NotImplementedError
|
||||
|
||||
def integrate_latest_image(self):
|
||||
def _integrate_latest_image(self):
|
||||
self.viewpoints.append(self.extrinsic.inv())
|
||||
self.tsdf.integrate(
|
||||
self.img,
|
||||
self.intrinsic,
|
||||
self.extrinsic,
|
||||
)
|
||||
self._publish_scene_cloud()
|
||||
self.visualizer.path(self.viewpoints)
|
||||
|
||||
def predict_best_grasp(self):
|
||||
def _publish_scene_cloud(self):
|
||||
cloud = self.tsdf.get_scene_cloud()
|
||||
msg = to_cloud_msg(self.task_frame, np.asarray(cloud.points))
|
||||
self.scene_cloud_pub.publish(msg)
|
||||
|
||||
def _predict_best_grasp(self):
|
||||
tsdf_grid = self.tsdf.get_grid()
|
||||
out = self.vgn.predict(tsdf_grid)
|
||||
score_fn = lambda g: g.pose.translation[2]
|
||||
grasps = compute_grasps(self.tsdf.voxel_size, out, score_fn, max_filter_size=3)
|
||||
grasps = self.filter_grasps_on_target_object(grasps)
|
||||
self.draw_grasps(grasps)
|
||||
grasps = self._select_grasps_on_target_object(grasps)
|
||||
return self.T_B_task * grasps[0].pose if len(grasps) > 0 else None
|
||||
|
||||
def filter_grasps_on_target_object(self, grasps):
|
||||
return [
|
||||
g
|
||||
for g in grasps
|
||||
if self.bbox.is_inside(
|
||||
g.pose.rotation.apply([0, 0, 0.05]) + g.pose.translation
|
||||
)
|
||||
]
|
||||
|
||||
def clear_grasps(self):
|
||||
self.grasps_pub.publish(DELETE_MARKER_ARRAY_MSG)
|
||||
|
||||
def draw_bbox(self):
|
||||
pose = Transform.translation((self.bbox.min + self.bbox.max) / 2.0)
|
||||
scale = self.bbox.max - self.bbox.min
|
||||
color = np.r_[0.8, 0.2, 0.2, 0.6]
|
||||
msg = create_marker(Marker.CUBE, self.task_frame, pose, scale, color)
|
||||
self.bbox_pub.publish(msg)
|
||||
|
||||
def draw_scene_cloud(self):
|
||||
cloud = self.tsdf.get_scene_cloud()
|
||||
msg = to_cloud_msg(self.task_frame, np.asarray(cloud.points))
|
||||
self.cloud_pub.publish(msg)
|
||||
|
||||
def draw_grasps(self, grasps):
|
||||
msg = create_grasp_marker_array(self.task_frame, grasps, self.finger_depth)
|
||||
self.grasps_pub.publish(msg)
|
||||
|
||||
def draw_camera_path(self):
|
||||
identity = Transform.identity()
|
||||
color = np.r_[31, 119, 180] / 255.0
|
||||
|
||||
# Spheres for each viewpoint
|
||||
scale = 0.01 * np.ones(3)
|
||||
spheres = create_marker(
|
||||
Marker.SPHERE_LIST, self.task_frame, identity, scale, color
|
||||
)
|
||||
spheres.id = 0
|
||||
spheres.points = [to_point_msg(p.translation) for p in self.viewpoints]
|
||||
|
||||
# Line strip connecting viewpoints
|
||||
scale = [0.005, 0.0, 0.0]
|
||||
lines = create_marker(
|
||||
Marker.LINE_STRIP, self.task_frame, identity, scale, color
|
||||
)
|
||||
lines.id = 1
|
||||
lines.points = [to_point_msg(p.translation) for p in self.viewpoints]
|
||||
|
||||
self.path_pub.publish(MarkerArray([spheres, lines]))
|
||||
def _select_grasps_on_target_object(self, grasps):
|
||||
result = []
|
||||
for g in grasps:
|
||||
tip = g.pose.rotation.apply([0, 0, 0.05]) + g.pose.translation
|
||||
if self.bbox.is_inside(tip):
|
||||
result.append(g)
|
||||
return result
|
||||
|
||||
|
||||
registry = {}
|
||||
|
@@ -1,31 +1,40 @@
|
||||
from pathlib import Path
|
||||
import pybullet as p
|
||||
import pybullet_data
|
||||
import rospkg
|
||||
import time
|
||||
|
||||
from active_grasp.utils import *
|
||||
from robot_utils.bullet import *
|
||||
from robot_utils.spatial import Rotation, Transform
|
||||
from active_grasp.bbox import AABBox
|
||||
from robot_helpers.bullet import *
|
||||
from robot_helpers.model import Model
|
||||
from robot_helpers.spatial import Rotation, Transform
|
||||
|
||||
|
||||
class Simulation(BtSim):
|
||||
def __init__(self, gui, rng):
|
||||
super().__init__(gui=gui, sleep=False)
|
||||
self.rng = rng
|
||||
self.object_uids = []
|
||||
|
||||
class Simulation:
|
||||
def __init__(self, gui):
|
||||
self.configure_physics_engine(gui, 60, 4)
|
||||
self.configure_visualizer()
|
||||
self.find_object_urdfs()
|
||||
self.find_urdfs()
|
||||
self.load_table()
|
||||
self.load_robot()
|
||||
self.load_controller()
|
||||
self.object_uids = []
|
||||
|
||||
def configure_physics_engine(self, gui, rate, sub_step_count):
|
||||
self.rate = rate
|
||||
self.dt = 1.0 / self.rate
|
||||
p.connect(p.GUI if gui else p.DIRECT)
|
||||
p.setAdditionalSearchPath(pybullet_data.getDataPath())
|
||||
p.setPhysicsEngineParameter(fixedTimeStep=self.dt, numSubSteps=sub_step_count)
|
||||
p.setGravity(0.0, 0.0, -9.81)
|
||||
|
||||
def configure_visualizer(self):
|
||||
# p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
|
||||
p.resetDebugVisualizerCamera(1.4, 50, -35, [0.0, 0.0, 0.6])
|
||||
|
||||
def find_object_urdfs(self):
|
||||
def find_urdfs(self):
|
||||
rospack = rospkg.RosPack()
|
||||
assets_path = Path(rospack.get_path("active_grasp")) / "assets"
|
||||
self.panda_urdf = assets_path / "urdfs/franka/panda_arm_hand.urdf"
|
||||
root = Path(rospack.get_path("vgn")) / "assets/urdfs/packed/test"
|
||||
self.urdfs = [str(f) for f in root.iterdir() if f.suffix == ".urdf"]
|
||||
|
||||
@@ -38,9 +47,9 @@ class Simulation(BtSim):
|
||||
|
||||
def load_robot(self):
|
||||
self.T_W_B = Transform(Rotation.identity(), np.r_[-0.6, 0.0, 0.4])
|
||||
self.arm = BtPandaArm(self.T_W_B)
|
||||
self.arm = BtPandaArm(self.panda_urdf, self.T_W_B)
|
||||
self.gripper = BtPandaGripper(self.arm)
|
||||
self.model = Model(self.arm.urdf_path, self.arm.base_frame, self.arm.ee_frame)
|
||||
self.model = Model(self.panda_urdf, self.arm.base_frame, self.arm.ee_frame)
|
||||
self.camera = BtCamera(320, 240, 1.047, 0.1, 1.0, self.arm.uid, 11)
|
||||
|
||||
def load_controller(self):
|
||||
@@ -48,6 +57,9 @@ class Simulation(BtSim):
|
||||
x0 = self.model.pose(self.arm.ee_frame, q)
|
||||
self.controller = CartesianPoseController(self.model, self.arm.ee_frame, x0)
|
||||
|
||||
def seed(self, seed):
|
||||
self.rng = np.random.default_rng(seed)
|
||||
|
||||
def reset(self):
|
||||
self.remove_all_objects()
|
||||
self.set_initial_arm_configuration()
|
||||
@@ -55,6 +67,9 @@ class Simulation(BtSim):
|
||||
uid = self.select_target()
|
||||
return self.get_target_bbox(uid)
|
||||
|
||||
def step(self):
|
||||
p.stepSimulation()
|
||||
|
||||
def set_initial_arm_configuration(self):
|
||||
q = [
|
||||
self.rng.uniform(-0.17, 0.17), # 0.0
|
||||
@@ -110,8 +125,8 @@ class Simulation(BtSim):
|
||||
self.remove_object(uid)
|
||||
|
||||
def select_target(self):
|
||||
img = self.camera.get_image()
|
||||
uids, counts = np.unique(img.mask, return_counts=True)
|
||||
_, _, mask = self.camera.get_image()
|
||||
uids, counts = np.unique(mask, return_counts=True)
|
||||
mask = np.isin(uids, self.object_uids) # remove ids of the floor, table, etc
|
||||
uids, counts = uids[mask], counts[mask]
|
||||
target_uid = uids[np.argmin(counts)]
|
||||
|
@@ -1,71 +0,0 @@
|
||||
from datetime import datetime
|
||||
from geometry_msgs.msg import PoseStamped
|
||||
import pandas as pd
|
||||
import rospy
|
||||
import time
|
||||
|
||||
import active_grasp.msg
|
||||
from robot_utils.ros.conversions import *
|
||||
|
||||
|
||||
class CartesianPoseControllerClient:
|
||||
def __init__(self, topic="/command"):
|
||||
self.target_pub = rospy.Publisher(topic, PoseStamped, queue_size=10)
|
||||
|
||||
def send_target(self, pose):
|
||||
msg = to_pose_stamped_msg(pose, "panda_link0")
|
||||
self.target_pub.publish(msg)
|
||||
|
||||
|
||||
class AABBox:
|
||||
def __init__(self, bbox_min, bbox_max):
|
||||
self.min = bbox_min
|
||||
self.max = bbox_max
|
||||
|
||||
def is_inside(self, p):
|
||||
return np.all(p > self.min) and np.all(p < self.max)
|
||||
|
||||
|
||||
def from_bbox_msg(msg):
|
||||
aabb_min = from_point_msg(msg.min)
|
||||
aabb_max = from_point_msg(msg.max)
|
||||
return AABBox(aabb_min, aabb_max)
|
||||
|
||||
|
||||
def to_bbox_msg(bbox):
|
||||
msg = active_grasp.msg.AABBox()
|
||||
msg.min = to_point_msg(bbox.min)
|
||||
msg.max = to_point_msg(bbox.max)
|
||||
return msg
|
||||
|
||||
|
||||
class Timer:
|
||||
timers = dict()
|
||||
|
||||
def __init__(self, name):
|
||||
self.name = name
|
||||
|
||||
def __enter__(self):
|
||||
self.start()
|
||||
return self
|
||||
|
||||
def __exit__(self, *exc_info):
|
||||
self.stop()
|
||||
|
||||
def start(self):
|
||||
self.tic = time.perf_counter()
|
||||
|
||||
def stop(self):
|
||||
elapsed_time = time.perf_counter() - self.tic
|
||||
self.timers[self.name] = elapsed_time
|
||||
|
||||
|
||||
class Logger:
|
||||
def __init__(self, logdir, policy, desc):
|
||||
stamp = datetime.now().strftime("%y%m%d-%H%M%S")
|
||||
name = "{}_policy={},{}".format(stamp, policy, desc).strip(",")
|
||||
self.path = logdir / (name + ".csv")
|
||||
|
||||
def log_run(self, info):
|
||||
df = pd.DataFrame.from_records([info])
|
||||
df.to_csv(self.path, mode="a", header=not self.path.exists(), index=False)
|
49
active_grasp/visualization.py
Normal file
49
active_grasp/visualization.py
Normal file
@@ -0,0 +1,49 @@
|
||||
import numpy as np
|
||||
import rospy
|
||||
|
||||
|
||||
from robot_helpers.ros.rviz import *
|
||||
from robot_helpers.spatial import Transform
|
||||
|
||||
|
||||
class Visualizer:
|
||||
def __init__(self, frame, topic="visualization_marker_array"):
|
||||
self.frame = frame
|
||||
self.marker_pub = rospy.Publisher(topic, MarkerArray, queue_size=1)
|
||||
|
||||
def clear(self):
|
||||
marker = Marker(action=Marker.DELETEALL)
|
||||
self.draw([marker])
|
||||
|
||||
def bbox(self, bbox):
|
||||
pose = Transform.translation((bbox.min + bbox.max) / 2.0)
|
||||
scale = bbox.max - bbox.min
|
||||
color = np.r_[0.8, 0.2, 0.2, 0.6]
|
||||
marker = create_cube_marker(self.frame, pose, scale, color, ns="bbox")
|
||||
self.draw([marker])
|
||||
|
||||
def path(self, poses):
|
||||
color = np.r_[31, 119, 180] / 255.0
|
||||
points = [p.translation for p in poses]
|
||||
spheres = create_sphere_list_marker(
|
||||
self.frame,
|
||||
Transform.identity(),
|
||||
np.full(3, 0.01),
|
||||
color,
|
||||
points,
|
||||
"path",
|
||||
0,
|
||||
)
|
||||
lines = create_line_strip_marker(
|
||||
self.frame,
|
||||
Transform.identity(),
|
||||
[0.005, 0.0, 0.0],
|
||||
color,
|
||||
points,
|
||||
"path",
|
||||
1,
|
||||
)
|
||||
self.draw([spheres, lines])
|
||||
|
||||
def draw(self, markers):
|
||||
self.marker_pub.publish(MarkerArray(markers=markers))
|
Reference in New Issue
Block a user