Move src files

This commit is contained in:
Michel Breyer
2021-10-08 10:34:38 +02:00
parent 0fb572ee72
commit c1710eb2da
10 changed files with 1 additions and 30 deletions

View File

@@ -0,0 +1,8 @@
from .policy import register
from .baselines import *
from .nbv import NextBestView
register("initial-view", InitialView)
register("top-view", TopView)
register("top-trajectory", TopTrajectory)
register("nbv-grasp", NextBestView)

View File

@@ -0,0 +1,29 @@
import numpy as np
from .policy import SingleViewPolicy, MultiViewPolicy, compute_error
class InitialView(SingleViewPolicy):
def update(self, img, x, q):
self.x_d = x
super().update(img, x, q)
class TopView(SingleViewPolicy):
def activate(self, bbox, view_sphere):
super().activate(bbox, view_sphere)
self.x_d = self.view_sphere.get_view(0.0, 0.0)
self.done = False if self.is_feasible(self.x_d) else True
class TopTrajectory(MultiViewPolicy):
def activate(self, bbox, view_sphere):
super().activate(bbox, view_sphere)
self.x_d = self.view_sphere.get_view(0.0, 0.0)
self.done = False if self.is_feasible(self.x_d) else True
def update(self, img, x, q):
self.integrate(img, x, q)
linear, _ = compute_error(self.x_d, x)
if np.linalg.norm(linear) < 0.02:
self.done = True

33
src/active_grasp/bbox.py Normal file
View File

@@ -0,0 +1,33 @@
import itertools
import numpy as np
import active_grasp.msg
from robot_helpers.ros.conversions import to_point_msg, from_point_msg
class AABBox:
def __init__(self, bbox_min, bbox_max):
self.min = np.asarray(bbox_min)
self.max = np.asarray(bbox_max)
self.center = 0.5 * (self.min + self.max)
self.size = self.max - self.min
@property
def corners(self):
return list(itertools.product(*np.vstack((self.min, self.max)).T))
def is_inside(self, p):
return np.all(p > self.min) and np.all(p < self.max)
def from_bbox_msg(msg):
aabb_min = from_point_msg(msg.min)
aabb_max = from_point_msg(msg.max)
return AABBox(aabb_min, aabb_max)
def to_bbox_msg(bbox):
msg = active_grasp.msg.AABBox()
msg.min = to_point_msg(bbox.min)
msg.max = to_point_msg(bbox.max)
return msg

View File

@@ -0,0 +1,183 @@
from controller_manager_msgs.srv import *
import copy
import cv_bridge
from geometry_msgs.msg import Twist
import numpy as np
import rospy
from sensor_msgs.msg import Image
from .bbox import from_bbox_msg
from .timer import Timer
from active_grasp.srv import Reset, ResetRequest
from robot_helpers.ros import tf
from robot_helpers.ros.conversions import *
from robot_helpers.ros.panda import PandaArmClient, PandaGripperClient
from robot_helpers.ros.moveit import MoveItClient
from robot_helpers.spatial import Rotation, Transform
from vgn.utils import look_at, cartesian_to_spherical, spherical_to_cartesian
class GraspController:
def __init__(self, policy):
self.policy = policy
self.load_parameters()
self.init_service_proxies()
self.init_robot_connection()
self.init_moveit()
self.init_camera_stream()
def load_parameters(self):
self.base_frame = rospy.get_param("~base_frame_id")
self.T_grasp_ee = Transform.from_list(rospy.get_param("~ee_grasp_offset")).inv()
self.cam_frame = rospy.get_param("~camera/frame_id")
self.depth_topic = rospy.get_param("~camera/depth_topic")
self.min_z_dist = rospy.get_param("~camera/min_z_dist")
self.control_rate = rospy.get_param("~control_rate")
self.linear_vel = rospy.get_param("~linear_vel")
self.policy_rate = rospy.get_param("~policy_rate")
def init_service_proxies(self):
self.reset_env = rospy.ServiceProxy("reset", Reset)
self.switch_controller = rospy.ServiceProxy(
"controller_manager/switch_controller", SwitchController
)
def init_robot_connection(self):
self.arm = PandaArmClient()
self.gripper = PandaGripperClient()
self.cartesian_vel_pub = rospy.Publisher("command", Twist, queue_size=10)
def init_moveit(self):
self.moveit = MoveItClient("panda_arm")
rospy.sleep(1.0) # Wait for connections to be established.
# msg = to_pose_stamped_msg(Transform.t([0.4, 0, 0.4]), self.base_frame)
# self.moveit.scene.add_box("table", msg, size=(0.5, 0.5, 0.02))
def switch_to_cartesian_velocity_control(self):
req = SwitchControllerRequest()
req.start_controllers = ["cartesian_velocity_controller"]
req.stop_controllers = ["position_joint_trajectory_controller"]
self.switch_controller(req)
def switch_to_joint_trajectory_control(self):
req = SwitchControllerRequest()
req.start_controllers = ["position_joint_trajectory_controller"]
req.stop_controllers = ["cartesian_velocity_controller"]
self.switch_controller(req)
def init_camera_stream(self):
self.cv_bridge = cv_bridge.CvBridge()
rospy.Subscriber(self.depth_topic, Image, self.sensor_cb, queue_size=1)
def sensor_cb(self, msg):
self.latest_depth_msg = msg
def run(self):
bbox = self.reset()
self.switch_to_cartesian_velocity_control()
with Timer("search_time"):
grasp = self.search_grasp(bbox)
self.switch_to_joint_trajectory_control()
with Timer("grasp_time"):
res = self.execute_grasp(grasp)
return self.collect_info(res)
def reset(self):
Timer.reset()
res = self.reset_env(ResetRequest())
rospy.sleep(1.0) # Wait for the TF tree to be updated.
return from_bbox_msg(res.bbox)
def search_grasp(self, bbox):
self.view_sphere = ViewHalfSphere(bbox, self.min_z_dist)
self.policy.activate(bbox, self.view_sphere)
timer = rospy.Timer(rospy.Duration(1.0 / self.control_rate), self.send_vel_cmd)
r = rospy.Rate(self.policy_rate)
while not self.policy.done:
img, pose, q = self.get_state()
self.policy.update(img, pose, q)
r.sleep()
rospy.sleep(0.1) # Wait for a zero command to be sent to the robot.
timer.shutdown()
return self.policy.best_grasp
def get_state(self):
q, _ = self.arm.get_state()
msg = copy.deepcopy(self.latest_depth_msg)
img = self.cv_bridge.imgmsg_to_cv2(msg).astype(np.float32)
pose = tf.lookup(self.base_frame, self.cam_frame, msg.header.stamp)
return img, pose, q
def send_vel_cmd(self, event):
if self.policy.x_d is None or self.policy.done:
cmd = np.zeros(6)
else:
x = tf.lookup(self.base_frame, self.cam_frame)
cmd = self.compute_velocity_cmd(self.policy.x_d, x)
self.cartesian_vel_pub.publish(to_twist_msg(cmd))
def compute_velocity_cmd(self, x_d, x):
# Velocity cmd towards the target
e_t = x_d.translation - x.translation
# Velocity cmd towards the surface of the sphere
r, theta, phi = cartesian_to_spherical(x.translation - self.view_sphere.center)
e_n = (self.view_sphere.center - x.translation) * (r - self.view_sphere.r) / r
# Final cmd is a linear combination of both components
linear = 1.0 * e_t + 6.0 * e_n
scale = np.linalg.norm(linear)
linear *= np.clip(scale, 0.0, self.linear_vel) / scale
angular = self.view_sphere.get_view(theta, phi).rotation * x.rotation.inv()
angular = 0.5 * angular.as_rotvec()
return np.r_[linear, angular]
def execute_grasp(self, grasp):
if not grasp:
return "aborted"
T_base_grasp = self.postprocess(grasp.pose)
self.gripper.move(0.08)
self.moveit.goto(T_base_grasp * Transform.t([0, 0, -0.05]) * self.T_grasp_ee)
self.moveit.goto(T_base_grasp * self.T_grasp_ee)
self.gripper.grasp()
self.moveit.goto(Transform.t([0, 0, 0.1]) * T_base_grasp * self.T_grasp_ee)
success = self.gripper.read() > 0.005
return "succeeded" if success else "failed"
def postprocess(self, T_base_grasp):
rot = T_base_grasp.rotation
if rot.as_matrix()[:, 0][0] < 0: # Ensure that the camera is pointing forward
T_base_grasp.rotation = rot * Rotation.from_euler("z", np.pi)
T_base_grasp *= Transform.t([0.0, 0.0, 0.01])
return T_base_grasp
def collect_info(self, result):
points = [p.translation for p in self.policy.views]
d = np.sum([np.linalg.norm(p2 - p1) for p1, p2 in zip(points, points[1:])])
info = {
"result": result,
"view_count": len(points),
"distance": d,
}
info.update(self.policy.info)
info.update(Timer.timers)
return info
class ViewHalfSphere:
def __init__(self, bbox, min_z_dist):
self.center = bbox.center
self.r = 0.5 * bbox.size[2] + min_z_dist
def get_view(self, theta, phi):
eye = self.center + spherical_to_cartesian(self.r, theta, phi)
up = np.r_[1.0, 0.0, 0.0]
return look_at(eye, self.center, up)
def sample_view(self):
raise NotImplementedError

164
src/active_grasp/nbv.py Normal file
View File

@@ -0,0 +1,164 @@
import itertools
from numba import jit
import numpy as np
import rospy
from .policy import MultiViewPolicy
from .timer import Timer
@jit(nopython=True)
def get_voxel_at(voxel_size, p):
index = (p / voxel_size).astype(np.int64)
return index if (index >= 0).all() and (index < 40).all() else None
# Note that the jit compilation takes some time the first time raycast is called
@jit(nopython=True)
def raycast(
voxel_size,
tsdf_grid,
ori,
pos,
fx,
fy,
cx,
cy,
u_min,
u_max,
v_min,
v_max,
t_min,
t_max,
t_step,
):
voxel_indices = []
for u in range(u_min, u_max):
for v in range(v_min, v_max):
direction = np.asarray([(u - cx) / fx, (v - cy) / fy, 1.0])
direction = ori @ (direction / np.linalg.norm(direction))
t, tsdf_prev = t_min, -1.0
while t < t_max:
p = pos + t * direction
t += t_step
index = get_voxel_at(voxel_size, p)
if index is not None:
i, j, k = index
tsdf = tsdf_grid[i, j, k]
if tsdf * tsdf_prev < 0 and tsdf_prev > -1: # crossed a surface
break
voxel_indices.append(index)
tsdf_prev = tsdf
return voxel_indices
class NextBestView(MultiViewPolicy):
def __init__(self):
super().__init__()
self.min_z_dist = rospy.get_param("~camera/min_z_dist")
self.max_views = 50
self.min_gain = 0.0
def activate(self, bbox, view_sphere):
super().activate(bbox, view_sphere)
with Timer("view_generation"):
self.generate_view_candidates()
self.info["view_candidates_count"] = len(self.view_candidates)
def generate_view_candidates(self):
thetas = np.deg2rad([15, 30, 45])
phis = np.arange(8) * np.deg2rad(45)
self.view_candidates = []
for theta, phi in itertools.product(thetas, phis):
view = self.view_sphere.get_view(theta, phi)
if self.is_feasible(view):
self.view_candidates.append(view)
def update(self, img, x, q):
if len(self.views) > self.max_views or self.best_grasp_prediction_is_stable():
self.done = True
else:
with Timer("state_update"):
self.integrate(img, x, q)
views = self.view_candidates
with Timer("ig_computation"):
gains = [self.ig_fn(v, 10) for v in views]
with Timer("cost_computation"):
costs = [self.cost_fn(v) for v in views]
utilities = gains / np.sum(gains) - costs / np.sum(costs)
self.vis.views(self.base_frame, self.intrinsic, views, utilities)
i = np.argmax(utilities)
nbv, gain = views[i], gains[i]
if gain < self.min_gain:
self.done = True
self.x_d = nbv
def best_grasp_prediction_is_stable(self):
if self.best_grasp:
t = (self.T_task_base * self.best_grasp.pose).translation
i, j, k = (t / self.tsdf.voxel_size).astype(int)
qs = self.qual_hist[:, i, j, k]
if (
np.count_nonzero(qs) == self.T
and np.mean(qs) > 0.9
and np.std(qs) < 0.05
):
return True
return False
def ig_fn(self, view, downsample):
tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size
tsdf_grid = -1.0 + 2.0 * tsdf_grid # Open3D maps tsdf to [0,1]
# Downsample the sensor resolution
fx = self.intrinsic.fx / downsample
fy = self.intrinsic.fy / downsample
cx = self.intrinsic.cx / downsample
cy = self.intrinsic.cy / downsample
# Project bbox onto the image plane to get better bounds
T_cam_base = view.inv()
corners = np.array([T_cam_base.apply(p) for p in self.bbox.corners]).T
u = (fx * corners[0] / corners[2] + cx).round().astype(int)
v = (fy * corners[1] / corners[2] + cy).round().astype(int)
u_min, u_max = u.min(), u.max()
v_min, v_max = v.min(), v.max()
t_min = self.min_z_dist
t_max = corners[2].max() # TODO This bound might be a bit too short
t_step = np.sqrt(3) * voxel_size # TODO replace with line rasterization
# Cast rays from the camera view (we'll work in the task frame from now on)
view = self.T_task_base * view
ori, pos = view.rotation.as_matrix(), view.translation
voxel_indices = raycast(
voxel_size,
tsdf_grid,
ori,
pos,
fx,
fy,
cx,
cy,
u_min,
u_max,
v_min,
v_max,
t_min,
t_max,
t_step,
)
# Count rear side voxels
i, j, k = np.unique(voxel_indices, axis=0).T
tsdfs = tsdf_grid[i, j, k]
ig = np.logical_and(tsdfs > -1.0, tsdfs < 0.0).sum()
return ig
def cost_fn(self, view):
return 1.0

177
src/active_grasp/policy.py Normal file
View File

@@ -0,0 +1,177 @@
import numpy as np
from sensor_msgs.msg import CameraInfo
from pathlib import Path
import rospy
from .timer import Timer
from .visualization import Visualizer
from robot_helpers.model import KDLModel
from robot_helpers.ros import tf
from robot_helpers.ros.conversions import *
from vgn.detection import *
from vgn.perception import UniformTSDFVolume
class Policy:
def __init__(self):
self.load_parameters()
self.init_robot_model()
self.init_visualizer()
def load_parameters(self):
self.base_frame = rospy.get_param("~base_frame_id")
self.T_grasp_ee = Transform.from_list(rospy.get_param("~ee_grasp_offset")).inv()
self.cam_frame = rospy.get_param("~camera/frame_id")
self.task_frame = "task"
info_topic = rospy.get_param("~camera/info_topic")
msg = rospy.wait_for_message(info_topic, CameraInfo, rospy.Duration(2.0))
self.intrinsic = from_camera_info_msg(msg)
self.qual_threshold = rospy.get_param("vgn/qual_threshold")
def init_robot_model(self):
self.model = KDLModel.from_parameter_server(self.base_frame, self.cam_frame)
self.ee_model = KDLModel.from_parameter_server(self.base_frame, "panda_link8")
def init_visualizer(self):
self.vis = Visualizer()
def is_feasible(self, view, q_init=None):
q_init = q_init if q_init else [0.0, -0.79, 0.0, -2.356, 0.0, 1.57, 0.79]
return self.model.ik(q_init, view) is not None
def activate(self, bbox, view_sphere):
self.vis.clear()
self.bbox = bbox
self.view_sphere = view_sphere
self.vis.bbox(self.base_frame, self.bbox)
self.calibrate_task_frame()
self.tsdf = UniformTSDFVolume(0.3, 40)
self.vgn = VGN(Path(rospy.get_param("vgn/model")))
self.views = []
self.best_grasp = None
self.x_d = None
self.done = False
self.info = {}
def calibrate_task_frame(self):
self.T_base_task = Transform.translation(self.bbox.center - np.full(3, 0.15))
self.T_task_base = self.T_base_task.inv()
tf.broadcast(self.T_base_task, self.base_frame, self.task_frame)
rospy.sleep(1.0) # Wait for tf tree to be updated
self.vis.workspace(self.task_frame, 0.3)
def update(self, img, x, q):
raise NotImplementedError
def sort_grasps(self, in_grasps, q):
# Transforms grasps into base frame, checks whether they lie on the target, and sorts by their score
grasps, scores = [], []
for grasp in in_grasps:
pose = self.T_base_task * grasp.pose
R, t = pose.rotation, pose.translation
tip = pose.rotation.apply([0, 0, 0.05]) + pose.translation
# Filter out artifacts close to the support
if t[2] < self.bbox.min[2] + 0.05 or tip[2] < self.bbox.min[2] + 0.02:
continue
if self.bbox.is_inside(tip):
grasp.pose = pose
q_grasp = self.ee_model.ik(q, pose * self.T_grasp_ee)
if q_grasp is not None:
grasps.append(grasp)
scores.append(self.score_fn(grasp, q, q_grasp))
grasps, scores = np.asarray(grasps), np.asarray(scores)
indices = np.argsort(-scores)
return grasps[indices], scores[indices]
def score_fn(self, grasp, q, q_grasp):
return -np.linalg.norm(q - q_grasp)
class SingleViewPolicy(Policy):
def update(self, img, x, q):
linear, _ = compute_error(self.x_d, x)
if np.linalg.norm(linear) < 0.02:
self.views.append(x)
self.tsdf.integrate(img, self.intrinsic, x.inv() * self.T_base_task)
tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size
self.vis.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud())
self.vis.map_cloud(self.task_frame, self.tsdf.get_map_cloud())
out = self.vgn.predict(tsdf_grid)
self.vis.quality(self.task_frame, voxel_size, out.qual, 0.5)
grasps = select_grid(voxel_size, out, threshold=self.qual_threshold)
grasps, _ = self.sort_grasps(grasps, q)
if len(grasps) > 0:
self.best_grasp = grasps[0]
self.vis.grasps(self.base_frame, grasps)
self.vis.best_grasp(self.base_frame, self.best_grasp)
self.done = True
class MultiViewPolicy(Policy):
def activate(self, bbox, view_sphere):
super().activate(bbox, view_sphere)
self.T = 10 # Window size of grasp prediction history
self.qual_hist = np.zeros((self.T,) + (40,) * 3, np.float32)
def integrate(self, img, x, q):
self.views.append(x)
self.vis.path(self.base_frame, self.views)
with Timer("tsdf_integration"):
self.tsdf.integrate(img, self.intrinsic, x.inv() * self.T_base_task)
self.vis.scene_cloud(self.task_frame, self.tsdf.get_scene_cloud())
self.vis.map_cloud(self.task_frame, self.tsdf.get_map_cloud())
with Timer("grasp_prediction"):
tsdf_grid, voxel_size = self.tsdf.get_grid(), self.tsdf.voxel_size
out = self.vgn.predict(tsdf_grid)
self.vis.quality(self.task_frame, self.tsdf.voxel_size, out.qual, 0.9)
t = (len(self.views) - 1) % self.T
self.qual_hist[t, ...] = out.qual
with Timer("grasp_selection"):
grasps = select_grid(voxel_size, out, threshold=self.qual_threshold)
grasps, _ = self.sort_grasps(grasps, q)
self.vis.clear_grasps()
if len(grasps) > 0:
self.best_grasp = grasps[0]
self.vis.grasps(self.base_frame, grasps)
self.vis.best_grasp(self.base_frame, self.best_grasp)
else:
self.best_grasp = None
def compute_error(x_d, x):
linear = x_d.translation - x.translation
angular = (x_d.rotation * x.rotation.inv()).as_rotvec()
return linear, angular
registry = {}
def register(id, cls):
global registry
registry[id] = cls
def make(id, *args, **kwargs):
if id in registry:
return registry[id](*args, **kwargs)
else:
raise ValueError("{} policy does not exist.".format(id))

View File

@@ -0,0 +1,201 @@
from pathlib import Path
import pybullet as p
import pybullet_data
import yaml
import rospkg
from active_grasp.bbox import AABBox
from robot_helpers.bullet import *
from robot_helpers.model import KDLModel
from robot_helpers.spatial import Rotation
rospack = rospkg.RosPack()
active_grasp_urdfs_dir = Path(rospack.get_path("active_grasp")) / "assets" / "urdfs"
urdf_zoo_dir = Path(rospack.get_path("urdf_zoo")) / "models"
class Simulation:
"""Robot is placed s.t. world and base frames are the same"""
def __init__(self, gui, scene_id):
self.configure_physics_engine(gui, 60, 4)
self.configure_visualizer()
self.load_robot()
self.scene = get_scene(scene_id)
def configure_physics_engine(self, gui, rate, sub_step_count):
self.rate = rate
self.dt = 1.0 / self.rate
p.connect(p.GUI if gui else p.DIRECT)
p.setAdditionalSearchPath(pybullet_data.getDataPath())
p.setPhysicsEngineParameter(fixedTimeStep=self.dt, numSubSteps=sub_step_count)
p.setGravity(0.0, 0.0, -9.81)
def configure_visualizer(self):
# p.configureDebugVisualizer(p.COV_ENABLE_GUI, 0)
p.resetDebugVisualizerCamera(1.2, 30, -30, [0.4, 0.0, 0.2])
def load_robot(self):
panda_urdf_path = active_grasp_urdfs_dir / "franka/panda_arm_hand.urdf"
self.arm = BtPandaArm(panda_urdf_path)
self.gripper = BtPandaGripper(self.arm)
self.model = KDLModel.from_urdf_file(
panda_urdf_path, self.arm.base_frame, self.arm.ee_frame
)
self.camera = BtCamera(320, 240, 0.96, 0.01, 1.0, self.arm.uid, 11)
def seed(self, seed):
self.rng = np.random.default_rng(seed)
def reset(self):
self.set_arm_configuration([0.0, -0.79, 0.0, -2.356, 0.0, 1.57, 0.79])
self.scene.reset(rng=self.rng)
q = self.scene.sample_initial_configuration(self.rng)
self.set_arm_configuration(q)
uid = self.select_target()
bbox = self.get_target_bbox(uid)
return bbox
def set_arm_configuration(self, q):
for i, q_i in enumerate(q):
p.resetJointState(self.arm.uid, i, q_i, 0)
p.resetJointState(self.arm.uid, 9, 0.04, 0)
p.resetJointState(self.arm.uid, 10, 0.04, 0)
def select_target(self):
_, _, mask = self.camera.get_image()
uids, counts = np.unique(mask, return_counts=True)
mask = np.isin(uids, self.scene.object_uids) # remove ids of the floor, etc
uids, counts = uids[mask], counts[mask]
target_uid = uids[np.argmin(counts)]
p.changeVisualShape(target_uid, -1, rgbaColor=[1, 0, 0, 1])
return target_uid
def get_target_bbox(self, uid):
aabb_min, aabb_max = p.getAABB(uid)
return AABBox(aabb_min, aabb_max)
def step(self):
p.stepSimulation()
class Scene:
def __init__(self):
self.support_urdf = urdf_zoo_dir / "plane" / "model.urdf"
self.ycb_urdfs_dir = urdf_zoo_dir / "ycb"
self.support_uid = -1
self.object_uids = []
def load_support(self, pos):
self.support_uid = p.loadURDF(str(self.support_urdf), pos, globalScaling=0.3)
def remove_support(self):
p.removeBody(self.support_uid)
def load_object(self, urdf, ori, pos, scale=1.0):
uid = p.loadURDF(str(urdf), pos, ori.as_quat(), globalScaling=scale)
self.object_uids.append(uid)
return uid
def remove_object(self, uid):
p.removeBody(uid)
self.object_uids.remove(uid)
def remove_all_objects(self):
for uid in list(self.object_uids):
self.remove_object(uid)
def reset(self, rng):
self.remove_support()
self.remove_all_objects()
self.load(rng)
def load(self, rng):
raise NotImplementedError
def get_ycb_urdf_path(self, model_name):
return self.ycb_urdfs_dir / model_name / "model.urdf"
def find_urdfs(root):
# Scans a dir for URDF assets
return [str(f) for f in root.iterdir() if f.suffix == ".urdf"]
class RandomScene(Scene):
def __init__(self):
super().__init__()
self.center = np.r_[0.5, 0.0, 0.2]
self.length = 0.3
self.origin = self.center - np.r_[0.5 * self.length, 0.5 * self.length, 0.0]
self.object_urdfs = find_urdfs(active_grasp_urdfs_dir / "packed")
def load(self, rng, attempts=10):
self.load_support(self.center)
urdfs = rng.choice(self.object_urdfs, 4)
scale = 1.0
for urdf in urdfs:
uid = self.load_object(urdf, Rotation.identity(), np.zeros(3), scale)
lower, upper = p.getAABB(uid)
z_offset = 0.5 * (upper[2] - lower[2]) + 0.002
state_id = p.saveState()
for _ in range(attempts):
# Try to place and check for collisions
ori = Rotation.from_rotvec([0.0, 0.0, rng.uniform(0, 2 * np.pi)])
pos = np.r_[rng.uniform(0.2, 0.8, 2) * self.length, z_offset]
p.resetBasePositionAndOrientation(uid, self.origin + pos, ori.as_quat())
p.stepSimulation()
if not p.getContactPoints(uid):
break
else:
p.restoreState(stateId=state_id)
else:
# No placement found, remove the object
self.remove_object(uid)
def sample_initial_configuration(self, rng):
q = [0.0, -1.39, 0.0, -2.36, 0.0, 1.57, 0.79]
q += rng.uniform(-0.08, 0.08, 7)
return q
class CustomScene(Scene):
def __init__(self, config_name):
super().__init__()
self.config_path = (
Path(rospack.get_path("active_grasp")) / "cfg" / "scenes" / config_name
)
def load_config(self):
with self.config_path.open("r") as f:
self.scene = yaml.load(f)
self.center = np.asarray(self.scene["center"])
self.length = 0.3
self.origin = self.center - np.r_[0.5 * self.length, 0.5 * self.length, 0.0]
def load(self, rng):
self.load_config()
self.load_support(self.center)
for object in self.scene["objects"]:
self.load_object(
self.get_ycb_urdf_path(object["object_id"]),
Rotation.from_euler("xyz", object["rpy"], degrees=True),
self.center + np.asarray(object["xyz"]),
object.get("scale", 1),
)
for _ in range(60):
p.stepSimulation()
def sample_initial_configuration(self, rng):
return self.scene["q"]
def get_scene(scene_id):
if scene_id == "random":
return RandomScene()
elif scene_id.endswith(".yaml"):
return CustomScene(scene_id)
else:
raise ValueError("Unknown scene {}.".format(scene_id))

27
src/active_grasp/timer.py Normal file
View File

@@ -0,0 +1,27 @@
import time
class Timer:
timers = dict()
def __init__(self, name):
self.name = name
self.timers.setdefault(name, 0)
def __enter__(self):
self.start()
return self
def __exit__(self, *exc_info):
self.stop()
@classmethod
def reset(cls):
cls.timers = dict()
def start(self):
self.tic = time.perf_counter()
def stop(self):
elapsed_time = time.perf_counter() - self.tic
self.timers[self.name] += elapsed_time

View File

@@ -0,0 +1,244 @@
from geometry_msgs.msg import PoseStamped
import matplotlib.colors
import numpy as np
import rospy
from robot_helpers.ros.rviz import *
from robot_helpers.spatial import Transform
from vgn.utils import *
cmap = matplotlib.colors.LinearSegmentedColormap.from_list("RedGreen", ["r", "g"])
red = np.r_[1.0, 0.0, 0.0]
blue = np.r_[0, 0.6, 1.0]
class Visualizer:
def __init__(self, topic="visualization_marker_array"):
self.marker_pub = rospy.Publisher(topic, MarkerArray, queue_size=1)
self.scene_cloud_pub = rospy.Publisher(
"scene_cloud",
PointCloud2,
latch=True,
queue_size=1,
)
self.map_cloud_pub = rospy.Publisher(
"map_cloud",
PointCloud2,
latch=True,
queue_size=1,
)
self.pose_pub = rospy.Publisher("pose", PoseStamped, queue_size=1)
self.quality_pub = rospy.Publisher("quality", PointCloud2, queue_size=1)
self.grasp_marker_count = 0
def clear(self):
self.clear_markers()
msg = to_cloud_msg("panda_link0", np.array([]))
self.scene_cloud_pub.publish(msg)
self.map_cloud_pub.publish(msg)
self.quality_pub.publish(msg)
rospy.sleep(0.1)
def clear_markers(self):
self.draw([Marker(action=Marker.DELETEALL)])
def draw(self, markers):
self.marker_pub.publish(MarkerArray(markers=markers))
def bbox(self, frame, bbox):
pose = Transform.identity()
scale = [0.004, 0.0, 0.0]
color = red
corners = bbox.corners
edges = [
(0, 1),
(1, 3),
(3, 2),
(2, 0),
(4, 5),
(5, 7),
(7, 6),
(6, 4),
(0, 4),
(1, 5),
(3, 7),
(2, 6),
]
lines = [(corners[s], corners[e]) for s, e in edges]
marker = create_line_list_marker(frame, pose, scale, color, lines, ns="bbox")
self.draw([marker])
def best_grasp(self, frame, grasp, qmin=0.5, qmax=1.0):
color = cmap((grasp.quality - qmin) / (qmax - qmin))
self.draw(create_grasp_markers(frame, grasp, color, "best_grasp", radius=0.006))
def grasps(self, frame, grasps, qmin=0.5, qmax=1.0):
markers = []
for i, grasp in enumerate(grasps):
color = cmap((grasp.quality - qmin) / (qmax - qmin))
markers += create_grasp_markers(frame, grasp, color, "grasps", 4 * i)
self.grasp_marker_count = len(markers)
self.draw(markers)
def clear_grasps(self):
if self.grasp_marker_count > 0:
markers = [
Marker(action=Marker.DELETE, ns="grasps", id=i)
for i in range(self.grasp_marker_count)
]
markers += [
Marker(action=Marker.DELETE, ns="best_grasp", id=i) for i in range(4)
]
self.draw(markers)
self.grasp_marker_count = 0
def rays(self, frame, origin, directions, t_max=1.0):
lines = [[origin, origin + t_max * direction] for direction in directions]
marker = create_line_list_marker(
frame,
Transform.identity(),
[0.001, 0.0, 0.0],
[0.9, 0.9, 0.9],
lines,
"rays",
)
self.draw([marker])
def map_cloud(self, frame, cloud):
points = np.asarray(cloud.points)
distances = np.expand_dims(np.asarray(cloud.colors)[:, 0], 1)
msg = to_cloud_msg(frame, points, distances=distances)
self.map_cloud_pub.publish(msg)
def path(self, frame, poses):
color = blue
points = [p.translation for p in poses]
spheres = create_sphere_list_marker(
frame,
Transform.identity(),
np.full(3, 0.01),
color,
points,
"path",
0,
)
markers = [spheres]
if len(poses) > 1:
lines = create_line_strip_marker(
frame,
Transform.identity(),
[0.005, 0.0, 0.0],
color,
points,
"path",
1,
)
markers.append(lines)
self.draw(markers)
def point(self, frame, point):
marker = create_sphere_marker(
frame,
Transform.translation(point),
np.full(3, 0.01),
[0, 0, 1],
"point",
)
self.draw([marker])
def pose(self, frame, pose):
msg = to_pose_stamped_msg(pose, frame)
self.pose_pub.publish(msg)
def quality(self, frame, voxel_size, quality, threshold=0.9):
points, values = grid_to_map_cloud(voxel_size, quality, threshold)
msg = to_cloud_msg(frame, points, intensities=values)
self.quality_pub.publish(msg)
def scene_cloud(self, frame, cloud):
msg = to_cloud_msg(frame, np.asarray(cloud.points))
self.scene_cloud_pub.publish(msg)
def views(self, frame, intrinsic, views, values):
vmin, vmax = min(values), max(values)
scale = [0.002, 0.0, 0.0]
near, far = 0.0, 0.02
markers = []
for i, (view, value) in enumerate(zip(views, values)):
color = cmap((value - vmin) / (vmax - vmin))
marker = create_cam_view_marker(
frame,
view,
scale,
color,
intrinsic,
near,
far,
ns="views",
id=i,
)
markers.append(marker)
self.draw(markers)
def workspace(self, frame, size):
scale = size * 0.005
pose = Transform.identity()
scale = [scale, 0.0, 0.0]
color = [0.5, 0.5, 0.5]
lines = [
([0.0, 0.0, 0.0], [size, 0.0, 0.0]),
([size, 0.0, 0.0], [size, size, 0.0]),
([size, size, 0.0], [0.0, size, 0.0]),
([0.0, size, 0.0], [0.0, 0.0, 0.0]),
([0.0, 0.0, size], [size, 0.0, size]),
([size, 0.0, size], [size, size, size]),
([size, size, size], [0.0, size, size]),
([0.0, size, size], [0.0, 0.0, size]),
([0.0, 0.0, 0.0], [0.0, 0.0, size]),
([size, 0.0, 0.0], [size, 0.0, size]),
([size, size, 0.0], [size, size, size]),
([0.0, size, 0.0], [0.0, size, size]),
]
msg = create_line_list_marker(frame, pose, scale, color, lines, ns="workspace")
self.draw([msg])
def create_cam_view_marker(
frame, pose, scale, color, intrinsic, near, far, ns="", id=0
):
marker = create_marker(Marker.LINE_LIST, frame, pose, scale, color, ns, id)
x_n = near * intrinsic.width / (2.0 * intrinsic.fx)
y_n = near * intrinsic.height / (2.0 * intrinsic.fy)
z_n = near
x_f = far * intrinsic.width / (2.0 * intrinsic.fx)
y_f = far * intrinsic.height / (2.0 * intrinsic.fy)
z_f = far
points = [
[x_n, y_n, z_n],
[-x_n, y_n, z_n],
[-x_n, y_n, z_n],
[-x_n, -y_n, z_n],
[-x_n, -y_n, z_n],
[x_n, -y_n, z_n],
[x_n, -y_n, z_n],
[x_n, y_n, z_n],
[x_f, y_f, z_f],
[-x_f, y_f, z_f],
[-x_f, y_f, z_f],
[-x_f, -y_f, z_f],
[-x_f, -y_f, z_f],
[x_f, -y_f, z_f],
[x_f, -y_f, z_f],
[x_f, y_f, z_f],
[x_n, y_n, z_n],
[x_f, y_f, z_f],
[-x_n, y_n, z_n],
[-x_f, y_f, z_f],
[-x_n, -y_n, z_n],
[-x_f, -y_f, z_f],
[x_n, -y_n, z_n],
[x_f, -y_f, z_f],
]
marker.points = [to_point_msg(p) for p in points]
return marker