This repository has been archived by the owner on Aug 2, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathcomm_server.py
111 lines (85 loc) · 3.28 KB
/
comm_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
### comm_server.py
from collections import deque
from twisted.protocols.basic import LineReceiver
from twisted.internet.protocol import Factory
from twisted.internet import interfaces
from zope.interface import implements
import command
class ProducerToManyClient:
    """Fan-out consumer that forwards every written chunk to all clients.

    A client is any object exposing ``write(data)``; clients are kept in
    registration order in the public ``clients`` list.
    """
    implements(interfaces.IConsumer)

    def __init__(self):
        print('initing {}'.format(self.__class__))
        self.clients = []

    def addClient(self, client):
        """Register *client* so it receives every subsequent ``write``."""
        print('client added to one2many made')
        self.clients.append(client)

    def write(self, data):
        """Forward *data* to every registered client, in registration order."""
        # Iterate over a snapshot: a client that detaches itself (or another
        # client) from inside its own write() must not make the loop skip
        # the next entry of the live list.
        for client in tuple(self.clients):
            client.write(data)

    def removeClient(self, client, reason=None):
        """Stop forwarding data to *client*.

        ``reason`` is accepted for callers that pass a disconnect reason but
        is not used; it is optional so that one-argument calls such as
        ``removeClient(client)`` work as well.  Removing a client that is not
        registered is a no-op, so teardown paths may call this more than once.
        """
        if client in self.clients:
            self.clients.remove(client)
class ProducerConsumerBufferProxy:
    """Proxy which buffers a few telemetry blocks and drops old ones.

    The proxy sits between a fan-out source (``producer``, which must offer
    ``addClient``/``removeClient``) and one connection (``consumer``, whose
    ``transport`` receives the data).  While the transport has paused us, at
    most the 10 newest blocks are retained; older ones are discarded by the
    bounded deque.
    """
    implements(interfaces.IPushProducer, interfaces.IConsumer)

    def __init__(self, producer, consumer):
        """Attach to *producer* and start forwarding its data to *consumer*."""
        print('initing {}'.format(self.__class__))
        self._paused = False
        self._buffer = deque(maxlen=10)
        self._producer = producer
        self._consumer = consumer
        self._producer.addClient(self)
        # Tracks whether we are still in the producer's client list, so that
        # detaching is safe to request more than once.
        self._attached = True

    def pauseProducing(self):
        """Stop writing to the transport; incoming blocks are buffered."""
        print('pausing {}'.format(
            self._consumer.transport.getPeer()))
        self._paused = True

    def resumeProducing(self):
        """Resume writing and send whatever was buffered while paused."""
        print('resuming {}'.format(
            self._consumer.transport.getPeer()))
        self._paused = False
        # Without this, blocks buffered during the pause would sit in the
        # buffer until the source happened to deliver the next block.
        self._flush()

    def unregisterProducer(self):
        """Detach from the source so it no longer writes to this proxy."""
        if not self._attached:
            return
        self._attached = False
        # The source's removeClient takes (client, reason); there is no
        # disconnect reason available here, so pass None explicitly.
        self._producer.removeClient(self, None)

    def stopProducing(self):
        """Stop for good: the consumer is gone, so detach from the source.

        Leaving this as a no-op kept the proxy in the source's client list
        after its connection had died, so the list grew with every client
        that ever connected.
        """
        self.unregisterProducer()

    def write(self, data):
        """Queue *data* and, unless paused, forward the queue to the transport."""
        self._buffer.append(data)
        if not self._paused:
            self._flush()

    def _flush(self):
        """Write buffered blocks oldest-first until empty or paused again."""
        transport = self._consumer.transport
        # transport.write() may call pauseProducing() synchronously when its
        # own buffer fills; re-checking the flag each round honours that and
        # leaves the remainder for the next resumeProducing().
        while self._buffer and not self._paused:
            transport.write(self._buffer.popleft())
class ServeTelemetry(LineReceiver):
    """Line-based protocol that streams telemetry to one connected client.

    Every client receives the header followed by the telemetry stream.  A
    client that sends the line ``commander`` is promoted; each later line
    from it is parsed as a command and, when valid, transmitted through the
    raw telemetry source.
    """

    def __init__(self, producer, raw_source, header):
        """Remember the shared telemetry producer, the raw source and the header."""
        print('initing {}'.format(self.__class__))
        self._header = header
        self._raw_telemetry_source = raw_source
        self._producer = producer
        self._is_commander = False

    def connectionMade(self):
        """Hook this connection up to the producer and send the header line."""
        proxy = ProducerConsumerBufferProxy(self._producer, self)
        self.proxy = proxy
        # True registers the proxy as a streaming (push) producer.
        self.transport.registerProducer(proxy, True)
        proxy.write(self._header + '\r\n')
        proxy.resumeProducing()

    def lineReceived(self, line):
        """Log the line, then treat it as a promotion request or a command."""
        peer = self.transport.getPeer()
        print('from {} received line {}'.format(peer, line))

        if line == 'commander':
            self._is_commander = True
            return
        if not self._is_commander:
            # Lines from ordinary clients are logged and otherwise ignored.
            return

        valid, cmd = command.parse_command(line.rstrip())
        if not valid:
            print('command not valid')
            return
        self._raw_telemetry_source.async_tx(cmd)

    def connectionLost(self, reason):
        """Log the disconnect and unregister the producer from the transport."""
        peer = self.transport.getPeer()
        print('connection lost from {}'.format(peer))
        self.transport.unregisterProducer()
class TelemetryFactory(Factory):
    """Factory building one ``ServeTelemetry`` protocol per connection.

    The telemetry source shared by all protocols is supplied separately via
    ``setSource``; until then incoming connections are refused.
    """

    def __init__(self, raw_source, header):
        """Store the raw source and header handed to every protocol."""
        self.clients = []
        self._raw_source = raw_source
        self._header = header
        # Defined up front so buildProtocol can detect a missing source
        # instead of failing with AttributeError.
        self._telemetrySource = None

    def setSource(self, telemetrySource):
        """Set the telemetry source that new protocols attach to."""
        self._telemetrySource = telemetrySource

    def buildProtocol(self, addr):
        """Return a protocol for the connection from *addr*.

        Returns None, which makes Twisted drop the connection, when no
        telemetry source has been set yet.
        """
        if self._telemetrySource is None:
            print('no telemetry source set, refusing {}'.format(addr))
            return None
        return ServeTelemetry(
            self._telemetrySource, self._raw_source, self._header)