Define task frame based on bbox

Michel Breyer
2021-08-03 18:11:30 +02:00
parent 8a0dd9fd45
commit 5d17498084
10 changed files with 244 additions and 251 deletions
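The gist of the change: the VGN task frame is no longer a fixed frame read from the parameter server; each policy activation now derives it from the target's bounding box, so the 0.3 m TSDF volume is centered on the object (see BasePolicy.activate below). A minimal sketch of that placement math, assuming only NumPy and a translation-only stand-in for robot_helpers.spatial.Transform (the helper class and the bbox values here are illustrative):

import numpy as np

# Illustrative stand-in for robot_helpers.spatial.Transform (translation only).
class Transform:
    def __init__(self, translation):
        self.translation = np.asarray(translation, dtype=float)

VOLUME_SIZE = 0.3  # edge length of the cubic TSDF volume used by VGN

def define_task_frame(bbox_min, bbox_max):
    """Place the task frame so the bbox center sits at the volume center."""
    center = 0.5 * (np.asarray(bbox_min) + np.asarray(bbox_max))
    # The volume spans [0, 0.3]^3 in task coordinates, so its center is at 0.15.
    return center, Transform(center - np.full(3, 0.5 * VOLUME_SIZE))

center, T_base_task = define_task_frame([0.4, -0.1, 0.1], [0.5, 0.1, 0.2])
# The bbox center (0.45, 0.0, 0.15) maps to the volume center in task coords.
print(center - T_base_task.translation)  # -> [0.15 0.15 0.15]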

View File

@@ -12,9 +12,9 @@ class SingleView(BasePolicy):
     Process a single image from the initial viewpoint.
     """

-    def update(self):
-        self._integrate_latest_image()
-        self.best_grasp = self._predict_best_grasp()
+    def update(self, img, extrinsic):
+        self.integrate_img(img, extrinsic)
+        self.best_grasp = self.predict_best_grasp()
         self.done = True
@@ -25,21 +25,17 @@ class TopView(BasePolicy):
     def activate(self, bbox):
         super().activate(bbox)
-        center = (bbox.min + bbox.max) / 2.0
-        eye = np.r_[center[:2], center[2] + 0.3]
+        eye = np.r_[self.center[:2], self.center[2] + 0.3]
         up = np.r_[1.0, 0.0, 0.0]
-        self.target = self.T_B_task * (self.T_EE_cam * look_at(eye, center, up)).inv()
-
-    def update(self):
-        current = tf.lookup(self.base_frame, self.ee_frame)
-        error = current.translation - self.target.translation
+        self.target = look_at(eye, self.center, up)
+
+    def update(self, img, extrinsic):
+        self.integrate_img(img, extrinsic)
+        error = extrinsic.translation - self.target.translation
         if np.linalg.norm(error) < 0.01:
-            self.best_grasp = self._predict_best_grasp()
+            self.best_grasp = self.predict_best_grasp()
             self.done = True
         else:
-            self._integrate_latest_image()
-            return self.target
+            return self.target


 class RandomView(BasePolicy):
@@ -47,31 +43,25 @@ class RandomView(BasePolicy):
     Move the camera to a random viewpoint on a circle centered above the target.
     """

-    def __init__(self):
-        super().__init__()
-        self.r = 0.06
-        self.h = 0.3
+    def __init__(self, intrinsic):
+        super().__init__(intrinsic)
+        self.r = 0.06  # radius of the circle
+        self.h = 0.3  # distance above bbox center

     def activate(self, bbox):
         super().activate(bbox)
-        circle_center = (bbox.min + bbox.max) / 2.0
-        circle_center[2] += self.h
         t = np.random.uniform(np.pi, 3.0 * np.pi)
-        eye = circle_center + np.r_[self.r * np.cos(t), self.r * np.sin(t), 0]
-        center = (self.bbox.min + self.bbox.max) / 2.0
+        eye = self.center + np.r_[self.r * np.cos(t), self.r * np.sin(t), self.h]
         up = np.r_[1.0, 0.0, 0.0]
-        self.target = self.T_B_task * (self.T_EE_cam * look_at(eye, center, up)).inv()
-
-    def update(self):
-        current = tf.lookup(self.base_frame, self.ee_frame)
-        error = current.translation - self.target.translation
+        self.target = look_at(eye, self.center, up)
+
+    def update(self, img, extrinsic):
+        self.integrate_img(img, extrinsic)
+        error = extrinsic.translation - self.target.translation
         if np.linalg.norm(error) < 0.01:
-            self.best_grasp = self._predict_best_grasp()
+            self.best_grasp = self.predict_best_grasp()
             self.done = True
         else:
-            self._integrate_latest_image()
-            return self.target
+            return self.target


 class FixedTrajectory(BasePolicy):
@@ -79,9 +69,9 @@ class FixedTrajectory(BasePolicy):
     Follow a pre-defined circular trajectory centered above the target object.
     """

-    def __init__(self):
-        super().__init__()
-        self.r = 0.06
+    def __init__(self, intrinsic):
+        super().__init__(intrinsic)
+        self.r = 0.08
         self.h = 0.3
         self.duration = 6.0
         self.m = scipy.interpolate.interp1d([0, self.duration], [np.pi, 3.0 * np.pi])
@@ -89,21 +79,18 @@ class FixedTrajectory(BasePolicy):
     def activate(self, bbox):
         super().activate(bbox)
         self.tic = rospy.Time.now()
-        self.circle_center = (bbox.min + bbox.max) / 2.0
-        self.circle_center[2] += self.h

-    def update(self):
+    def update(self, img, extrinsic):
+        self.integrate_img(img, extrinsic)
         elapsed_time = (rospy.Time.now() - self.tic).to_sec()
         if elapsed_time > self.duration:
-            self.best_grasp = self._predict_best_grasp()
+            self.best_grasp = self.predict_best_grasp()
             self.done = True
         else:
-            self._integrate_latest_image()
             t = self.m(elapsed_time)
-            eye = self.circle_center + np.r_[self.r * np.cos(t), self.r * np.sin(t), 0]
-            center = (self.bbox.min + self.bbox.max) / 2.0
+            eye = self.center + np.r_[self.r * np.cos(t), self.r * np.sin(t), self.h]
             up = np.r_[1.0, 0.0, 0.0]
-            target = self.T_B_task * (self.T_EE_cam * look_at(eye, center, up)).inv()
+            target = look_at(eye, self.center, up)
             return target
@@ -114,24 +101,24 @@ class AlignmentView(BasePolicy):
     def activate(self, bbox):
         super().activate(bbox)
-        self._integrate_latest_image()
-        self.best_grasp = self._predict_best_grasp()
-        if self.best_grasp:
-            R, t = self.best_grasp.rotation, self.best_grasp.translation
-            center = t
+        self.target = None
+
+    def update(self, img, extrinsic):
+        self.integrate_img(img, extrinsic)
+        if not self.target:
+            grasp = self.predict_best_grasp()
+            if not grasp:
+                self.done = True
+                return
+            R, t = grasp.pose.rotation, grasp.pose.translation
             eye = R.apply([0.0, 0.0, -0.16]) + t
+            center = t
             up = np.r_[1.0, 0.0, 0.0]
-            self.target = (self.T_EE_cam * look_at(eye, center, up)).inv()
-        else:
-            self.done = True
-
-    def update(self):
-        current = tf.lookup(self.base_frame, self.ee_frame)
-        error = current.translation - self.target.translation
+            self.target = look_at(eye, center, up)
+        error = extrinsic.translation - self.target.translation
         if np.linalg.norm(error) < 0.01:
-            self.best_grasp = self._predict_best_grasp()
+            self.best_grasp = self.predict_best_grasp()
             self.done = True
         else:
-            self._integrate_latest_image()
-            return self.target
+            return self.target
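All of these policies now build their view targets with look_at(eye, center, up) directly, instead of composing T_B_task and T_EE_cam by hand. The helper itself is not part of this diff; a plausible sketch under common conventions (camera z-axis pointing from eye toward center, up hint aligned with the camera x-axis, returned value the world-to-camera extrinsic, which matches how the policies compare it against the current extrinsic) could look like:

import numpy as np

def look_at(eye, center, up):
    """Sketch of a look_at helper; the project's real one lives in its
    robot/vgn utility libraries and may use a Transform type instead."""
    eye, center, up = (np.asarray(v, dtype=float) for v in (eye, center, up))
    z = center - eye
    z /= np.linalg.norm(z)          # camera z-axis: viewing direction
    x = up - np.dot(up, z) * z      # project the x/up hint onto the image plane
    x /= np.linalg.norm(x)
    y = np.cross(z, x)              # right-handed completion
    T_world_cam = np.eye(4)
    T_world_cam[:3, :3] = np.column_stack((x, y, z))
    T_world_cam[:3, 3] = eye
    return np.linalg.inv(T_world_cam)  # extrinsic: world -> camera

# Top-down view 0.3 m above a point, camera x aligned with the base x-axis:
extrinsic = look_at([0.45, 0.0, 0.45], [0.45, 0.0, 0.15], [1.0, 0.0, 0.0])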

View File

@@ -1,77 +1,113 @@
+import copy
+import cv_bridge
 from geometry_msgs.msg import PoseStamped
 import numpy as np
 import rospy
+from sensor_msgs.msg import CameraInfo, Image
 import time

 from active_grasp.bbox import from_bbox_msg
+from active_grasp.policy import make
 from active_grasp.srv import Reset, ResetRequest
-from robot_helpers.ros.conversions import to_pose_stamped_msg
+from robot_helpers.ros import tf
+from robot_helpers.ros.conversions import *
 from robot_helpers.ros.panda import PandaGripperClient
 from robot_helpers.spatial import Rotation, Transform


 class GraspController:
-    def __init__(self, policy):
-        self.policy = policy
-        self._reset_env = rospy.ServiceProxy("reset", Reset)
-        self._load_parameters()
-        self._init_robot_control()
+    def __init__(self, policy_id):
+        self.reset_env = rospy.ServiceProxy("reset", Reset)
+        self.load_parameters()
+        self.lookup_transforms()
+        self.init_robot_connection()
+        self.init_camera_stream()
+        self.make_policy(policy_id)

-    def _load_parameters(self):
-        self.T_G_EE = Transform.from_list(rospy.get_param("~ee_grasp_offset")).inv()
+    def load_parameters(self):
+        self.base_frame = rospy.get_param("~base_frame_id")
+        self.ee_frame = rospy.get_param("~ee_frame_id")
+        self.cam_frame = rospy.get_param("~camera/frame_id")
+        self.info_topic = rospy.get_param("~camera/info_topic")
+        self.depth_topic = rospy.get_param("~camera/depth_topic")
+        self.T_grasp_ee = Transform.from_list(rospy.get_param("~ee_grasp_offset")).inv()

-    def _init_robot_control(self):
+    def lookup_transforms(self):
+        self.T_ee_cam = tf.lookup(self.ee_frame, self.cam_frame)
+
+    def init_robot_connection(self):
         self.target_pose_pub = rospy.Publisher("command", PoseStamped, queue_size=10)
         self.gripper = PandaGripperClient()

-    def _send_cmd(self, pose):
-        msg = to_pose_stamped_msg(pose, "panda_link0")
+    def send_cmd(self, pose):
+        msg = to_pose_stamped_msg(pose, self.base_frame)
         self.target_pose_pub.publish(msg)

-    def run(self):
-        bbox = self._reset()
-        with Timer("search_time"):
-            grasp = self._search_grasp(bbox)
-        res = self._execute_grasp(grasp)
-        return self._collect_info(res)
+    def init_camera_stream(self):
+        msg = rospy.wait_for_message(self.info_topic, CameraInfo, rospy.Duration(2.0))
+        self.intrinsic = from_camera_info_msg(msg)
+        self.cv_bridge = cv_bridge.CvBridge()
+        rospy.Subscriber(self.depth_topic, Image, self.sensor_cb, queue_size=1)

-    def _reset(self):
-        res = self._reset_env(ResetRequest())
+    def sensor_cb(self, msg):
+        self.latest_depth_msg = msg
+
+    def make_policy(self, name):
+        self.policy = make(name, self.intrinsic)
+
+    def run(self):
+        bbox = self.reset()
+        with Timer("search_time"):
+            grasp = self.search_grasp(bbox)
+        res = self.execute_grasp(grasp)
+        return self.collect_info(res)
+
+    def reset(self):
+        res = self.reset_env(ResetRequest())
         rospy.sleep(1.0)  # wait for states to be updated
         return from_bbox_msg(res.bbox)

-    def _search_grasp(self, bbox):
+    def search_grasp(self, bbox):
         self.policy.activate(bbox)
         r = rospy.Rate(self.policy.rate)
         while True:
-            cmd = self.policy.update()
+            img, extrinsic = self.get_state()
+            next_extrinsic = self.policy.update(img, extrinsic)
             if self.policy.done:
                 break
-            self._send_cmd(cmd)
+            self.send_cmd((self.T_ee_cam * next_extrinsic).inv())
             r.sleep()
         return self.policy.best_grasp

-    def _execute_grasp(self, grasp):
+    def get_state(self):
+        msg = copy.deepcopy(self.latest_depth_msg)
+        img = self.cv_bridge.imgmsg_to_cv2(msg).astype(np.float32)
+        extrinsic = tf.lookup(self.cam_frame, self.base_frame, msg.header.stamp)
+        return img, extrinsic
+
+    def execute_grasp(self, grasp):
         if not grasp:
             return "aborted"

-        T_B_G = self._postprocess(grasp)
+        T_base_grasp = self.postprocess(grasp.pose)
         self.gripper.move(0.08)

         # Move to an initial pose offset.
-        self._send_cmd(T_B_G * Transform.translation([0, 0, -0.05]) * self.T_G_EE)
+        self.send_cmd(
+            T_base_grasp * Transform.translation([0, 0, -0.05]) * self.T_grasp_ee
+        )
         rospy.sleep(3.0)

         # Approach grasp pose.
-        self._send_cmd(T_B_G * self.T_G_EE)
+        self.send_cmd(T_base_grasp * self.T_grasp_ee)
         rospy.sleep(2.0)

         # Close the fingers.
         self.gripper.grasp()

         # Lift the object.
-        target = Transform.translation([0, 0, 0.2]) * T_B_G * self.T_G_EE
-        self._send_cmd(target)
+        target = Transform.translation([0, 0, 0.2]) * T_base_grasp * self.T_grasp_ee
+        self.send_cmd(target)
         rospy.sleep(2.0)

         # Check whether the object remains in the hand
@@ -79,14 +115,14 @@ class GraspController:
         return "succeeded" if success else "failed"

-    def _postprocess(self, T_B_G):
+    def postprocess(self, T_base_grasp):
         # Ensure that the camera is pointing forward.
-        rot = T_B_G.rotation
+        rot = T_base_grasp.rotation
         if rot.as_matrix()[:, 0][0] < 0:
-            T_B_G.rotation = rot * Rotation.from_euler("z", np.pi)
-        return T_B_G
+            T_base_grasp.rotation = rot * Rotation.from_euler("z", np.pi)
+        return T_base_grasp

-    def _collect_info(self, result):
+    def collect_info(self, result):
         points = [p.translation for p in self.policy.viewpoints]
         d = np.sum([np.linalg.norm(p2 - p1) for p1, p2 in zip(points, points[1:])])
         info = {
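Note the frame bookkeeping in the new loop: get_state snapshots the latest depth message and looks up the camera extrinsic at the image's own stamp, the policy returns the next desired extrinsic (the pose of the base frame in camera coordinates), and send_cmd((self.T_ee_cam * next_extrinsic).inv()) turns that into an end-effector target. A short check of the algebra with plain homogeneous matrices (a sketch; the project composes robot_helpers.spatial.Transform objects instead):

import numpy as np

def ee_target_from_extrinsic(T_ee_cam, T_cam_base):
    # T_ee_cam @ T_cam_base = T_ee_base (pose of the base in EE coordinates);
    # inverting gives T_base_ee, the EE pose in the base frame, which is
    # exactly what the Cartesian controller expects as a command.
    return np.linalg.inv(T_ee_cam @ T_cam_base)

# With an identity camera-to-EE offset, the EE target is the camera pose:
T_cam_base = np.eye(4)
T_cam_base[:3, 3] = [0.0, 0.0, 0.6]  # hypothetical extrinsic
print(ee_target_from_extrinsic(np.eye(4), T_cam_base)[:3, 3])  # -> [0. 0. -0.6]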

View File

@@ -1,8 +1,6 @@
-import cv_bridge
 import numpy as np
 from pathlib import Path
 import rospy
-from sensor_msgs.msg import CameraInfo, Image, PointCloud2

 from .visualization import Visualizer
 from robot_helpers.ros import tf
@@ -16,87 +14,68 @@ class Policy:
     def activate(self, bbox):
         raise NotImplementedError

-    def update(self):
+    def update(self, img, extrinsic):
         raise NotImplementedError


 class BasePolicy(Policy):
-    def __init__(self):
-        self.cv_bridge = cv_bridge.CvBridge()
-        self.vgn = VGN(Path(rospy.get_param("vgn/model")))
-        self.finger_depth = 0.05
+    def __init__(self, intrinsic):
+        self.intrinsic = intrinsic
         self.rate = 5
-        self._load_parameters()
-        self._lookup_transforms()
-        self._init_camera_stream()
-        self._init_publishers()
-        self._init_visualizer()
+        self.load_parameters()
+        self.init_visualizer()

-    def _load_parameters(self):
-        self.task_frame = rospy.get_param("~frame_id")
-        self.base_frame = rospy.get_param("~base_frame_id")
-        self.ee_frame = rospy.get_param("~ee_frame_id")
-        self.cam_frame = rospy.get_param("~camera/frame_id")
-        self.info_topic = rospy.get_param("~camera/info_topic")
-        self.depth_topic = rospy.get_param("~camera/depth_topic")
+    def load_parameters(self):
+        self.base_frame = rospy.get_param("active_grasp/base_frame_id")
+        self.task_frame = "task"
+        self.vgn = VGN(Path(rospy.get_param("vgn/model")))

-    def _lookup_transforms(self):
-        self.T_B_task = tf.lookup(self.base_frame, self.task_frame)
-        self.T_EE_cam = tf.lookup(self.ee_frame, self.cam_frame)
-
-    def _init_camera_stream(self):
-        msg = rospy.wait_for_message(self.info_topic, CameraInfo, rospy.Duration(2.0))
-        self.intrinsic = from_camera_info_msg(msg)
-        rospy.Subscriber(self.depth_topic, Image, self._sensor_cb, queue_size=1)
-
-    def _sensor_cb(self, msg):
-        self.img = self.cv_bridge.imgmsg_to_cv2(msg).astype(np.float32)
-        self.extrinsic = tf.lookup(self.cam_frame, self.task_frame, msg.header.stamp)
-
-    def _init_publishers(self):
-        self.scene_cloud_pub = rospy.Publisher("scene_cloud", PointCloud2, queue_size=1)
-
-    def _init_visualizer(self):
-        self.visualizer = Visualizer(self.task_frame)
+    def init_visualizer(self):
+        self.visualizer = Visualizer(self.base_frame)

     def activate(self, bbox):
         self.bbox = bbox
+
+        # Define the VGN task frame s.t. the bounding box is in its center
+        self.center = 0.5 * (bbox.min + bbox.max)
+        self.T_base_task = Transform.translation(self.center - np.full(3, 0.15))
+        tf.broadcast(self.T_base_task, self.base_frame, self.task_frame)
+        rospy.sleep(0.1)  # wait for the transform to be published
+
         self.tsdf = UniformTSDFVolume(0.3, 40)
         self.viewpoints = []
         self.done = False
-        self.best_grasp = None  # grasp pose defined w.r.t. the robot's base frame
+        self.best_grasp = None
         self.visualizer.clear()
         self.visualizer.bbox(bbox)

-    def _integrate_latest_image(self):
-        self.viewpoints.append(self.extrinsic.inv())
-        self.tsdf.integrate(
-            self.img,
-            self.intrinsic,
-            self.extrinsic,
-        )
-        self._publish_scene_cloud()
+    def integrate_img(self, img, extrinsic):
+        self.viewpoints.append(extrinsic.inv())
+        self.tsdf.integrate(img, self.intrinsic, extrinsic * self.T_base_task)
+        self.visualizer.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud())
         self.visualizer.path(self.viewpoints)

-    def _publish_scene_cloud(self):
-        cloud = self.tsdf.get_scene_cloud()
-        msg = to_cloud_msg(self.task_frame, np.asarray(cloud.points))
-        self.scene_cloud_pub.publish(msg)
-
-    def _predict_best_grasp(self):
+    def predict_best_grasp(self):
         tsdf_grid = self.tsdf.get_grid()
         out = self.vgn.predict(tsdf_grid)
         score_fn = lambda g: g.pose.translation[2]
         grasps = compute_grasps(self.tsdf.voxel_size, out, score_fn, max_filter_size=3)
-        grasps = self._select_grasps_on_target_object(grasps)
-        return self.T_B_task * grasps[0].pose if len(grasps) > 0 else None
+        grasps = self.transform_grasps_to_base_frame(grasps)
+        grasps = self.select_grasps_on_target_object(grasps)
+        return grasps[0] if len(grasps) > 0 else None

-    def _select_grasps_on_target_object(self, grasps):
+    def transform_grasps_to_base_frame(self, grasps):
+        for grasp in grasps:
+            grasp.pose = self.T_base_task * grasp.pose
+        return grasps
+
+    def select_grasps_on_target_object(self, grasps):
         result = []
-        for g in grasps:
-            tip = g.pose.rotation.apply([0, 0, 0.05]) + g.pose.translation
+        for grasp in grasps:
+            tip = grasp.pose.rotation.apply([0, 0, 0.05]) + grasp.pose.translation
             if self.bbox.is_inside(tip):
-                result.append(g)
+                result.append(grasp)
         return result
@@ -108,8 +87,8 @@ def register(id, cls):
     registry[id] = cls


-def make(id):
+def make(id, *args, **kwargs):
     if id in registry:
-        return registry[id]()
+        return registry[id](*args, **kwargs)
     else:
         raise ValueError("{} policy does not exist.".format(id))
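integrate_img is where the new task frame pays off: policies hand in extrinsics w.r.t. the base frame, and the TSDF receives extrinsic * self.T_base_task, i.e. T_cam_base composed with T_base_task yields T_cam_task, the camera pose the 0.3 m volume expects. A quick sanity check with translation-only homogeneous matrices (values hypothetical):

import numpy as np

def translation(t):
    T = np.eye(4)
    T[:3, 3] = t
    return T

T_cam_base = translation([0.0, 0.0, 0.6])      # hypothetical camera extrinsic
T_base_task = translation([0.30, -0.15, 0.0])  # task frame derived from the bbox
T_cam_task = T_cam_base @ T_base_task
# The task-frame origin, expressed in camera coordinates:
print(T_cam_task @ np.r_[0.0, 0.0, 0.0, 1.0])  # -> [0.3 -0.15 0.6 1.0]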

View File

@@ -46,8 +46,8 @@ class Simulation:
         self.origin = [-0.3, -0.5 * self.length, 0.5]

     def load_robot(self):
-        self.T_W_B = Transform(Rotation.identity(), np.r_[-0.6, 0.0, 0.4])
-        self.arm = BtPandaArm(self.panda_urdf, self.T_W_B)
+        self.T_world_base = Transform.translation(np.r_[-0.6, 0.0, 0.4])
+        self.arm = BtPandaArm(self.panda_urdf, self.T_world_base)
         self.gripper = BtPandaGripper(self.arm)
         self.model = Model(self.panda_urdf, self.arm.base_frame, self.arm.ee_frame)
         self.camera = BtCamera(320, 240, 1.047, 0.1, 1.0, self.arm.uid, 11)
@@ -135,33 +135,34 @@ class Simulation:
     def get_target_bbox(self, uid):
         aabb_min, aabb_max = p.getAABB(uid)
-        aabb_min = np.array(aabb_min) - self.origin
-        aabb_max = np.array(aabb_max) - self.origin
+        # Transform the coordinates to base_frame
+        aabb_min = np.array(aabb_min) - self.T_world_base.translation
+        aabb_max = np.array(aabb_max) - self.T_world_base.translation
         return AABBox(aabb_min, aabb_max)


 class CartesianPoseController:
     def __init__(self, model, frame, x0):
-        self._model = model
-        self._frame = frame
+        self.model = model
+        self.frame = frame

         self.kp = np.ones(6) * 4.0
-        self.max_linear_vel = 0.2
+        self.max_linear_vel = 0.1
         self.max_angular_vel = 1.57

         self.x_d = x0

     def update(self, q):
-        x = self._model.pose(self._frame, q)
+        x = self.model.pose(self.frame, q)
         error = np.zeros(6)
         error[:3] = self.x_d.translation - x.translation
         error[3:] = (self.x_d.rotation * x.rotation.inv()).as_rotvec()
-        dx = self._limit_rate(self.kp * error)
-        J_pinv = np.linalg.pinv(self._model.jacobian(self._frame, q))
+        dx = self.limit_rate(self.kp * error)
+        J_pinv = np.linalg.pinv(self.model.jacobian(self.frame, q))
         cmd = np.dot(J_pinv, dx)

         return cmd

-    def _limit_rate(self, dx):
+    def limit_rate(self, dx):
         linear, angular = dx[:3], dx[3:]
         linear = np.clip(linear, -self.max_linear_vel, self.max_linear_vel)
         angular = np.clip(angular, -self.max_angular_vel, self.max_angular_vel)
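Aside from the renames, the only behavioral change in CartesianPoseController is the lower translational speed cap (0.2 to 0.1 m/s). The update step is a plain P-controller on the 6-DoF pose error, clipped by limit_rate and mapped to joint velocities via the Jacobian pseudoinverse; a self-contained sketch of that step (the Jacobian and error values are hypothetical):

import numpy as np

def limit_rate(dx, max_linear_vel=0.1, max_angular_vel=1.57):
    """Clip the commanded twist, mirroring CartesianPoseController.limit_rate."""
    linear = np.clip(dx[:3], -max_linear_vel, max_linear_vel)
    angular = np.clip(dx[3:], -max_angular_vel, max_angular_vel)
    return np.r_[linear, angular]

kp = np.ones(6) * 4.0
error = np.r_[0.05, 0.0, 0.0, 0.0, 0.0, 0.2]  # 5 cm off in x, 0.2 rad about z
dx = limit_rate(kp * error)                   # linear part saturates at 0.1 m/s
J = np.eye(6)                                 # stand-in for model.jacobian(...)
q_dot = np.linalg.pinv(J) @ dx                # joint velocity command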

View File

@@ -1,15 +1,16 @@
 import numpy as np
 import rospy

 from robot_helpers.ros.rviz import *
 from robot_helpers.spatial import Transform
+from vgn.utils import *


 class Visualizer:
     def __init__(self, frame, topic="visualization_marker_array"):
         self.frame = frame
         self.marker_pub = rospy.Publisher(topic, MarkerArray, queue_size=1)
+        self.scene_cloud_pub = rospy.Publisher("scene_cloud", PointCloud2, queue_size=1)

     def clear(self):
         marker = Marker(action=Marker.DELETEALL)
@@ -22,6 +23,10 @@ class Visualizer:
         marker = create_cube_marker(self.frame, pose, scale, color, ns="bbox")
         self.draw([marker])

+    def scene_cloud(self, frame, cloud):
+        msg = to_cloud_msg(frame, np.asarray(cloud.points))
+        self.scene_cloud_pub.publish(msg)
+
     def path(self, poses):
         color = np.r_[31, 119, 180] / 255.0
         points = [p.translation for p in poses]