Add setup.py file

This commit is contained in:
Michel Breyer
2021-07-07 16:29:50 +02:00
parent f29d861fd4
commit 6658b8c7f0
11 changed files with 61 additions and 65 deletions

0
active_grasp/__init__.py Normal file
View File

View File

@@ -0,0 +1,75 @@
import numpy as np
import rospy
from active_grasp.srv import Reset, ResetRequest
from active_grasp.utils import *
from robot_utils.ros.panda import PandaGripperClient
from robot_utils.spatial import Rotation, Transform
class GraspController:
def __init__(self, policy):
self.policy = policy
self.controller = CartesianPoseControllerClient()
self.gripper = PandaGripperClient()
self.reset_env = rospy.ServiceProxy("reset", Reset)
self.load_parameters()
def load_parameters(self):
self.T_G_EE = Transform.from_list(rospy.get_param("~ee_grasp_offset")).inv()
def run(self):
bbox = self.reset()
grasp = self.explore(bbox)
if grasp:
self.execute_grasp(grasp)
def reset(self):
req = ResetRequest()
res = self.reset_env(req)
rospy.sleep(1.0) # wait for states to be updated
return from_bbox_msg(res.bbox)
def explore(self, bbox):
self.policy.activate(bbox)
r = rospy.Rate(self.policy.rate)
while True:
cmd = self.policy.update()
if self.policy.done:
break
self.controller.send_target(cmd)
r.sleep()
return self.policy.best_grasp
def execute_grasp(self, grasp):
T_B_G = self.postprocess(grasp)
self.gripper.move(0.08)
# Move to an initial pose offset.
self.controller.send_target(
T_B_G * Transform.translation([0, 0, -0.05]) * self.T_G_EE
)
rospy.sleep(3.0)
# Approach grasp pose.
self.controller.send_target(T_B_G * self.T_G_EE)
rospy.sleep(1.0)
# Close the fingers.
self.gripper.grasp()
# Lift the object.
target = Transform.translation([0, 0, 0.2]) * T_B_G * self.T_G_EE
self.controller.send_target(target)
rospy.sleep(2.0)
# Check whether the object remains in the hand
return self.gripper.read() > 0.005
def postprocess(self, T_B_G):
# Ensure that the camera is pointing forward.
rot = T_B_G.rotation
if rot.as_matrix()[:, 0][0] < 0:
T_B_G.rotation = rot * Rotation.from_euler("z", np.pi)
return T_B_G

172
active_grasp/policies.py Normal file
View File

@@ -0,0 +1,172 @@
import cv_bridge
import numpy as np
from pathlib import Path
import rospy
from rospy import Publisher
import sensor_msgs.msg
from visualization_msgs.msg import Marker, MarkerArray
from robot_utils.perception import Image
from robot_utils.ros import tf
from robot_utils.ros.conversions import *
from robot_utils.ros.rviz import *
from robot_utils.spatial import Transform
from vgn.detection import VGN, compute_grasps
from vgn.perception import UniformTSDFVolume
from vgn.utils import *
def get_policy(name):
if name == "single-view":
return SingleView()
else:
raise ValueError("{} policy does not exist.".format(name))
class Policy:
def activate(self, bbox):
raise NotImplementedError
def update(self):
raise NotImplementedError
class BasePolicy(Policy):
def __init__(self):
self.cv_bridge = cv_bridge.CvBridge()
self.vgn = VGN(Path(rospy.get_param("vgn/model")))
self.finger_depth = 0.05
self.load_parameters()
self.lookup_transforms()
self.connect_to_camera()
self.connect_to_rviz()
self.rate = 2
def load_parameters(self):
self.base_frame = rospy.get_param("~base_frame_id")
self.task_frame = rospy.get_param("~frame_id")
self.cam_frame = rospy.get_param("~camera/frame_id")
self.info_topic = rospy.get_param("~camera/info_topic")
self.depth_topic = rospy.get_param("~camera/depth_topic")
def lookup_transforms(self):
tf._init_listener()
rospy.sleep(1.0) # wait to receive transforms
self.T_B_task = tf.lookup(self.base_frame, self.task_frame)
def connect_to_camera(self):
msg = rospy.wait_for_message(
self.info_topic, sensor_msgs.msg.CameraInfo, rospy.Duration(2.0)
)
self.intrinsic = from_camera_info_msg(msg)
rospy.Subscriber(
self.depth_topic, sensor_msgs.msg.Image, self.sensor_cb, queue_size=1
)
def sensor_cb(self, msg):
self.img = Image(depth=self.cv_bridge.imgmsg_to_cv2(msg).astype(np.float32))
self.extrinsic = tf.lookup(
self.cam_frame,
self.task_frame,
msg.header.stamp,
rospy.Duration(0.2),
)
def connect_to_rviz(self):
self.bbox_pub = Publisher("bbox", Marker, queue_size=1, latch=True)
self.cloud_pub = Publisher("cloud", PointCloud2, queue_size=1, latch=True)
self.path_pub = Publisher("path", MarkerArray, queue_size=1, latch=True)
self.grasps_pub = Publisher("grasps", MarkerArray, queue_size=1, latch=True)
def activate(self, bbox):
self.clear_grasps()
self.bbox = bbox
self.draw_bbox()
self.tsdf = UniformTSDFVolume(0.3, 40)
self.viewpoints = []
self.done = False
self.best_grasp = None # grasp pose defined w.r.t. the robot's base frame
def update(self):
raise NotImplementedError
def integrate_latest_image(self):
self.viewpoints.append(self.extrinsic.inv())
self.tsdf.integrate(
self.img,
self.intrinsic,
self.extrinsic,
)
def predict_best_grasp(self):
tsdf_grid = self.tsdf.get_grid()
out = self.vgn.predict(tsdf_grid)
score_fn = lambda g: g.pose.translation[2]
grasps = compute_grasps(self.tsdf.voxel_size, out, score_fn, max_filter_size=3)
grasps = self.filter_grasps_on_target_object(grasps)
self.draw_grasps(grasps)
return self.T_B_task * grasps[0].pose if len(grasps) > 0 else None
def filter_grasps_on_target_object(self, grasps):
return [
g
for g in grasps
if self.bbox.is_inside(
g.pose.rotation.apply([0, 0, 0.05]) + g.pose.translation
)
]
def clear_grasps(self):
self.grasps_pub.publish(DELETE_MARKER_ARRAY_MSG)
def draw_bbox(self):
pose = Transform.translation((self.bbox.min + self.bbox.max) / 2.0)
scale = self.bbox.max - self.bbox.min
color = np.r_[0.8, 0.2, 0.2, 0.6]
msg = create_marker(Marker.CUBE, self.task_frame, pose, scale, color)
self.bbox_pub.publish(msg)
def draw_scene_cloud(self):
cloud = self.tsdf.get_scene_cloud()
msg = to_cloud_msg(self.task_frame, np.asarray(cloud.points))
self.cloud_pub.publish(msg)
def draw_grasps(self, grasps):
msg = create_grasp_marker_array(self.task_frame, grasps, self.finger_depth)
self.grasps_pub.publish(msg)
def draw_camera_path(self):
identity = Transform.identity()
color = np.r_[31, 119, 180] / 255.0
# Spheres for each viewpoint
scale = 0.01 * np.ones(3)
spheres = create_marker(
Marker.SPHERE_LIST, self.task_frame, identity, scale, color
)
spheres.id = 0
spheres.points = [to_point_msg(p.translation) for p in self.viewpoints]
# Line strip connecting viewpoints
scale = [0.005, 0.0, 0.0]
lines = create_marker(
Marker.LINE_STRIP, self.task_frame, identity, scale, color
)
lines.id = 1
lines.points = [to_point_msg(p.translation) for p in self.viewpoints]
self.path_pub.publish(MarkerArray([spheres, lines]))
class SingleView(BasePolicy):
"""
Process a single image from the initial viewpoint.
"""
def update(self):
self.integrate_latest_image()
self.draw_scene_cloud()
self.best_grasp = self.predict_best_grasp()
self.done = True

144
active_grasp/simulation.py Normal file
View File

@@ -0,0 +1,144 @@
from pathlib import Path
import pybullet as p
import rospkg
from active_grasp.utils import *
from robot_utils.bullet import *
from robot_utils.controllers import CartesianPoseController
from robot_utils.spatial import Rotation, Transform
class Simulation(BtSim):
def __init__(self, gui=True):
super().__init__(gui=gui, sleep=False)
# p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
p.resetDebugVisualizerCamera(1.4, 50, -35, [0.0, 0.0, 0.6])
self.object_uids = []
self.find_object_urdfs()
self.load_table()
self.load_robot()
self.load_controller()
self.reset()
def find_object_urdfs(self):
rospack = rospkg.RosPack()
root = Path(rospack.get_path("vgn")) / "assets/urdfs/packed/test"
self.urdfs = [str(f) for f in root.iterdir() if f.suffix == ".urdf"]
def load_table(self):
p.loadURDF("plane.urdf")
ori = Rotation.from_rotvec(np.array([0, 0, np.pi / 2])).as_quat()
p.loadURDF("table/table.urdf", baseOrientation=ori, useFixedBase=True)
self.length = 0.3
self.origin = [-0.3, -0.5 * self.length, 0.5]
def load_robot(self):
self.T_W_B = Transform(Rotation.identity(), np.r_[-0.6, 0.0, 0.4])
self.arm = BtPandaArm(self.T_W_B)
self.gripper = BtPandaGripper(self.arm)
self.model = Model(self.arm.urdf_path, self.arm.base_frame, self.arm.ee_frame)
self.camera = BtCamera(320, 240, 1.047, 0.1, 1.0, self.arm.uid, 11)
def load_controller(self):
self.controller = CartesianPoseController(self.model, self.arm.ee_frame, None)
def reset(self):
self.remove_all_objects()
self.set_initial_arm_configuration()
self.load_random_object_arrangement()
uid = self.select_target()
return self.get_target_bbox(uid)
def set_initial_arm_configuration(self):
q = self.arm.configurations["ready"]
q[0] = np.deg2rad(np.random.uniform(-10, 10))
q[5] = np.deg2rad(np.random.uniform(90, 105))
for i, q_i in enumerate(q):
p.resetJointState(self.arm.uid, i, q_i, 0)
p.resetJointState(self.arm.uid, 9, 0.04, 0)
p.resetJointState(self.arm.uid, 10, 0.04, 0)
x0 = self.model.pose(self.arm.ee_frame, q)
self.controller.x_d = x0
def load_object(self, urdf, ori, pos, scale=1.0):
uid = p.loadURDF(str(urdf), pos, ori.as_quat(), globalScaling=scale)
self.object_uids.append(uid)
return uid
def remove_object(self, uid):
p.removeBody(uid)
self.object_uids.remove(uid)
def remove_all_objects(self):
for uid in list(self.object_uids):
self.remove_object(uid)
def load_random_object_arrangement(self, attempts=10):
origin = np.r_[self.origin[:2], 0.625]
scale = 0.8
urdfs = np.random.choice(self.urdfs, 4)
for urdf in urdfs:
# Load the object
uid = self.load_object(urdf, Rotation.identity(), [0.0, 0.0, 0.0], scale)
lower, upper = p.getAABB(uid)
z_offset = 0.5 * (upper[2] - lower[2]) + 0.002
state_id = p.saveState()
for _ in range(attempts):
# Try to place the object without collision
ori = Rotation.from_rotvec([0.0, 0.0, np.random.uniform(0, 2 * np.pi)])
offset = np.r_[np.random.uniform(0.2, 0.8, 2) * self.length, z_offset]
p.resetBasePositionAndOrientation(uid, origin + offset, ori.as_quat())
self.step()
if not p.getContactPoints(uid):
break
else:
p.restoreState(stateId=state_id)
else:
# No placement found, remove the object
self.remove_object(uid)
def select_target(self):
img = self.camera.get_image()
uids, counts = np.unique(img.mask, return_counts=True)
mask = np.isin(uids, self.object_uids) # remove ids of the floor, table, etc
uids, counts = uids[mask], counts[mask]
target_uid = uids[np.argmin(counts)]
p.changeVisualShape(target_uid, -1, rgbaColor=[1, 0, 0, 1])
return target_uid
def get_target_bbox(self, uid):
aabb_min, aabb_max = p.getAABB(uid)
aabb_min = np.array(aabb_min) - self.origin
aabb_max = np.array(aabb_max) - self.origin
return AABBox(aabb_min, aabb_max)
class CartesianPoseController:
def __init__(self, model, frame, x0):
self._model = model
self._frame = frame
self.kp = np.ones(6) * 4.0
self.max_linear_vel = 0.2
self.max_angular_vel = 1.57
self.x_d = x0
def update(self, q):
x = self._model.pose(self._frame, q)
error = np.zeros(6)
error[:3] = self.x_d.translation - x.translation
error[3:] = (self.x_d.rotation * x.rotation.inv()).as_rotvec()
dx = self._limit_rate(self.kp * error)
J_pinv = np.linalg.pinv(self._model.jacobian(self._frame, q))
cmd = np.dot(J_pinv, dx)
return cmd
def _limit_rate(self, dx):
linear, angular = dx[:3], dx[3:]
linear = np.clip(linear, -self.max_linear_vel, self.max_linear_vel)
angular = np.clip(angular, -self.max_angular_vel, self.max_angular_vel)
return np.r_[linear, angular]

36
active_grasp/utils.py Normal file
View File

@@ -0,0 +1,36 @@
from geometry_msgs.msg import PoseStamped
import rospy
import active_grasp.msg
from robot_utils.ros.conversions import *
class CartesianPoseControllerClient:
def __init__(self, topic="/command"):
self.target_pub = rospy.Publisher(topic, PoseStamped, queue_size=10)
def send_target(self, pose):
msg = to_pose_stamped_msg(pose, "panda_link0")
self.target_pub.publish(msg)
class AABBox:
def __init__(self, bbox_min, bbox_max):
self.min = bbox_min
self.max = bbox_max
def is_inside(self, p):
return np.all(p > self.min) and np.all(p < self.max)
def from_bbox_msg(msg):
aabb_min = from_point_msg(msg.min)
aabb_max = from_point_msg(msg.max)
return AABBox(aabb_min, aabb_max)
def to_bbox_msg(bbox):
msg = active_grasp.msg.AABBox()
msg.min = to_point_msg(bbox.min)
msg.max = to_point_msg(bbox.max)
return msg