-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathbrose.py
66 lines (49 loc) · 1.48 KB
/
brose.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
import can
import threading
import time
import os
import pickle
# os.system('echo pass | sudo -S ./CAN_ON')
# time.sleep(1)
# socketcan channel names of the two CAN interfaces bridged by this script.
channels = ['kvaser', 'microchip']

# One thread-safe bus per channel, keyed by channel name.
# The keyword was previously misspelled `recevie_own_messages`, so the
# intended setting was never handed to the backend under its real name.
buses = {
    ch: can.ThreadSafeBus(
        channel=ch,
        interface='socketcan',
        bitrate=250000,
        receive_own_messages=False,
    )
    for ch in channels
}

print('')
print('Emptying read buffers...')
for bus in buses.values():
    bus.flush_tx_buffer()
    # Keep reading with a short timeout until recv() stops handing back
    # a message, i.e. nothing is left queued on this bus.
    m = bus.recv(0.01)
    while isinstance(m, can.Message):
        m = bus.recv(0.01)
print('Read buffers empty.')

# Frames relayed by the forwarder threads are appended here.
captured = list()
# Stop flag read by the forwarder threads; the main thread sets it to False.
run = True
def forwarder(source, destination, blacklist, poll_timeout=0.1):
    """Relay frames from ``source`` to ``destination`` until ``run`` is cleared.

    Each relayed frame is sent on ``destination``, appended to the
    module-level ``captured`` list and printed.  Frames whose
    ``arbitration_id`` is in ``blacklist`` and error frames are dropped.

    The module-level ``run`` flag is re-checked before every receive, so the
    loop also ends when the bus is silent or only carries filtered frames
    (previously the flag was only looked at after a frame had been relayed).

    Args:
        source: Bus to read from; must provide ``recv(timeout=...)`` that
            returns ``None`` when nothing arrived in time.
        destination: Bus to write to; must provide ``send(msg)``.
        blacklist: Container of arbitration IDs that must not be relayed.
        poll_timeout: Seconds to wait for a frame before re-checking ``run``.
    """
    while run:
        msg = source.recv(timeout=poll_timeout)
        if msg is None:
            # Timed out without traffic: loop around to re-check the flag.
            continue
        if msg.arbitration_id in blacklist or msg.is_error_frame:
            continue
        destination.send(msg)
        captured.append(msg)
        print(msg)
# Arbitration IDs that are not relayed from the kvaser bus to the microchip bus.
notImportantForBattery = {0x201}
t1 = threading.Thread(
    target=forwarder,
    args=(buses['kvaser'], buses['microchip'], notImportantForBattery),
)
t1.name = 'From bike to battery'

# A default-constructed frame is sent on the kvaser bus before forwarding
# starts.  NOTE(review): the reason is not evident from this file - confirm.
buses['kvaser'].send(can.Message())

# Nothing is filtered in the opposite direction.  An empty set is used
# instead of `{}` (which is an empty dict) so both filters have the same type.
notImportantForBike = set()
t2 = threading.Thread(
    target=forwarder,
    args=(buses['microchip'], buses['kvaser'], notImportantForBike),
)
t2.name = 'From battery to bike'

threads = [t1, t2]
for t in threads:
    t.start()

# Wait until the forwarder threads have recorded at least 500 frames.
while len(captured) < 500:
    time.sleep(0.1)

# Pickle a snapshot rather than the live list, because the forwarder threads
# may still be appending; the context manager guarantees the file is closed.
with open("captured.p", "wb") as capture_file:
    pickle.dump(list(captured), capture_file)

# Ask the forwarder threads to stop and wait for them to finish.
run = False
for t in threads:
    t.join()
print('Switching off')

# Release the underlying CAN interfaces once nothing uses them any more.
for bus in buses.values():
    bus.shutdown()