-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcontrol_logic.py
307 lines (197 loc) · 9.73 KB
/
control_logic.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
##################################################################################
# Copyright (c) 2024 Matthew Thomas Beck #
# #
# All rights reserved. This code and its associated files may not be reproduced, #
# modified, distributed, or otherwise used, in part or in whole, by any person #
# or entity without the express written permission of the copyright holder, #
# Matthew Thomas Beck. #
##################################################################################
############################################################
############### IMPORT / CREATE DEPENDENCIES ###############
############################################################
########## IMPORT DEPENDENCIES ##########
##### import necessary libraries #####
import RPi.GPIO as GPIO # import GPIO library for pin control
import pigpio # import pigpio library for PWM control
import logging # import logging library for debugging
import os # import os library for system commands and log files
########## CREATE DEPENDENCIES ##########
##### initialize GPIO and pigpio #####
GPIO.setmode(GPIO.BCM) # set gpio mode to bcm so pins a referred to the same way as the processor refers them
pi = pigpio.pi() # set pigpio object to pi so it can be referred to as pi throughout the script
##### set virtual environment path #####
venv_path = "/home/matthewthomasbeck/.virtualenvs/openvino/bin/activate_this.py" # activate the virtual environment
with open(venv_path) as f: # open the virtual environment path
exec(f.read(), {'__file__': venv_path}) # execute the virtual environment path
##### set up logging #####
logFile = "/home/matthewthomasbeck/Projects/Robot_Dog/robot_dog.log" # set the log file path
if os.path.exists(logFile): # if the old log file exists...
os.rename(logFile, f"{logFile}.bak") # rename the old log file with a timestamp
logging.basicConfig( # configure the logging module to write mode, overwriting the old log file
filename=logFile,
filemode='w',
level=logging.DEBUG,
format='%(asctime)s %(levelname)s: %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
logging.info("Starting control_logic.py script...\n") # log the start of the script
########## IMPORT DEPENDENCIES ##########
##### import initialization functions #####
from initialize.initialize_receiver import * # import PWMDecoder class from initialize_receiver along with functions
from initialize.initialize_servos import * # import servo initialization functions and maestro object
from initialize.initialize_camera import * # import camera initialization functions
from initialize.initialize_opencv import * # import opencv initialization functions
##### import movement functions #####
from movement.standing.standing_inplace import * # import standing functions
from movement.walking.manual_walking import * # import walking functions
#########################################
############### RUN ROBOT ###############
#########################################
########## RUN ROBOTIC PROCESS ##########
def runRobot(): # central function that runs the robot
##### set vairables #####
global CHANNEL_DATA
CHANNEL_DATA = {pin: 1500 for pin in PWM_PINS} # initialize with neutral values
decoders = [] # define decoders as empty list
IS_NEUTRAL = False # assume robot is not in neutral standing position until neutralStandingPosition() is called
##### initialize camera #####
camera_process = start_camera_process(width=640, height=480, framerate=30)
if camera_process is None:
logging.error("ERROR (control_logic.py): Failed to start camera process. Exiting...\n")
exit(1)
##### initialize PWM decoders #####
for pin in PWM_PINS:
decoder = PWMDecoder(pi, pin, pwmCallback)
decoders.append(decoder)
##### run robotic logic #####
try: # try to put the robot in a neutral standing position
neutralStandingPosition() # move to neutral standing position
IS_NEUTRAL = True # set IS_NEUTRAL to True
time.sleep(3) # wait for 3 seconds
except Exception as e: # if there is an error, log the error
logging.error(f"ERROR (control_logic.py): Failed to move to neutral standing position in runRobot: {e}\n")
mjpeg_buffer = b'' # Initialize buffer for MJPEG frames
try:
while True:
# Read chunk of data from the camera process
chunk = camera_process.stdout.read(4096)
if not chunk:
logging.error("ERROR (control_logic.py): Camera process stopped sending data.")
break
mjpeg_buffer += chunk
# Attempt to decode a single frame and display
prev_len = len(mjpeg_buffer)
mjpeg_buffer = decode_and_show_frame(mjpeg_buffer)
if mjpeg_buffer is None:
# If something went very wrong, stop
logging.warning("WARNING (control_logic.py): decode_and_show_frame returned None. Stopping.")
break
elif len(mjpeg_buffer) == prev_len:
# Means no complete JPEG was found (incomplete frame)
# If the buffer grows too large, reset it
if len(mjpeg_buffer) > 65536:
logging.warning("WARNING (control_logic.py): MJPEG buffer overflow. Resetting buffer.")
mjpeg_buffer = b''
# Check if 'q' was pressed in the imshow window
if cv2.waitKey(1) & 0xFF == ord('q'):
logging.info("Exiting camera feed display.")
break
# Handle commands
commands = interpretCommands(CHANNEL_DATA)
for channel, (action, intensity) in commands.items():
IS_NEUTRAL = executeCommands(channel, action, intensity, IS_NEUTRAL)
except KeyboardInterrupt:
logging.info("KeyboardInterrupt received. Exiting...\n")
except Exception as e:
logging.error(f"ERROR (control_logic.py): Unexpected exception in main loop: {e}\n")
exit(1)
finally:
##### clean up servos and decoders #####
disableAllServos()
for decoder in decoders:
decoder.cancel()
##### close camera #####
if camera_process.poll() is None:
camera_process.terminate()
camera_process.wait()
cv2.destroyAllWindows()
##### clean up GPIO and pigpio #####
pi.stop()
GPIO.cleanup()
closeMaestroConnection(MAESTRO)
########## PWM CALLBACK ##########
def pwmCallback(gpio, pulseWidth): # function to set pulse width to channel data
CHANNEL_DATA[gpio] = pulseWidth # set channel data to pulse width
########## INTERPRET COMMANDS ##########
def executeCommands(channel, action, intensity, IS_NEUTRAL): # function to interpret commands from channel data and do things
##### squat channel 2 #####
if channel == 'channel-2':
if action == 'NEUTRAL' or action == 'SQUAT DOWN':
if action == 'SQUAT DOWN':
pass
# function to squat
elif action == 'SQUAT UP':
# function to neutral
print(f"{channel}: {action}")
##### tilt channel 0 #####
if channel == 'channel-0':
if action == 'TILT DOWN':
print(f"{channel}: {action}")
elif action == 'TILT UP':
print(f"{channel}: {action}")
##### trigger channel 1 #####
elif channel == 'channel-1':
if action == 'TRIGGER SHOOT':
print(f"{channel}: {action}")
##### rotation channel 3 #####
elif channel == 'channel-3':
if action == 'ROTATE LEFT':
print(f"{channel}: {action}")
elif action == 'ROTATE RIGHT':
print(f"{channel}: {action}")
##### look channel 4 #####
elif channel == 'channel-4':
if action == 'LOOK DOWN':
print(f"{channel}: {action}")
elif action == 'LOOK UP':
print(f"{channel}: {action}")
##### move channel 5 #####
elif channel == 'channel-5':
if action == 'MOVE FORWARD':
print(f"{channel}: {action}")
try:
walkForward(intensity)
IS_NEUTRAL = False
except Exception as e:
logging.error(f"ERROR (control_logic.py): Failed to move forward in executeCommands: {e}\n")
elif action == 'NEUTRAL':
try:
if IS_NEUTRAL == False:
neutralStandingPosition()
IS_NEUTRAL = True
except Exception as e:
logging.error(f"ERROR (control_logic.py): Failed to move to neutral standing position in executeCommands: {e}\n")
elif action == 'MOVE BACKWARD':
print(f"{channel}: {action}")
try:
#walkBackward(intensity)
IS_NEUTRAL = False
except Exception as e:
logging.error(f"ERROR (control_logic.py): Failed to move backward in executeCommands: {e}\n")
##### shift channel 6 #####
elif channel == 'channel-6':
if action == 'SHIFT LEFT':
print(f"{channel}: {action}")
elif action == 'SHIFT RIGHT':
print(f"{channel}: {action}")
##### extra channel 7 #####
elif channel == 'channel-7':
if action == '+':
print(f"{channel}: {action}")
elif action == '-':
print(f"{channel}: {action}")
##### update is neutral standing #####
return IS_NEUTRAL # return neutral standing boolean for position awareness
########## RUN ROBOTIC PROCESS ##########
##### complete all initialization and begin robotic process #####
runRobot() # initialize receiver