4from visp.core
import ColVector, Color, PixelMeterConversion, Display, Matrix
5from visp.core
import CameraParameters, HomogeneousMatrix , PoseVector, ImagePoint
6from visp.core
import ImageRGBa, ImageUInt16
7from visp.core
import UnscentedKalman, UKSigmaDrawerMerwe
9from visp.visual_features
import BasicFeature, FeaturePoint
10from visp.vs
import Servo
11from visp.gui
import ColorBlindFriendlyPalette
13from visp.robot
import RobotAfma6, Robot
15from typing
import List, Optional, Tuple
17 from ultralytics
import YOLO
19 print(
'This example requires YoloV8: pip install ultralytics')
23import matplotlib.pyplot
as plt
24from matplotlib
import animation
26import pyrealsense2
as rs
27from functools
import partial
28plt.rcParams[
'text.usetex'] =
False
def fx(x: ColVector, dt: float) -> ColVector:
    """
    Process function that projects in time the internal state of the UKF.

    The state is [x, vx, ax, y, vy, ay]^T: position, velocity and acceleration
    of the tracked point along each normalized-image axis.

    :param x: The internal state of the UKF.
    :param dt: The sampling time: how far in the future are we projecting x.

    :return ColVector: The updated internal state, projected in time, also known as the prior.
    """
    # NOTE(review): several rows of the original return statement were lost in
    # extraction; rows 1, 2, 4 and 5 are reconstructed from the constant-
    # acceleration pattern of the two visible rows -- confirm against the
    # original example before relying on this.
    return ColVector([
        x[0] + dt * x[1] + dt ** 2 * x[2],
        x[1] + dt * x[2],
        x[2],
        x[3] + dt * x[4] + dt ** 2 * x[5],
        x[4] + dt * x[5],
        x[5]
    ])
def hx(x: ColVector) -> ColVector:
    """
    Measurement function that expresses the internal state of the UKF in the measurement space.

    Only the two position components of the state [x, vx, ax, y, vy, ay]^T are
    measured (the detector supplies an (x, y) point in normalized coordinates).

    :param x: The internal state of the UKF.

    :return ColVector: The internal state, expressed in the measurement space.
    """
    # NOTE(review): the original body was lost in extraction; reconstructed
    # from the call site, which filters a 2-vector ColVector([x, y]) -- confirm.
    return ColVector([x[0], x[3]])
def add_state_vectors(a, b):
    """
    Method that permits to add two state vectors.

    :param a: The first state vector to which another state vector must be added.
    :param b: The other state vector that must be added to a.

    :return ColVector: The sum a + b.
    """
    # NOTE(review): the def line and body were lost in extraction; both are
    # reconstructed from the docstring and the trailing signature summary
    # (ColVector add_state_vectors(a, b)).
    return a + b
def residual_state_vectors(a, b):
    """
    Method that permits to subtract a state vector to another.

    :param a: The first state vector to which another state vector must be subtracted.
    :param b: The other state vector that must be subtracted to a.

    :return ColVector: The subtraction a - b.
    """
    # NOTE(review): the def line and body were lost in extraction; both are
    # reconstructed from the docstring and the trailing signature summary
    # (ColVector residual_state_vectors(a, b)).
    return a - b
def generate_Q_matrix(dt, sigma):
    """
    Method that generates the process covariance matrix for a process for which the
    state vector can be written as (x, dx/dt)^T.

    :param dt: The sampling period.
    :param sigma: Intensity factor scaling the whole covariance matrix
        (the call site uses sigma=1e-8).

    :return Matrix: The corresponding process covariance matrix.
    """
    # NOTE(review): the def line and the closing of the return statement were
    # lost in extraction. The signature comes from the trailing summary
    # (Matrix generate_Q_matrix(float dt, float sigma)); the `* sigma` scaling
    # is inferred from the keyword argument at the call site -- confirm.
    return Matrix(np.asarray([
        [dt**4/4, dt**3/3, dt**2/2, 0, 0, 0],
        [dt**3/3, dt**2/2, dt, 0, 0, 0],
        [dt**2/2, dt, 1, 0, 0, 0],
        [0, 0, 0, dt**4/4, dt**3/3, dt**2/2],
        [0, 0, 0, dt**3/3, dt**2/2, dt],
        [0, 0, 0, dt**2/2, dt, 1],
    ]) * sigma)
def read_data(I_rgba: ImageRGBa, I_depth: ImageUInt16, pipe: rs.pipeline, align: rs.align) -> None:
    """
    Grab one aligned color/depth frame pair from the RealSense pipeline and
    copy it into the preallocated ViSP images.

    :param I_rgba: Destination color image, overwritten in place.
    :param I_depth: Destination depth image, overwritten in place.
    :param pipe: Started RealSense pipeline to read from (blocks until a frame
        set is available).
    :param align: Aligner that registers the depth frame onto the color stream.
    """
    frames = pipe.wait_for_frames()
    aligned_frames = align.process(frames)
    # The camera delivers RGB; append a fully-opaque alpha channel so the
    # buffer matches the RGBa layout of the ViSP image.
    I_np = np.asanyarray(aligned_frames.get_color_frame().as_frame().get_data())
    I_np = np.concatenate((I_np, np.ones_like(I_np[..., 0:1], dtype=np.uint8) * 255), axis=-1)
    # Copy through the numpy views so the ViSP images are updated in place.
    rgba_numpy_view = I_rgba.numpy()
    np.copyto(rgba_numpy_view, I_np)
    depth_numpy_view = I_depth.numpy()
    depth_np = np.asanyarray(aligned_frames.get_depth_frame().as_frame().get_data())
    np.copyto(depth_numpy_view, depth_np)
def cam_from_rs_profile(profile):
    """
    Build ViSP camera parameters from a RealSense stream profile.

    :param profile: RealSense stream profile of the (color) stream.

    :return: A tuple (CameraParameters built from the stream intrinsics,
        image height, image width).
    """
    # NOTE(review): the def line was lost in extraction; the name and signature
    # come from the trailing summary
    # (Tuple[CameraParameters, int, int] cam_from_rs_profile(profile)).
    intr = profile.as_video_stream_profile().get_intrinsics()
    return CameraParameters(intr.fx, intr.fy, intr.ppx, intr.ppy), intr.height, intr.width
def on_iter(self, Idisp: ImageRGBa, v: ColVector, error: ColVector, cTw: HomogeneousMatrix) -> None:
    """
    Record one servoing iteration for later plotting/animation.

    Appends to the history lists self.I, self.v, self.error and self.r.

    :param Idisp: Current display image; a copy of its pixel data is stored.
    :param v: Commanded velocity; only the [3:5] slice is kept (presumably the
        two controlled rotation components -- confirm against the task DoF mask).
    :param error: Current visual-feature error vector, stored flattened.
    :param cTw: Current pose; the [3:5] slice of its pose-vector form is kept.
    """
    # NOTE(review): this is a method of the plotter class whose header
    # (original lines ~122-127) is missing from this extraction.
    self.I.append(Idisp.numpy().copy())
    self.v.append(v.numpy()[3:5].flatten())
    self.error.append(error.numpy().flatten())
    self.r.append(PoseVector(cTw).numpy()[3:5].flatten())
# NOTE(review): extraction-garbled block (the `generate_anim` method called at
# the end of the script). The original source line numbers are fused into the
# text, statements are split across lines, and several interior lines are
# missing entirely (the method's def line, the initializations of `artists`,
# `legends` and `xs`, the `def animate(i)` line and its loop headers).
# The code is therefore kept byte-identical rather than reconstructed;
# comments below only mark the logical sections.
# -- Convert recorded histories to arrays, dropping the first sample.
136 self.
v = np.asarray(self.
v)[1:]
137 self.
r = np.asarray(self.
r)[1:]
# -- 2x2 figure sized to the aspect ratio of the recorded images.
140 fig, axs = plt.subplots(2, 2, figsize=(15, 15 * (self.
I[0].shape[0] / self.
I[0].shape[1])))
141 axs = [axs[0][0], axs[0][1], axs[1][0],axs[1][1]]
142 titles = [
'I',
'Feature error',
'Velocity',
'Pose']
# -- Fragment of the `legends` list (its opening and first entries are lost).
146 [
r"$\mathbf{\upsilon}_x$",
r"$\mathbf{\upsilon}_y$"],
147 [
r"$\theta\mathbf{u}_x$",
r"$\theta\mathbf{u}_y$"],
149 data = [
None, self.
error, self.
v, self.
r]
# -- Configure each axis: curve plots get labels/limits; the image axis is bare.
151 for i
in range(len(axs)):
152 axs[i].set_title(titles[i])
153 if data[i]
is not None:
154 axs[i].set_xlabel(
'Iteration')
156 axs[i].set_xlim(0, len(self.
v))
157 min_val, max_val = np.min(data[i]), np.max(data[i])
158 margin = (max_val - min_val) * 0.05
159 axs[i].set_ylim(min_val - margin, max_val + margin)
160 artists.append(axs[i].plot(data[i]))
161 axs[i].legend(legends[i])
163 artists.append(axs[i].imshow(self.
I[0]))
164 axs[i].set_axis_off()
# -- Body of the per-frame animation callback (its def line is missing).
167 print(f
'Processing frame {i+1}/{len(self.I)}')
169 artists[0].set_data(self.
I[i])
171 artists[1][j].set_data(xs, self.
error[:i, j])
172 artists[2][j].set_data(xs, self.
v[:i, j])
173 artists[3][j].set_data(xs, self.
r[:i, j])
# -- Render the animation to video and save the final figure.
176 anim = animation.FuncAnimation(fig, animate, frames=len(self.
v), blit=
False, repeat=
False)
177 writervideo = animation.FFMpegWriter(fps=30)
178 anim.save(
'exp.mp4', writer=writervideo)
179 plt.savefig(
'exp.pdf')
# NOTE(review): this top-level script block is extraction-garbled -- the
# original source line numbers are fused into the text, statements are split
# across lines, and many lines are missing entirely (e.g. the creation of
# `pipe`, `config`, `fps`, `robot`, `task`, `sd`, `s`, `plotter`, `I`, and the
# structure of the servo loop). The code is kept byte-identical; comments only
# mark the logical sections.
# -- CLI arguments.
182if __name__ ==
'__main__':
183 parser = argparse.ArgumentParser(
'Centering task using a YOLO network')
184 parser.add_argument(
'--class-id', type=int, help=
'COCO class id of the object to track (e.g, 2 for a car)')
185 args = parser.parse_args()
# -- Load the YOLOv8 detection model.
187 detection_model = YOLO(
'yolov8n.pt')
# -- RealSense camera setup (pipe/config/fps are created on missing lines).
192 print(
'Initializing Realsense camera...')
197 config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, fps)
198 config.enable_stream(rs.stream.color, 640, 480, rs.format.rgb8, fps)
199 cfg = pipe.start(config)
203 depth_scale = cfg.get_device().first_depth_sensor().get_depth_scale()
204 print(f
'Depth scale is {depth_scale}')
207 I_depth = ImageUInt16(h, w)
208 Idisp = ImageRGBa(h, w)
211 align = rs.align(rs.stream.color)
212 get_images = partial(read_data, pipe=pipe, align=align)
# -- Robot initialization and move to the starting pose.
214 print(
'Initializing Afma6...')
216 robot.setPositioningVelocity(5.0)
217 print(robot.getPosition(Robot.ControlFrameType.REFERENCE_FRAME))
219 print(
'Moving Afma6 to starting pose...')
220 r = PoseVector(0.06706274856, 0.3844766362, -0.04551332622 , 0.3111005431, 0.3031078532, 0.01708581392)
221 cTw = HomogeneousMatrix(r)
223 robot.setPosition(Robot.ControlFrameType.REFERENCE_FRAME, r)
226 print(
'Warming up camera...')
# -- UKF setup: sigma-point drawer, noise levels, covariances, filter.
232 drawer = UKSigmaDrawerMerwe(6, alpha=0.0001, beta=2, kappa=-3, resFunc=residual_state_vectors, addFunc=add_state_vectors)
234 noise_x, noise_y = [pixel_noise / f
for f
in [cam.get_px(), cam.get_py()]]
237 measure_covariance = Matrix([
241 process_covariance = Matrix([
242 [noise_x ** 2, 0, 0, 0, 0, 0],
243 [0, noise_vel, 0, 0, 0, 0],
244 [0, 0, noise_accel, 0, 0, 0],
245 [0, 0, 0, noise_y ** 2, 0,0],
246 [0, 0, 0, 0, noise_vel,0],
247 [0, 0, 0, 0, 0, noise_accel],
249 kalman = UnscentedKalman(
generate_Q_matrix(1 / fps, sigma=1e-8), measure_covariance, drawer, fx, hx)
# -- Desired visual feature: image center at the measured depth.
253 xd, yd = PixelMeterConversion.convertPoint(cam, w / 2.0, h / 2.0)
255 Zd = I_depth[h // 2, w // 2] * depth_scale
256 print(f
'Desired depth is {Zd}')
258 sd.buildFrom(xd, yd, Zd)
261 s.buildFrom(0.0, 0.0, Zd)
# -- Servo task: only the two rotation DoFs wx, wy are controlled.
264 task.addFeature(s, sd)
266 task.setCameraDoF(ColVector([0, 0, 0, 1, 1, 0]))
267 task.setServo(Servo.ServoType.EYEINHAND_CAMERA)
268 task.setInteractionMatrixType(Servo.ServoIteractionMatrixType.CURRENT)
269 target_class = args.class_id
271 v = ColVector(6, 0.0)
279 last_detection_time = -1.0
# -- Servo loop: detect, filter/predict, compute control law, display, send velocity.
282 while error_norm > 5e-7:
288 return box.cls
is not None and len(box.cls) > 0
and box.cls[0]
292 boxes = results.boxes
# -- Keep the highest-confidence detection of the target class.
296 for i
in range(len(boxes.conf)):
297 if boxes.cls[i] == target_class
and boxes.conf[i] > max_conf:
299 max_conf = boxes.conf[i]
300 bb = boxes.xywh[i].cpu().numpy()
304 x, y = PixelMeterConversion.convertPoint(cam, u, v)
306 initial_state = ColVector([x, 0, 0, y, 0, 0])
307 kalman.init(initial_state, process_covariance)
309 kalman.filter(ColVector([x, y]), (1 / fps))
310 kalman_state = kalman.getXest()
311 last_detection_time = time.time()
312 s.buildFrom(kalman_state[0], kalman_state[3], Zd)
313 v = task.computeControlLaw()
# -- No detection this iteration: fall back to the UKF prediction.
315 if last_detection_time < 0.0:
316 raise RuntimeError(
'No detection at first iteration')
317 kalman.predict(time.time() - last_detection_time)
318 kalman_pred = kalman.getXpred()
319 s.buildFrom(kalman_pred[0], kalman_pred[3], Zd)
320 task.computeControlLaw()
322 error: ColVector = task.getError()
323 error_norm = error.sumSquare()
# -- Overlay the detection box and the current/desired features.
327 current_color = ColorBlindFriendlyPalette(ColorBlindFriendlyPalette.SkyBlue).to_vpColor()
329 Display.displayRectangle(I, i=
int(bb[1] - bb[3] / 2), j=
int(bb[0] - bb[2] / 2), width=bb[2], height=bb[3],
330 color=current_color, fill=
False, thickness=2)
331 sd.display(cam, I, ColorBlindFriendlyPalette(ColorBlindFriendlyPalette.Yellow).to_vpColor(), thickness=3)
332 s.display(cam, I, current_color, thickness=3)
334 Display.getImage(I, Idisp)
335 robot.getPosition(Robot.ControlFrameType.REFERENCE_FRAME, r)
337 plotter.on_iter(Idisp, v, error, cTw)
# -- Send the computed camera-frame velocity to the robot.
340 robot.setRobotState(Robot.RobotStateType.STATE_VELOCITY_CONTROL)
341 robot.setVelocity(Robot.ControlFrameType.CAMERA_FRAME, v)
342 print(f
'Iter took {time.time() - start}s')
# -- After convergence: render the recorded iterations to video/figure.
347 plotter.generate_anim()
# Extraction residue: auto-generated C-style signature summary of the
# functions defined above (not valid Python; kept for reference only).
# None on_iter(self, ImageRGBa Idisp, ColVector v, ColVector error, HomogeneousMatrix cTw)
# ColVector add_state_vectors(a, b)
# ColVector residual_state_vectors(a, b)
# ColVector hx(ColVector x)
# None read_data(ImageRGBa I_rgba, ImageUInt16 I_depth, rs.pipeline pipe, rs.align align)
# Matrix generate_Q_matrix(float dt, float sigma)
# Tuple[CameraParameters, int, int] cam_from_rs_profile(profile)
# ColVector fx(ColVector x, float dt)