-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathswarm_rl_manousos.py
2533 lines (2047 loc) · 102 KB
/
swarm_rl_manousos.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# -*- coding: utf-8 -*-
import os
import sys
import time
import copy
import random
import functools
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import multiprocessing
from IPython import get_ipython
from scipy.spatial.distance import cdist
from PIL import Image
import re
import collections
from scipy.stats import norm
from sklearn.preprocessing import MinMaxScaler
import warnings
warnings.filterwarnings('ignore')
def check_run_colab() -> bool:
    """Tell whether the ``google.colab`` module is loaded in this interpreter."""
    colab_loaded = 'google.colab' in sys.modules
    return colab_loaded
# Check if the current environment is Google Colab.
# When it is, Google Drive is mounted at /content/drive; otherwise only a
# message is printed. This runs at import time as a module-level side effect.
if check_run_colab():
    print("Running in Google Colab!")
    # Imported lazily: the google.colab package is only importable inside Colab.
    from google.colab import drive
    drive.mount('/content/drive')
else:
    print("Not running in Google Colab.")
"""#### The Agent Class:"""
class Agent:
    """An explorer on the grid that keeps its own partially observed map.

    ``explored_stage`` mirrors ``real_stage`` in shape; -1 marks cells that
    have not been observed yet, other values are copied from ``real_stage``
    as they come into view.
    """

    def __init__(self, start: tuple, goal: tuple, real_stage, view_range=2):
        """Place the agent on ``start`` and take a first look around.

        Args:
            start: (row, col) of the initial position.
            goal: (row, col) the agent is heading for.
            real_stage: ground-truth grid the agent observes from.
            view_range: how many cells each line of sight reaches.
        """
        self.x, self.y = start[0], start[1]
        self.goal = goal
        self.view_range = view_range

        # Nothing is known at first except the (free) cell the agent stands on.
        self.explored_stage = np.full_like(real_stage, -1)
        self.explored_stage[self.x, self.y] = 0
        self.agent_view(real_stage)

        self.start_time = time.time()
        self.u_hedac = None  # hedac related parameter.

        # voronoi related parameters:
        self.voronoi_coords = None
        n_rows, n_cols = real_stage.shape[0], real_stage.shape[1]
        self.broadcast_range = max(n_rows, n_cols) // 4
        self.visited_cells = np.zeros_like(real_stage)
        self.set_voronoi_goal = False

    def agent_view(self, real_stage):
        """Copy what the agent can currently see into ``explored_stage``.

        A line of sight is cast in each of the eight directions (up, down,
        left, right and the four diagonals). Each one reaches at most
        ``view_range`` cells and stops at the grid edge or right after the
        first truthy (obstacle) cell, which is itself recorded. Any value 2
        left in the explored map is reset to 0 afterwards.
        """
        n_rows, n_cols = len(real_stage), len(real_stage[0])
        rays = (
            (-1, 0), (-1, -1), (-1, 1),   # up, up-left, up-right
            (1, 0), (1, -1), (1, 1),      # down, down-left, down-right
            (0, -1), (0, 1),              # left, right
        )
        for d_row, d_col in rays:
            for reach in range(1, self.view_range + 1):
                row = self.x + d_row * reach
                col = self.y + d_col * reach
                if not (0 <= row < n_rows and 0 <= col < n_cols):
                    break  # every further cell on this ray is off-grid too
                seen = real_stage[row, col]
                self.explored_stage[row, col] = seen
                if seen:
                    break  # the obstacle hides whatever lies behind it
        self.explored_stage[self.explored_stage == 2] = 0

    def check_goal(self):
        """Return True when the agent's position equals its goal."""
        return (self.x, self.y) == self.goal
"""#### **A\* Algorithm** (source [here](https://pypi.org/project/python-astar/)): Produces a path from start to end using A\*."""
"""
Python-astar - A* path search algorithm
"""
class Tile:
    """A walkable cell of the grid, used as a search node by ``AStar``.

    Two tiles are equal when they share the same position, regardless of the
    distance or origin recorded on them.
    """

    # Class-level defaults: a fresh tile has travelled nothing and has no parent.
    distance = 0
    came_from = None

    def __init__(self, x, y, weight=1):
        """Create a tile at ``(x, y)`` whose traversal cost is ``weight``."""
        self.x = x
        self.y = y
        # Honour the argument (it used to be discarded in favour of a fixed 1).
        self.weight = weight
        assert (self.x is not None and self.y is not None)

    def update_origin(self, came_from):
        """Update which tile this one came from."""
        self.came_from = came_from
        self.distance = came_from.distance + self.weight

    def __eq__(self, other):
        """A tile is the same if they have the same position."""
        if not isinstance(other, Tile):
            return NotImplemented
        return bool(self.x == other.x and self.y == other.y)

    def __lt__(self, other):
        """We want the shortest distance tile to find the happy path.

        This is used by min() so we can just compare them :)
        """
        return (self.distance + self.weight <= other.distance)

    def __hash__(self):
        """Hash on the position so that equal tiles always hash alike.

        Hashing the numeric pair (instead of its string form) keeps tiles
        built from NumPy integers and from Python integers in the same
        set bucket, which ``__eq__`` already treats as equal.
        """
        return hash((self.x, self.y))

    @property
    def pos(self):
        """a (x, y) tuple with position on the grid"""
        return (self.x, self.y)

    def __str__(self):
        return str(self.pos)

    def __repr__(self):
        return str(self)
class AStar:
    """Path search over a sanitised copy of a grid world.

    Only cells equal to 0 are walkable. The next tile to expand is chosen
    with ``min()`` over the open set, i.e. according to ``Tile``'s ordering.
    """

    def __init__(self, world, coverage_mode: bool = False):
        """Copy ``world`` and normalise its markers.

        Unexplored cells (-1) are treated as walkable. Agent cells (2) become
        obstacles when ``coverage_mode`` is set and walkable otherwise.
        """
        grid = copy.deepcopy(world)
        grid[grid == -1] = 0
        grid[grid == 2] = 1 if coverage_mode else 0
        self.world = grid

    def search(self, start_pos, target_pos):
        """Return the list of (x, y) positions from start to target, or None."""
        self.open_tiles = {Tile(*start_pos)}
        self.closed_tiles = set()

        while self.open_tiles:
            nearest = min(self.open_tiles)
            if nearest.pos == target_pos:
                return self.rebuild_path(nearest)
            self.search_for_tiles(nearest)
            self.close_tile(nearest)

        # The open set ran dry without meeting the target: no route exists.
        return None

    def search_for_tiles(self, current):
        """Register the walkable neighbours of ``current`` in the open set."""
        for neighbour in self.get_neighbors(current):
            if self.is_new_tile(neighbour):
                neighbour.update_origin(current)
                self.open_tiles.add(neighbour)
                continue
            # Already known: re-route it only if it compares as farther away.
            if neighbour > current:
                neighbour.update_origin(current)
                if neighbour in self.closed_tiles:
                    self.reopen_tile(neighbour)

    def get_neighbors(self, tile):
        """Return the walkable tiles directly up, down, left and right."""
        x, y = tile.pos
        last_row = len(self.world) - 1
        last_col = len(self.world[x]) - 1
        # Clamping at the borders maps an off-grid neighbour onto the tile
        # itself, which is then filtered out below.
        candidates = (
            (max(0, x - 1), y),
            (min(last_row, x + 1), y),
            (x, max(0, y - 1)),
            (x, min(last_col, y + 1)),
        )
        return [
            Tile(cx, cy)
            for cx, cy in candidates
            if (cx, cy) != tile.pos and self.world[cx][cy] == 0
        ]

    def rebuild_path(self, current):
        """Follow ``came_from`` links back from ``current`` to the start."""
        self.last_tile = current
        positions = []
        node = current
        while node is not None:
            positions.append(node.pos)
            node = node.came_from
        # Links were walked goal-to-start; callers expect start-to-goal.
        return positions[::-1]

    def is_new_tile(self, tile):
        """Check if this is a previously unknown tile."""
        known = tile in self.open_tiles or tile in self.closed_tiles
        return not known

    def reopen_tile(self, tile):
        """Move a tile from the closed set back into the open set."""
        self.closed_tiles.remove(tile)
        self.open_tiles.add(tile)

    def close_tile(self, tile):
        """Move a fully examined tile from the open set to the closed set."""
        self.open_tiles.remove(tile)
        self.closed_tiles.add(tile)
"""#### **Flood Fill**"""
def flood_fill(expl_maze, start, agent_obs=True):
    """Return the 4-connected step distance from ``start`` to every cell.

    Args:
        expl_maze: explored maze of the agent (0 free, 1 obstacle, 2 agent,
            -1 unexplored). It is not modified.
        start: (row, col) position of the agent.
        agent_obs: when True other agents (2) block the way, otherwise they
            are treated as free cells. Unexplored cells always count as free.

    Returns:
        A float array shaped like the maze. Unreachable cells and obstacles
        hold ``inf``. The start cell is also ``inf`` so that callers are
        pushed away from it, unless no other cell is reachable, in which case
        it is 0 (the agent stays where it is).
    """
    maze = np.asarray(expl_maze)
    maze = np.where(maze == 2, 1 if agent_obs else 0, maze)
    maze = np.where(maze == -1, 0, maze)
    n_rows, n_cols = maze.shape

    start_cell = (start[0], start[1])
    distances = np.full(maze.shape, np.inf, dtype=np.float64)
    distances[start_cell] = 0

    # Iterative breadth-first fill: same distances as the recursive version,
    # but without hitting Python's recursion limit on long corridors.
    frontier = collections.deque([start_cell])
    while frontier:
        x, y = frontier.popleft()
        reach = distances[x, y] + 1
        for dx, dy in ((-1, 0), (0, 1), (1, 0), (0, -1)):
            nx, ny = x + dx, y + dy
            if not (0 <= nx < n_rows and 0 <= ny < n_cols):
                continue
            if maze[nx, ny] == 0 and distances[nx, ny] > reach:
                distances[nx, ny] = reach
                frontier.append((nx, ny))

    # if no where left to go, stays where it is. Else, goes away from start pos.
    distances[start_cell] = np.inf
    if np.all(distances == np.inf):
        distances[start_cell] = 0
    return distances
def flood_fill_path(start_grid, start, end):
    """Return a shortest 4-connected path from ``start`` to ``end``.

    Agents (2) are treated as obstacles and unexplored cells (-1) as free.
    The result is a list of (row, col) tuples including both endpoints, or
    None when ``end`` cannot be reached.
    """
    cells = copy.deepcopy(start_grid)
    cells = np.where(cells == 2, 1, cells)
    cells = np.where(cells == -1, 0, cells)
    n_rows, n_cols = cells.shape
    moves = ((-1, 0), (1, 0), (0, -1), (0, 1))  # Up, Down, Left, Right

    def inside(r, c):
        return 0 <= r < n_rows and 0 <= c < n_cols

    # Breadth-first flood from the start, stopping once the end is dequeued.
    dist = np.full((n_rows, n_cols), np.inf)
    dist[start] = 0
    pending = collections.deque((start,))
    while pending:
        r, c = pending.popleft()
        if (r, c) == end:
            break
        for dr, dc in moves:
            nr, nc = r + dr, c + dc
            if inside(nr, nc) and cells[nr, nc] == 0:
                reach = dist[r, c] + 1
                if reach < dist[nr, nc]:
                    dist[nr, nc] = reach
                    pending.append((nr, nc))

    if dist[end] == np.inf:
        return None  # the flood never arrived at the end cell

    # Walk back from the end, always stepping onto the lowest neighbour.
    trail = []
    cell = end
    while cell != start:
        trail.append(cell)
        lowest, best = np.inf, None
        for dr, dc in moves:
            nr, nc = cell[0] + dr, cell[1] + dc
            if inside(nr, nc) and dist[nr, nc] < lowest:
                lowest, best = dist[nr, nc], (nr, nc)
        cell = best
    trail.append(start)
    trail.reverse()
    return trail
"""#### **Dijkstra**"""
import heapq
def dijkstra_path(start_grid, start, end):
    """Return a shortest unit-cost path from ``start`` to ``end`` (Dijkstra).

    Agents (2) are treated as obstacles and unexplored cells (-1) as free.
    The result is a list of (row, col) tuples including both endpoints, or
    None when ``end`` cannot be reached.
    """
    cells = copy.deepcopy(start_grid)
    cells = np.where(cells == 2, 1, cells)
    cells = np.where(cells == -1, 0, cells)
    n_rows, n_cols = cells.shape
    moves = ((-1, 0), (1, 0), (0, -1), (0, 1))  # Up, Down, Left, Right

    best = np.full((n_rows, n_cols), np.inf)
    best[start] = 0
    came_from = {}          # cell -> the cell it was reached from
    heap = [(0, start)]     # entries are (cost, (row, col))

    while heap:
        cost, (r, c) = heapq.heappop(heap)
        if (r, c) == end:
            break
        for dr, dc in moves:
            nr, nc = r + dr, c + dc
            if not (0 <= nr < n_rows and 0 <= nc < n_cols):
                continue
            if cells[nr, nc] != 0:
                continue
            step_cost = cost + 1  # every move costs 1
            if step_cost < best[nr, nc]:
                best[nr, nc] = step_cost
                heapq.heappush(heap, (step_cost, (nr, nc)))
                came_from[(nr, nc)] = (r, c)

    if end not in came_from and end != start:
        return None

    # Follow the parent links back from the end, then flip the order.
    route = []
    node = end
    while node in came_from:
        route.append(node)
        node = came_from[node]
    route.append(start)
    route.reverse()
    return route
def dijkstra(start_grid, start):
    """Return the unit-cost distance from ``start`` to every cell (Dijkstra).

    Agents (2) are treated as obstacles and unexplored cells (-1) as free.
    Cells that cannot be reached keep the value ``inf``.
    """
    cells = copy.deepcopy(start_grid)
    cells = np.where(cells == 2, 1, cells)
    cells = np.where(cells == -1, 0, cells)
    n_rows, n_cols = cells.shape

    cost_to = np.full((n_rows, n_cols), np.inf)
    cost_to[start] = 0      # the start is at distance 0 from itself
    heap = [(0, start)]     # entries are (cost, (row, col))

    while heap:
        cost, (row, col) = heapq.heappop(heap)
        if cost > cost_to[row, col]:
            continue  # stale entry: a cheaper route was recorded meanwhile
        reach = cost + 1  # every move costs 1
        for d_row, d_col in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            n_row, n_col = row + d_row, col + d_col
            if not (0 <= n_row < n_rows and 0 <= n_col < n_cols):
                continue
            if cells[n_row, n_col] == 0 and reach < cost_to[n_row, n_col]:
                cost_to[n_row, n_col] = reach
                heapq.heappush(heap, (reach, (n_row, n_col)))
    return cost_to
"""#### **Wavefront**"""
def wavefront(start_grid, start, agent_obs=True):
    """Return the wavefront (breadth-first) distance from ``start`` to every cell.

    Args:
        start_grid: grid with 0 free, 1 obstacle, 2 agent, -1 unexplored.
        start: (row, col) the wave expands from.
        agent_obs: when True agents (2) block the wave, otherwise they are
            treated as free cells. Unexplored cells always count as free.

    Returns:
        A float array of step counts; cells the wave never reaches hold ``inf``.
    """
    cells = copy.deepcopy(start_grid)
    agent_value = 1 if agent_obs else 0
    cells = np.where(cells == 2, agent_value, cells)
    cells = np.where(cells == -1, 0, cells)
    n_rows, n_cols = cells.shape

    wave = np.full((n_rows, n_cols), np.inf)
    wave[start] = 0  # the wave originates here
    pending = collections.deque((start,))

    while pending:
        row, col = pending.popleft()
        for d_row, d_col in ((-1, 0), (1, 0), (0, -1), (0, 1)):
            n_row, n_col = row + d_row, col + d_col
            if not (0 <= n_row < n_rows and 0 <= n_col < n_cols):
                continue
            if cells[n_row, n_col] != 0:
                continue
            reach = wave[row, col] + 1
            if reach < wave[n_row, n_col]:
                wave[n_row, n_col] = reach
                pending.append((n_row, n_col))
    return wave
from queue import Queue
def wavefront_path(start_grid, start, end):
    """Return a shortest 4-connected path from ``start`` to ``end``.

    A breadth-first wave labels each reached cell with its step count
    (the start is labelled 1); the path is then read back from ``end`` by
    repeatedly stepping onto a neighbour whose label is one lower.

    Agents (2) are treated as obstacles and unexplored cells (-1) as free.
    The result is a list of (row, col) tuples including both endpoints, or
    None when ``end`` cannot be reached.
    """
    grid = np.asarray(start_grid)
    grid = np.where(grid == 2, 1, grid)
    grid = np.where(grid == -1, 0, grid)
    rows, cols = grid.shape
    steps = ((1, 0), (-1, 0), (0, 1), (0, -1))

    # Labels get their own wide integer dtype: inheriting a narrow dtype from
    # the grid (e.g. int8) would overflow on paths longer than its range.
    labels = np.zeros((rows, cols), dtype=np.int64)
    labels[start] = 1

    # Single-threaded BFS: a plain deque, no thread-synchronised queue needed.
    frontier = collections.deque([start])
    while frontier:
        current = frontier.popleft()
        if current == end:
            break
        for dr, dc in steps:
            r, c = current[0] + dr, current[1] + dc
            if 0 <= r < rows and 0 <= c < cols and grid[r, c] == 0 and labels[r, c] == 0:
                labels[r, c] = labels[current] + 1
                frontier.append((r, c))

    if labels[end] == 0:
        return None  # the wave never reached the end cell

    # Reconstruct the path
    path = [end]
    while path[-1] != start:
        tail = path[-1]
        for dr, dc in steps:
            r, c = tail[0] + dr, tail[1] + dc
            if 0 <= r < rows and 0 <= c < cols and labels[r, c] == labels[tail] - 1:
                path.append((r, c))
                break
    return path[::-1]  # Reverse the path to start from start
"""# Stage/Maze (*)
This section contains code for:
* plot the stage/maze
* creation of gifs (that show the exploration process)
* simple stage creation
* maze creation
* agent generation (on stage)
* update of agent explored stage
* maze exploration metrics (from the paper *Yan, Z., Fabresse, L., Laval, J., & Bouraqadi, N. (2015, September). Metrics for performance benchmarking of multi-robot exploration. In 2015 IEEE/RSJ International Conference on Intelligent Robots and Systems (IROS) (pp. 3407-3414). IEEE*)
Function to plot the grid/maze:
"""
def draw_maze(maze, path=None, goal=None, save_gif=False, numbered=False):
    """Plot a grid/maze in grayscale, optionally with a path and a goal.

    Args:
        maze: 2-D array to display with the binary colormap.
        path: optional sequence of (row, col) cells, drawn as a red line
            with a red square on its last cell.
        goal: optional (row, col) cell marked with a red square.
        save_gif: when True the frame is also written to
            ``tmp_img/<timestamp>.png``.
        numbered: when True every cell is annotated with its integer value.
    """
    fig, ax = plt.subplots(figsize=(10, 10))
    fig.patch.set_edgecolor('white')
    fig.patch.set_linewidth(0)
    ax.imshow(maze, cmap=plt.cm.binary, interpolation='nearest')

    # An empty path has no last cell to mark, so it is skipped like None.
    if path is not None and len(path) > 0:
        # Paths are (row, col); matplotlib wants x = column, y = row.
        x_coords = [cell[1] for cell in path]
        y_coords = [cell[0] for cell in path]
        ax.plot(x_coords, y_coords, color='red', linewidth=2)
        ax.scatter(path[-1][1], path[-1][0], color='red', s=100, marker='s')
    if goal is not None:
        ax.scatter(goal[1], goal[0], color='red', s=100, marker='s')
    if numbered:
        for i in range(maze.shape[0]):
            for j in range(maze.shape[1]):
                ax.text(j, i, str(int(maze[i, j])), color='black',
                        ha='center', va='center', fontsize=10)
    ax.set_xticks([])
    ax.set_yticks([])

    # Saves images to folder ========================
    if save_gif:
        os.makedirs("tmp_img", exist_ok=True)
        filepath = os.path.join("tmp_img", f"{time.time()}.png")
        fig.savefig(filepath)
    # =============================================

    plt.show()
    # In non-interactive runs the figure would otherwise stay registered with
    # pyplot forever; exploration loops call this once per step. Interactive
    # sessions are left alone so an open window is not closed under the user.
    if not plt.isinteractive():
        plt.close(fig)
"""Function for plotting the grid/maze in Voronoi mode (distributed maze):"""
from matplotlib.colors import ListedColormap, Normalize
def draw_maze_voronoi(v_map, agent_explored=None, path=None, goal=None, save_gif=False):
    """Plot a Voronoi-partitioned maze with one colour per region.

    Args:
        v_map: 2-D array of region ids; ids 0-9 have a colour assigned.
        agent_explored: optional explored map. Where given, its obstacles (1)
            are drawn gray, unexplored cells (-1) white and agents (2) black
            on top of the region colours.
        path: optional sequence of (row, col) cells, drawn as a red line
            with a red square on its last cell.
        goal: optional (row, col) cell marked with a red square.
        save_gif: when True the frame is also written to
            ``tmp_img/<timestamp>.png``.

    When ``v_map`` holds more than 11 distinct values the plot falls back to
    the grayscale ``draw_maze``.
    """
    unique_values = np.unique(v_map)
    if len(unique_values) > 11:
        print("Colored plot not supported for more than 10 agents. Plotting in grayscale:")
        if agent_explored is None:
            draw_maze(v_map, path, goal, save_gif)
        else:
            draw_maze(agent_explored, path, goal, save_gif)
        return

    tmp_v_map = copy.deepcopy(v_map)
    fig, ax = plt.subplots(figsize=(10, 10))
    fig.patch.set_edgecolor('white')
    fig.patch.set_linewidth(0)

    if agent_explored is not None:
        tmp_v_map[agent_explored == 1] = -1   # obstacles -> gray
        tmp_v_map[agent_explored == -1] = -2  # unexplored areas -> white
        tmp_v_map[agent_explored == 2] = -3   # agents -> black

    # Defined unconditionally so the plot also works without agent_explored.
    color_dict = {-3: 'black', -2: 'white', -1: '#555555', 0: "blue", 1: "brown", 2: "green", 3: "purple", 4: "orange", 5: "cyan", 6: "magenta", 7: "yellow", 8: "lime", 9: "pink"}
    colors = [color_dict[val] for val in sorted(color_dict.keys())]
    cmap = ListedColormap(colors)
    # Named so that it does not shadow the module-level scipy.stats ``norm``.
    value_norm = Normalize(vmin=min(color_dict.keys()), vmax=max(color_dict.keys()))
    ax.imshow(tmp_v_map, cmap=cmap, norm=value_norm, interpolation='nearest')

    # An empty path has no last cell to mark, so it is skipped like None.
    if path is not None and len(path) > 0:
        # Paths are (row, col); matplotlib wants x = column, y = row.
        x_coords = [cell[1] for cell in path]
        y_coords = [cell[0] for cell in path]
        ax.plot(x_coords, y_coords, color='red', linewidth=2)
        ax.scatter(path[-1][1], path[-1][0], color='red', s=100, marker='s')
    if goal is not None:
        ax.scatter(goal[1], goal[0], color='red', s=100, marker='s')
    ax.set_xticks([])
    ax.set_yticks([])

    # Saves images to folder ========================
    if save_gif:
        os.makedirs("tmp_img", exist_ok=True)
        filepath = os.path.join("tmp_img", f"{time.time()}.png")
        fig.savefig(filepath)
    # =============================================

    plt.show()
    # Release the figure in non-interactive runs so repeated calls do not
    # accumulate open figures; interactive windows are left to the user.
    if not plt.isinteractive():
        plt.close(fig)
# print("---------------")
"""Function to convert images to gifs."""
def images_to_gif(gif_filename=None, duration=300, image_folder="tmp_img", gif_folder="utils"):
    """Combine the PNG frames of ``image_folder`` into one animated GIF.

    Frames are taken in sorted filename order, written to
    ``gif_folder/gif_filename`` and then deleted from ``image_folder``.

    Args:
        gif_filename: name of the GIF. When None a fresh
            ``maze_<timestamp>.gif`` name is generated on every call (a
            default computed in the signature would be frozen at import time
            and make successive GIFs overwrite each other).
        duration: display time of each frame in milliseconds.
        image_folder: folder holding the ``.png`` frames.
        gif_folder: output folder; created when missing.

    Does nothing when ``image_folder`` contains no PNG files.
    """
    if gif_filename is None:
        gif_filename = f"maze_{time.time()}.gif"

    image_files = sorted(
        f for f in os.listdir(image_folder)
        if f.endswith('.png') and os.path.isfile(os.path.join(image_folder, f))
    )
    if not image_files:
        return

    os.makedirs(gif_folder, exist_ok=True)
    gif_filepath = os.path.join(gif_folder, gif_filename)

    frames = []
    try:
        for image_file in image_files:
            frames.append(Image.open(os.path.join(image_folder, image_file)))
        frames[0].save(gif_filepath, save_all=True, append_images=frames[1:], loop=0, duration=duration)
    finally:
        # Close the frames before deleting them: open handles leak and can
        # make the removal below fail on some platforms.
        for frame in frames:
            frame.close()

    for image_file in image_files:
        os.remove(os.path.join(image_folder, image_file))
    time.sleep(1)
"""Create a stage with obstacles (1) and free path (0)."""
def generate_stage(rows: int, cols: int, obs_prob = 0.2):
    """Return a ``rows`` x ``cols`` integer grid with randomly placed obstacles.

    Exactly ``int(rows * cols * obs_prob)`` distinct cells are set to 1
    (obstacle); every other cell is 0 (free).
    """
    cell_count = rows * cols
    obstacle_count = int(cell_count * obs_prob)
    stage = np.zeros((rows, cols), dtype=int)
    # Draw flat cell indices without replacement so no obstacle is doubled.
    obstacle_cells = np.random.choice(cell_count, obstacle_count, replace=False)
    np.put(stage, obstacle_cells, 1)
    return stage
"""Function for maze creation. [Source](https://medium.com/@msgold/using-python-to-create-and-solve-mazes-672285723c96)."""
def create_maze(rows, cols, obs_prob=0.8):
    """Generate a random maze as a float array of 1s (walls) and 0s (free).

    The maze is carved with a randomized depth-first walk, then loosened by
    opening extra cells, and finally enclosed by a wall border.

    Args:
        rows: requested height; halved internally, so the returned array has
            ``2 * int(rows / 2) + 1`` rows.
        cols: requested width; treated like ``rows``.
        obs_prob: each free cell opens its four neighbours when
            ``random.random() >= obs_prob``, so lower values give more
            open space.

    Returns:
        The maze array (border cells are always 1).
    """
    # Work on a coarse grid of logical cells; logical cell (i, j) lives at
    # array position (2*i+1, 2*j+1), with wall cells in between.
    rows = int(rows / 2)
    cols = int(cols / 2)
    maze = np.ones((rows*2+1, cols*2+1))
    x, y = (0, 0)
    stack = [(x, y)]
    # Depth-first carving: keep extending from the top of the stack and
    # backtrack (pop) once a cell has no uncarved neighbour left.
    while len(stack) > 0:
        x, y = stack[-1]
        directions = [(0, 1), (1, 0), (0, -1), (-1, 0)]
        random.shuffle(directions)
        for dx, dy in directions:
            nx, ny = x + dx, y + dy
            if nx >= 0 and ny >= 0 and nx < rows and ny < cols and maze[2*nx+1, 2*ny+1] == 1:
                # Open the neighbour cell and the wall between it and (x, y).
                maze[2*nx+1, 2*ny+1] = 0
                maze[2*x+1+dx, 2*y+1+dy] = 0
                stack.append((nx, ny))
                break
        else:
            # for-else: no direction was usable, so this cell is a dead end.
            stack.pop()
    zero_indices = np.argwhere(maze == 0)
    zero_coords = [tuple(index) for index in zero_indices]
    directions = [(0, 1), (1, 0), (0, -1), (-1, 0)] # adds randomly crosses of free space.
    for z in zero_coords:
        if random.random() >= obs_prob:
            for dx, dy in directions:
                nx, ny = z[0] + dx, z[1] + dy
                maze[nx, ny] = 0
    # Restore the outer wall, which the step above may have opened.
    maze[0, :] = 1
    maze[-1, :] = 1
    maze[:, 0] = 1
    maze[:, -1] = 1
    # removes crosses (so agents wont be stuck).
    for i in range(maze.shape[0]):
        for j in range(maze.shape[1]):
            walls = []
            for d in directions:
                neighbor_i = i + d[0]
                neighbor_j = j + d[1]
                # Check if neighbor is in bounds
                if 0 <= neighbor_i < maze.shape[0] and 0 <= neighbor_j < maze.shape[1] and maze[(neighbor_i, neighbor_j)]:
                    walls.append((neighbor_i, neighbor_j))
            # A cell walled in on all four sides gets its four walls opened.
            if len(walls) >= len(directions):
                for coord in walls:
                    maze[coord] = 0
    # re-adds the boundaries (after cross removed).
    maze[0, :] = 1
    maze[-1, :] = 1
    maze[:, 0] = 1
    maze[:, -1] = 1
    # draw_maze(maze)
    return maze
""" Creates the "explored" stage, which at the start everything is not explored (-1) and put the agents there (2)."""
def generate_agents(real_stage, num_agents: int = 1, view_range: int = 2, coverage_mode: bool = False):
    """Create agents on random free cells of ``real_stage``.

    A goal is drawn first from the free (0) cells, then each agent gets a
    distinct free start cell. Every drawn cell is removed from the pool, so
    starts and goals never coincide.

    Args:
        real_stage: ground-truth grid; agents are placed on cells equal to 0.
        num_agents: how many agents to create (values <= 0 are treated as 1).
        view_range: view range handed to every ``Agent``.
        coverage_mode: when True a new goal is drawn after each agent, so
            the agents get different goals; otherwise they share one.

    Returns:
        The list of created agents. It is shorter than ``num_agents`` (and
        possibly empty) when the stage runs out of free cells.
    """
    agents = []
    if num_agents <= 0:
        num_agents = 1

    zero_coordinates = list(zip(*np.where(real_stage == 0)))
    if not zero_coordinates:
        # No free cell at all: nothing can be placed (random.choice would raise).
        return agents
    goal = random.choice(zero_coordinates)
    zero_coordinates.remove(goal)

    # Create the "explored" stage
    for _ in range(num_agents):
        if not zero_coordinates:
            break
        start = random.choice(zero_coordinates)
        zero_coordinates.remove(start)
        agents.append(Agent((start[0], start[1]), (goal[0], goal[1]), real_stage, view_range))
        # puts different goals; skipped once the pool is exhausted, in which
        # case the loop ends on its next iteration anyway.
        if coverage_mode and zero_coordinates:
            goal = random.choice(zero_coordinates)
            zero_coordinates.remove(goal)
    return agents
"""Function to concat all agents explored stages (returns the total explored stage):"""
def update_total_explored(agents, coverage_mode=False, agents_not_in_range=False):
    """Merge the explored maps of all agents into one map and return it.

    The merged map starts fully unexplored (-1) with its border preset to
    obstacle (1). Each still-unknown cell is filled from the agents in list
    order. Old agent markers (2) are cleared and the current agent positions
    are stamped as 2.

    Args:
        agents: agents whose ``explored_stage`` maps are merged.
        coverage_mode: when True every agent is stamped; otherwise agents
            that have reached their goal are left out.
        agents_not_in_range: when False each agent's ``explored_stage`` is
            replaced by a copy of the merged map (used only in Voronoi mode
            to withhold sharing).

    Returns:
        The merged map, or None when ``agents`` is empty.
    """
    if not agents:
        return None

    merged = np.full(agents[0].explored_stage.shape, -1)
    # fills the border with 1s
    merged[[0, -1], :] = 1
    merged[:, [0, -1]] = 1

    for member in agents:
        unknown = merged == -1
        merged[unknown] = member.explored_stage[unknown]
    merged[merged == 2] = 0

    for member in agents:
        # An agent that has reached its goal is treated as removed from the
        # stage, except in coverage mode where agents never leave.
        if coverage_mode or not member.check_goal():
            merged[member.x, member.y] = 2

    if not agents_not_in_range:
        for member in agents:
            member.explored_stage = merged.copy()
    return merged
"""Function to return only the unexplored coords assigned by voronoi method **for each agent**. This method is used only in voronoi coverage."""
def find_unexp_voronoi(agent):
    """Return the agent's Voronoi cells that it has not explored yet.

    A cell of ``agent.voronoi_coords`` is kept when it is not the agent's
    own position and is still -1 in ``agent.explored_stage``. The cells are
    returned as a NumPy array built from the kept coordinates.
    """
    here = (agent.x, agent.y)
    pending = [
        cell
        for cell in agent.voronoi_coords
        if cell != here and agent.explored_stage[cell] == -1
    ]
    return np.array(pending)
"""### Maze Exploration Metrics
<u>Citation:</u> Yan, Z., Fabresse, L., Laval, J., & Bouraqadi, N. (2015, September). Metrics for performance benchmarking of multi-robot exploration. In 2015 IEEE/RSJ International Conference on Intelligent Robots and Systems (IROS) (pp. 3407-3414). IEEE
Here follow some metrics discussed in the paper and used in our experiments.
#### Exploration Time:
Calculates the exploration time, from the start until the entire maze is covered. In our implementation the agents run sequentially, so the exploration time is computed with the following formula:
$$
explorationTime(n) = \frac{\sum_{i=1}^{n} t_{s_i}}{n} \cdot R
$$
where $t_{s_i}$ is the time needed for an agent to make a step, $R$ is the number of rounds, and $n$ is the number of robots.
#### Map Completeness:
**Calculates the explored percentage of the total explored stage**. According to the paper [above](https://yzrobot.github.io/publications/yz15iros.pdf), it is based on the formula:
$$
mapCompleteness = M / P
$$
where $M$ is the amount of explored area and $P$ the total area of ground truth map.
This method is used throughout the maze exploration process, to see at which stage of the exploration we are.
"""
def calculate_expl_percentage(total_explored):
    """Return the explored fraction of the map interior (M / P of the paper).

    The one-cell border is left out because the agents already know it
    consists of obstacles. A cell counts as unexplored when it equals -1.
    """
    interior = total_explored[1:-1, 1:-1]
    unexplored_cells = np.sum(interior == -1)
    interior_cells = interior.shape[0] * interior.shape[1]
    return 1 - (unexplored_cells / interior_cells)
"""#### Map Quality:
**Function for calculating the final difference of the real grid and the total explored grid.** According to the paper [above](https://yzrobot.github.io/publications/yz15iros.pdf), it is based on the formula:
$$
mapQuality = \frac{M - A(\text{mapError})}{P}
$$
where $M$ is the total explored area in square meters, $P$ the total area of ground truth map and $A(mapError)$ is the area occupied by the error cells. The error cells are the cells in the explored grid map that have a different value from the corresponding cell in the ground truth map.
**`Map Quality` subsumes `Map Completeness`, which is why it is the metric returned at the end of the maze exploration experiment.**
"""
def check_real_expl(real_grid, total_explored, debug=False):
    """Compare the merged explored map with the ground truth (map quality).

    Unexplored cells (-1) are first treated as obstacles (1) and agent
    markers (2) as free cells (0); the interiors of both maps (borders
    removed) are then compared cell by cell.

    Args:
        real_grid: the ground-truth grid.
        total_explored: the merged explored map.
        debug: when True prints whether the maps match and, if they do not,
            the mismatch rows and both interiors.

    Returns:
        ``(false_positions, quality)`` where ``false_positions`` holds the
        row indices of the mismatching interior cells and ``quality`` is the
        completeness minus the mismatch fraction.
    """
    # NOTE(review): unexplored cells are replaced before the completeness is
    # computed, so no -1 is left for it to count - confirm this is intended.
    filled = np.where(total_explored == -1, 1, total_explored)
    completeness = calculate_expl_percentage(filled)

    # The border is excluded on both sides, in line with the completeness
    # metric: agents already know the border consists of obstacles.
    real = copy.deepcopy(real_grid[1:-1, 1:-1])
    real = np.where(real == 2, 0, real)
    tot = copy.deepcopy(filled[1:-1, 1:-1])
    tot = np.where(tot == 2, 0, tot)
    tot = np.where(tot == -1, 1, tot)

    mismatch = np.not_equal(real, tot)
    false_positions = np.where(mismatch)[0]
    # NOTE(review): the mismatch count is divided by the full grid area
    # (border included) while the comparison covers the interior only.
    full_area = real_grid.shape[0] * real_grid.shape[1]
    total_false = np.sum(mismatch) / full_area

    if debug:  # print statements:
        if total_false == 0:
            print("Real == Explored? TRUE")
        else:
            print("Real == Explored? FALSE")
            print("Positions of False values:", false_positions)
            draw_maze(real)
            draw_maze(tot)

    return false_positions, completeness - total_false
"""#### Exploration Cost
**Function for calculating the cost of the exploration.** According to the paper [above](https://yzrobot.github.io/publications/yz15iros.pdf), it is based on the formula:
$$
explorationCost(n) = \sum_{i=1}^{n} d_i
$$
where $n$ is the number of robots in the fleet, and $d_i$ is the distance traveled by robot $i$.
**This metric is calculated in the corresponding coverage execution functions.**
#### Exploration Efficiency
**Function for calculating the efficiency of the exploration.** According to the paper [above](https://yzrobot.github.io/publications/yz15iros.pdf), it is based on the formula:
$$
explorationEfficiency(n) = \frac{M}{explorationCost(n)}
$$
where $n$ is the number of robots in the fleet and $M$ is the total explored area in square meters.
"""
def calc_exploration_efficiency(total_explored, sum_d):
    """Return the exploration efficiency ``M / explorationCost``.

    Args:
        total_explored: merged explored map; -1 marks unexplored cells.
        sum_d: exploration cost, i.e. the total distance travelled by all
            agents. A cost of 0 is treated as 1 to avoid dividing by zero.

    Returns:
        The number of explored interior cells (border excluded) divided by
        ``sum_d``.
    """
    interior = total_explored[1:-1, 1:-1]
    # Count only cells that have actually been explored. Unexplored cells
    # (-1) must not be rewritten to 1 beforehand, or every interior cell
    # would pass this test and M would always equal the full interior area.
    explored_cells = np.count_nonzero(interior >= 0)
    if sum_d == 0:
        sum_d = 1
    return explored_cells / sum_d
"""# Agents Move Towards one Goal (*)
This section **does not explore the entire stage**. It was made for agents to move to a specific goal.
This should run because it is used later in the code (**not in the thesis though**).
Function for testing astar:
"""
def move_astar(start_grid, start_agents, debug=True):
    """Move every agent towards its goal with A*, one step per round.

    Works on deep copies of ``start_grid`` and ``start_agents``. In each
    round every remaining agent plans a path on its own explored map, takes
    the first step unless that cell holds another agent, and refreshes its
    view. Agents found on their goal are removed. The loop ends when all
    agents are on their goals or when no remaining agent found a path.

    Args:
        start_grid: the ground-truth grid.
        start_agents: the agents to simulate.
        debug: when True each planned path is plotted.

    Returns:
        A tuple ``(explored_percentage, rounds, mean_finish_round,
        total_round_time, finished_fraction, total_explored)``.
    """
    grid = copy.deepcopy(start_grid)
    agents = copy.deepcopy(start_agents)
    for agent in agents:
        grid[agent.x, agent.y] = 2  # Mark initial agent positions
    total_explored = update_total_explored(agents)
    total_agents = len(agents)

    round_time_list = []
    avg_rounds = []
    num_finish = 0
    rounds = 0
    while any((agent.x, agent.y) != agent.goal for agent in agents):
        rounds += 1
        eps_start_time = time.time()
        path_none = 0
        # Iterate over a snapshot: agents are removed from ``agents`` inside
        # the loop, and removing from the list being iterated would make the
        # agent following a finished one miss its turn in this round.
        for agent in list(agents):
            if agent.check_goal():
                num_finish += 1
                total_explored = update_total_explored(agents)
                agents.remove(agent)
                avg_rounds.append(rounds)
                continue  # Agent has reached its goal

            path = AStar(agent.explored_stage).search((agent.x, agent.y), agent.goal)
            if debug:
                draw_maze(agent.explored_stage, path=path)

            if path:
                # path[0] is the current cell; step only onto a cell that is
                # not occupied by another agent.
                if agent.explored_stage[path[1]] != 2:
                    grid[agent.x, agent.y] = 0  # Mark the old position as unoccupied
                    agent.x, agent.y = path[1]  # Update agent position
                    grid[agent.x, agent.y] = 2  # Mark the new position as occupied by agent
            else:
                path_none += 1

            agent.agent_view(start_grid)
            total_explored = update_total_explored(agents)

        round_time_list.append(time.time() - eps_start_time)
        if path_none >= len(agents):  # stops if no agents have moved.
            print(path_none, len(agents))
            print("STOP PATH NONE")
            break

    # Agents that arrived during the last round have not been counted yet.
    for agent in agents:
        if agent.check_goal():
            num_finish += 1
            avg_rounds.append(rounds)

    return calculate_expl_percentage(total_explored), rounds, np.mean(avg_rounds), np.sum(round_time_list), num_finish / total_agents, total_explored
"""# Agents Explore Stage (*)
**This section is researched in my thesis.** It contains the algorithms used for maze exploration (HEDAC, nearest frontier, etc). Also, it proposes new method(s) for maze exploration.
## **HEDAC**: Uses Artificial Potential Fields for maze exploration.
<u>Citation:</u> Crnković, B., Ivić, S., & Zovko, M. (2023). Fast algorithm for centralized multi-agent maze exploration. arXiv preprint arXiv:2310.02121 [[source](https://arxiv.org/pdf/2310.02121.pdf)].
For calculation of HEDAC, we iteratively calculate the following:
$$
u_{i,j}^{r+1} = \frac{\sum_{l \in L} a_{i,j}(l)u_{l}^{r} + s(x_{i,j}, t_k)}{\sum_{l \in L} a_{i,j}(l) + \alpha}, \text{ where } r \text{ = iterations until convergence.}
$$
And:
$$
L = \{(i+1, j), (i-1, j), (i, j+1), (i, j-1)\}, \text{ denotes the neighboring nodes of (i, j).}
$$
So:
$$
a_{i,j}(l) = \begin{cases}
0, & \text{if neighbor is obstacle} \\
1, & \text{if neighbor is not obstacle}
\end{cases}
$$
In the paper, `a_ij(l)` can also get the value of 2, but we will just use (0, 1).
Also:
$$
s(x_{i,j}, t_k) = \max(0, 1 - c(x_{i,j}, t_k)) \cdot S(t_k)
$$
where:
$$
c(x_{i,j}, t_k) \text{ = coverage function }
$$
*<u>According to the paper</u>: For each agent that visits the maze node `xi,j` at each time step, 1 is added, and 0 in all other cases.* This has the same result in `s` if 1 is added when cell `xi,j` is explored else 0 (since we get max).
also, for numerical examples (according to the paper):
$$
S(t_k) = |I(t_k)| \text{, where } |I(t_k)| \text{ = number of explored indexes.}
$$
---
**Algorithm 1 from paper** (implemented below):
```bash
initialization ;
for t = 0, t = maxt, t = t + 1 do
exchange of information about discovered nodes between agents;
update of a linear system;
iteratively solve a linear system (9),(10) ;
for i = 1, i = N, i = i + 1 do
next pos for agent i = non occupied neighbouring nodes where is the largest value of