-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathp2_utils.py
393 lines (296 loc) · 13.2 KB
/
p2_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
import math
import numpy as np
from scipy import io
import scipy
import cv2
import matplotlib.pyplot as plt
#################################### MAP #####################################
def initializeMap(res, xmin, ymin, xmax, ymax, memory = None, trust = None, optimism = None, occupied_thresh = None, free_thresh = None, confidence_limit = None):
if memory == None:
memory = 1 # set to value between 0 and 1 if memory is imperfect
if trust == None:
trust = 0.8
if optimism == None:
optimism = 0.5
if occupied_thresh == None:
occupied_thresh = 0.85
if free_thresh == None:
free_thresh = 0.2 # 0.5 # 0.25
if confidence_limit == None:
confidence_limit = 100 * memory
MAP = {}
MAP['res'] = res #meters; used to detrmine the number of square cells
MAP['xmin'] = xmin #meters
MAP['ymin'] = ymin
MAP['xmax'] = xmax
MAP['ymax'] = ymax
MAP['sizex'] = int(np.ceil((MAP['xmax'] - MAP['xmin']) / MAP['res'] + 1)) # number of horizontal cells
MAP['sizey'] = int(np.ceil((MAP['ymax'] - MAP['ymin']) / MAP['res'] + 1)) # number of vertical cells
MAP['map'] = np.zeros((MAP['sizex'], MAP['sizey']), dtype=np.float64) # contains log odds. DATA TYPE: char or int8
# Related to log-odds
MAP['memory'] = memory
MAP['occupied'] = np.log(trust / (1 - trust))
MAP['free'] = optimism * np.log((1 - trust) / trust) # Try to be optimistic about exploration, so weight free space
MAP['confidence_limit'] = confidence_limit
# Related to occupancy grid
MAP['occupied_thresh'] = np.log(occupied_thresh / (1 - occupied_thresh))
MAP['free_thresh'] = np.log(free_thresh / (1 - free_thresh))
(h, w) = MAP['map'].shape
MAP['plot'] = np.zeros((h, w, 3), np.uint8)
return MAP
def updateMap(MAP, x_w, y_w, x_curr, y_curr):
# convert lidar hits to map coordinates
x_m, y_m = world2map(MAP, x_w, y_w)
# convert robot's position to map coordinates
x_curr_m, y_curr_m = world2map(MAP, x_curr, y_curr)
indGood = np.logical_and(np.logical_and(np.logical_and((x_m > 1), (y_m > 1)), (x_m < MAP['sizex'])),
(y_m < MAP['sizey']))
MAP['map'] = MAP['map'] * MAP['memory']
MAP['map'][x_m[0][indGood[0]], y_m[0][indGood[0]]] += MAP['occupied'] - MAP['free'] # we're going to add the MAP['free'] back in a second
# initialize a mask where we will label the free cells
free_grid = np.zeros(MAP['map'].shape).astype(np.int8)
x_m = np.append(x_m, x_curr_m) # Must consider robot's current cell
y_m = np.append(y_m, y_curr_m)
contours = np.vstack((x_m, y_m)) # SWITCH
# find the cells that are free, and update the map
cv2.drawContours(free_grid, [contours.T], -1, MAP['free'], -1)
MAP['map'] += free_grid
# prevent overconfidence
MAP['map'][MAP['map'] > MAP['confidence_limit']] = MAP['confidence_limit']
MAP['map'][MAP['map'] < -MAP['confidence_limit']] = -MAP['confidence_limit']
# update plot
occupied_grid = MAP['map'] > MAP['occupied_thresh']
free_grid = MAP['map'] < MAP['free_thresh']
MAP['plot'][occupied_grid] = [0, 0, 0]
MAP['plot'][free_grid] = [255, 255, 255]
MAP['plot'][np.logical_and(np.logical_not(free_grid), np.logical_not(occupied_grid))] = [127, 127, 127]
x_m, y_m = world2map(MAP, x_w, y_w)
MAP['plot'][y_m, x_m] = [0, 255, 0] # plot latest lidar scan hits
def lidar2map(MAP, x_l, y_l):
#x_w, y_w = lidar2world()
x_m, y_m = world2map(MAP, x_l, y_l)
# build a single map in the lidar's frame of reference
indGood = np.logical_and(np.logical_and(np.logical_and((x_m > 1), (y_m > 1)), (x_m < MAP['sizex'])),
(y_m < MAP['sizey']))
map = np.zeros(MAP['map'].shape)
map[x_m[0][indGood[0]], y_m[0][indGood[0]]] = 1
np.int8(map)
return map
def world2map(MAP, x_w, y_w):
# convert from meters to cells
x_m = np.ceil((x_w - MAP['xmin']) / MAP['res']).astype(np.int16) - 1
y_m = np.ceil((y_w - MAP['ymin']) / MAP['res']).astype(np.int16) - 1
indGood = np.logical_and(np.logical_and(np.logical_and((x_m > 1), (y_m > 1)), (x_m < MAP['sizex'])),
(y_m < MAP['sizey']))
x_m = x_m[indGood]
y_m = y_m[indGood]
return x_m.astype(np.int), y_m.astype(np.int)
################################## PARTICLES #################################
# A particle is just a tuple of weights (scalar) and pose (3x1)
def initializeParticles(num = None, n_thresh = None, noise_cov = None):
if num == None:
num = 100
if n_thresh == None:
n_thresh = 0.1 * num # set threshold to 20% of original number of particles to resample
if noise_cov == None:
noise_cov = np.zeros((3,3)) # for debugging purposes
noise_cov = 0.5 * np.eye(3) # set noise covariances for multivariate Gaussian. This is 10% of the delta_pose movement (check predictParticles)
noise_cov = np.array([[.1, 0, 0], [0, .1, 0], [0, 0, 0.005]])
PARTICLES = {}
PARTICLES['num'] = num
PARTICLES['n_thresh'] = n_thresh # below this value, resample
PARTICLES['noise_cov'] = noise_cov # covariances for Gaussian noise in each dimension
PARTICLES['weights'] = np.ones(PARTICLES['num']) / PARTICLES['num']
PARTICLES['poses'] = np.zeros((PARTICLES['num'], 3))
return PARTICLES
def predictParticles(PARTICLES, d_x, d_y, d_yaw, x_prev, y_prev, yaw_prev):
noise_cov = np.matmul(PARTICLES['noise_cov'], np.abs(np.array([[d_x, 0, 0], [0, d_y, 0], [0, 0, d_yaw]])))
# create hypothesis (particles) poses
noise = np.random.multivariate_normal([0, 0, 0], noise_cov, PARTICLES['num'])
PARTICLES['poses'] = noise + np.array([[x_prev, y_prev, yaw_prev]])
# update poses according to deltas
PARTICLES['poses'] += np.array([[d_x, d_y, d_yaw]])
return
def updateParticles(PARTICLES, MAP, x_l, y_l, psi, theta):
n_eff = 1 / np.sum(np.square(PARTICLES['weights']))
if (n_eff < PARTICLES['n_thresh']):
print("resampling!")
resampleParticles(PARTICLES)
correlations = np.zeros(PARTICLES['num'])
_, plot = cv2.threshold(MAP['plot'], 127, 255, cv2.THRESH_BINARY)
for i in range(PARTICLES['num']):
x_w, y_w, _ = lidar2world(psi, theta, x_l, y_l, PARTICLES['poses'][i][0], PARTICLES['poses'][i][1], PARTICLES['poses'][i][2])
x_m, y_m = world2map(MAP, x_w, y_w)
particle_plot = np.zeros(MAP['plot'].shape)
particle_plot[y_m, x_m] = [0, 1, 0]
correlations[i] = np.sum(np.logical_and(plot, particle_plot)) # switched x and y
weights = scipy.special.softmax(correlations - np.max(correlations)) # np.multiply(PARTICLES['weights'], scipy.special.softmax(correlations)) # multiply or add or replace?
if (np.count_nonzero(correlations) == 0):
print("ALL ZERO CORRELATIONS")
PARTICLES['weights'] = weights
return
def resampleParticles(PARTICLES):
# implemented low-variance resampling according to: https://robotics.stackexchange.com/questions/7705/low-variance-resampling-algorithm-for-particle-filter
M = PARTICLES['num']
new_poses = np.zeros(PARTICLES['poses'].shape)
r = np.random.uniform(0, 1 / M)
w = PARTICLES['weights'][0]
i = 0
j = 0
for m in range(M):
U = r + m / M
while (U > w):
i += 1
w += PARTICLES['weights'][i]
new_poses[j, :] = PARTICLES['poses'][i, :]
j += 1
PARTICLES['poses'] = new_poses
PARTICLES['weights'] = np.ones(PARTICLES['num']) / PARTICLES['num']
return
################################# TRANSFORMS #################################
b = 0.93 # distance from world to body in meters
h = 0.33 # distance from body to head
l = 0.15 # distance from head to lidar
k = 0.07 # distance from head to kinect
def pol2cart(rho, phi):
x = rho * np.cos(phi)
y = rho * np.sin(phi)
x = x[:, np.newaxis]
y = y[:, np.newaxis]
return x.T, y.T
def cart2pol(x, y):
rho = np.sqrt(x**2 + y**2)
phi = np.arctan2(y, x)
return (rho, phi)
# Checks if a matrix is a valid rotation matrix.
def isRotationMatrix(R):
Rt = np.transpose(R)
shouldBeIdentity = np.dot(Rt, R)
I = np.identity(3, dtype = R.dtype)
n = np.linalg.norm(I - shouldBeIdentity)
return n < 1e-6
# Calculates rotation matrix to euler angles.
def rotationMatrixToEulerAngles(R):
assert(isRotationMatrix(R))
sy = math.sqrt(R[0,0] * R[0,0] + R[1,0] * R[1,0])
singular = sy < 1e-6
if not singular:
x = math.atan2(R[2,1] , R[2,2])
y = math.atan2(-R[2,0], sy)
z = math.atan2(R[1,0], R[0,0])
else :
x = math.atan2(-R[1,2], R[1,1])
y = math.atan2(-R[2,0], sy)
z = 0
return np.array([x, y, z])
# Calculates Rotation Matrix given euler angles
def eulerAnglesToRotationMatrix(angles):
R_x = rot_x(angles[0])
R_y = rot_y(angles[1])
R_z = rot_z(angles[2])
R = np.matmul(R_z, np.matmul(R_y, R_x))
return R
# example: (roll)
def rot_x(phi):
R_x = np.array([[1, 0, 0],
[0, math.cos(phi), -math.sin(phi)],
[0, math.sin(phi), math.cos(phi)]
])
return R_x
# example: head angle (pitch)
def rot_y(theta):
R_y = np.array([[ math.cos(theta), 0, math.sin(theta)],
[ 0, 1, 0],
[-math.sin(theta), 0, math.cos(theta)]
])
return R_y
# example: neck angle (yaw)
def rot_z(psi):
R_z = np.array([[math.cos(psi), -math.sin(psi), 0],
[math.sin(psi), math.cos(psi), 0],
[ 0, 0, 1]
])
return R_z
def lidarToHeadTransform():
h_T_l = np.array([[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, l],
[0, 0, 0, 1]
])
return h_T_l
def kinectToHeadTransform():
h_T_k = np.array([[1, 0, 0, 0],
[0, 1, 0, 0],
[0, 0, 1, k],
[0, 0, 0, 1]
])
return h_T_k
# psi is left/right counter-clockwise NECK angle in z-axis (yaw)
# theta is up/down counter-clockwise HEAD angle in y-axis (pitch)
def headToBodyTransform(psi, theta):
R = np.matmul(rot_z(psi), rot_y(theta))
b_T_h = np.vstack((R, np.zeros((1,3))))
b_T_h = np.hstack((b_T_h, np.array(([0], [0], [h], [1]))))
return b_T_h
# psi is left/right counter-clockwise NECK angle in z-axis (yaw)
# theta is up/down counter-clockwise HEAD angle in y-axis (pitch)
def lidarToBodyTransform(psi, theta):
b_T_h = headToBodyTransform(psi, theta)
h_T_l = lidarToHeadTransform()
b_T_l = np.matmul(b_T_h, h_T_l)
return b_T_l
# psi is left/right counter-clockwise NECK angle in z-axis (yaw)
# theta is up/down counter-clockwise HEAD angle in y-axis (pitch)
def kinectToBodyTransform(psi, theta):
b_T_h = headToBodyTransform(psi, theta)
h_T_k = kinectToHeadTransform()
b_T_k = np.matmul(b_T_h, h_T_k)
return b_T_k
# x, y, yaw obtained from odometry
def bodyToWorldTransform(x, y, yaw):
R = rot_z(yaw)
w_T_b = np.vstack((R, np.zeros((1,3))))
w_T_b = np.hstack((w_T_b, np.array(([x], [y], [b], [1]))))
return w_T_b
# x, y, yaw obtained from odometry
# psi, theta obtained from head
def lidarToWorldTransform(psi, theta, x, y, yaw):
w_T_b = bodyToWorldTransform(x, y, yaw)
b_T_l = lidarToBodyTransform(psi, theta)
w_T_l = np.matmul(w_T_b, b_T_l)
return w_T_l
# x, y, yaw obtained from odometry
# psi, theta obtained from head
def kinectToWorldTransform(psi, theta, x, y, yaw):
w_T_b = bodyToWorldTransform(x, y, yaw)
b_T_k = kinectToBodyTransform(psi, theta)
w_T_k = np.matmul(w_T_b, b_T_k)
return w_T_k
def lidar2head(x_l, y_l):
coordinates_l = np.vstack((np.vstack((x_l, y_l)), np.zeros((1, x_l.shape[1])), np.ones((1, x_l.shape[1]))))
coordinates_h = np.matmul(lidarToHeadTransform(), coordinates_l)
x_h = coordinates_h[0, :]
y_h = coordinates_h[1, :]
return (x_h, y_h)
def lidar2body(psi, theta, x_l, y_l):
coordinates_l = np.vstack((np.vstack((x_l, y_l)), np.zeros((1, x_l.shape[1])), np.ones((1, x_l.shape[1]))))
coordinates_b = np.matmul(lidarToBodyTransform(psi, theta), coordinates_l)
x_b = coordinates_b[0, :]
y_b = coordinates_b[1, :]
return (x_b, y_b)
# *_curr variables come from cumulative delta_pose
def lidar2world(psi, theta, x_l, y_l, x_curr, y_curr, yaw_curr):
coordinates_l = np.vstack((np.vstack((x_l, y_l)), np.zeros((1, x_l.shape[1])), np.ones((1, x_l.shape[1]))))
coordinates_w = np.matmul(lidarToWorldTransform(psi, theta, x_curr, y_curr, yaw_curr), coordinates_l)
x_w = coordinates_w[0, :]
y_w = coordinates_w[1, :]
z_w = coordinates_w[2, :]
x_w = x_w[:, np.newaxis]
y_w = y_w[:, np.newaxis]
z_w = z_w[:, np.newaxis]
# remove scans that are too close to the ground
indValid = (z_w > 0.1)
x_w = x_w[indValid]
y_w = y_w[indValid]
z_w = z_w[indValid]
return (x_w, y_w, z_w)