-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathshared.py
228 lines (193 loc) · 6.98 KB
/
shared.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
import enum
import logging
import pathlib
import signal
import subprocess
import sys
import time
import typing
# TODO: Improve functions documentation
# Module-wide logging: timestamped INFO-level messages by default.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)-15s [%(levelname)s] %(message)s',
)
logger = logging.getLogger(__name__)

# Seconds SSH waits for a connection attempt (passed as ConnectTimeout below;
# also used by create_process() to decide how long to wait before its
# liveness check).
SSH_CONNECTION_TIMEOUT = 10
# NOTE: It is important to add "-t" option in order for SSH
# to transfer SIGINT, SIGTERM signals to the command
# NOTE: It is important to add "-o BatchMode=yes" option
# in order to disable any kind of prompt
# NOTE: It is important to add "-o ConnectTimeout={SSH_CONNECTION_TIMEOUT}"
# option in case when the server is down not to wait and be able to check
# quickly that the process has not been started successfully
SSH_COMMON_ARGS = [
    'ssh',
    '-t',
    '-o', 'BatchMode=yes',
    '-o', f'ConnectTimeout={SSH_CONNECTION_TIMEOUT}',
]
# NOTE(review): name looks like a typo for DELIMITER; its meaning is not
# visible in this file — presumably used by callers elsewhere. Kept as-is
# for backward compatibility. TODO confirm usage before renaming.
DELIMETER = 1000000
class AutoName(enum.Enum):
    """Enum base class whose enum.auto() values equal the member names."""
    def _generate_next_value_(name, start, count, last_values):
        # Hook called by the enum machinery for auto(); its first argument
        # is the member name (not self), by enum protocol.
        return name
class ProcessHasNotBeenCreated(Exception):
    """Raised when subprocess creation itself fails (e.g. OSError from Popen)."""
class ProcessHasNotBeenStartedSuccessfully(Exception):
    """Raised when a created process terminates right after startup."""
class ProcessHasNotBeenKilled(Exception):
    """Raised when a process survives both terminate and kill attempts."""
class ParallelSendersExecutionFailed(Exception):
    """Raised when parallel sender processes fail to complete successfully."""
def process_is_running(process):
    """
    Poll a process once and report its state.

    Returns:
        A tuple (is_running, returncode) where
        - is_running is True while the process is alive and False once it
          has terminated,
        - returncode is None while the process is running, otherwise the
          actual exit code reported by poll().
    """
    returncode = process.poll()
    return (returncode is None, returncode)
def create_process(name, args, via_ssh: bool=False):
    """
    Start a subprocess and verify it has not terminated right away.

    Attributes:
        name: name of the application being started (used in log/error text)
        args: process args (argv list passed to subprocess.Popen)
        via_ssh: True when the command runs over SSH; waits past the SSH
            connection timeout before the liveness check

    Raises:
        ProcessHasNotBeenCreated
        ProcessHasNotBeenStartedSuccessfully

    Returns:
        The started subprocess.Popen object.
    """
    try:
        logger.debug(f'Starting process: {name}')
        if sys.platform == 'win32':
            process = subprocess.Popen(
                args,
                stdin =subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=False,
                # New process group so CTRL_C_EVENT can be delivered to this
                # process later (see cleanup_process).
                creationflags=subprocess.CREATE_NEW_PROCESS_GROUP,
                bufsize=1
            )
        else:
            process = subprocess.Popen(
                args,
                #stdin =subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                #universal_newlines=False,
                bufsize=1
            )
    except OSError as e:
        raise ProcessHasNotBeenCreated(f'{name}. Error: {e}')
    # Check that the process has started successfully and has not terminated
    # because of an error
    if via_ssh:
        # Give the SSH connection attempt time to fail (ConnectTimeout)
        # before polling, so a dead server is detected here.
        time.sleep(SSH_CONNECTION_TIMEOUT + 1)
    else:
        time.sleep(1)
    logger.debug(f'Checking that the process has started successfully: {name}')
    is_running, returncode = process_is_running(process)
    if not is_running:
        raise ProcessHasNotBeenStartedSuccessfully(
            f'{name}, returncode {returncode}, stderr: {process.stderr.readlines()}'
        )
    logger.debug(f'Started successfully: {name}')
    return process
def cleanup_process(process_tuple):
    """
    Clean up actions for the process: try a graceful interrupt first
    (SIGINT / CTRL_C_EVENT, up to 3 one-second polls), then kill().

    Attributes:
        process_tuple: a (name, process) tuple as returned by the start_*
            helpers in this module.

    Raises:
        ProcessHasNotBeenKilled
    """
    name, process = process_tuple
    # NOTE: There is a problem with terminating processes which use SSH
    # to run a command on a remote server. The problem is in SSH not
    # forwarding a signal (e.g., SIGINT, SIGTERM). As a result, SSH session
    # itself terminates and process.poll() returns None, however
    # an application started from a command continues to work on a remote server.
    # The solution is to use -t option in order to allocate a pseudo-terminal.
    # See https://stackoverflow.com/questions/48419781/work-around-ssh-does-not-forward-signal
    # for details. FIXME: Maybe it is reasonable to add additional check in
    # clean-up actions that the process is not running on a remote server
    # ps -A | grep [process_name]
    # FIXME: However, there is a problem with wrong interpretation of carriage
    # (\r\n) from pseudo-terminal in this case. Check stdout, it is full of b'\r\n'.
    # FIXME: Signals may not work on Windows properly. Might be useful
    # https://stefan.sofa-rockers.org/2013/08/15/handling-sub-process-hierarchies-python-linux-os-x/
    is_running, _ = process_is_running(process)
    if not is_running:
        logger.info(
            f'Process is not running, no need to terminate: {process_tuple}\r'
        )
        return
    logger.info(f'Terminating: {process_tuple}\r')
    # logger.info('OS: {}'.format(sys.platform))
    # CTRL_C_EVENT exists only on Windows; SIGINT elsewhere.
    sig = signal.CTRL_C_EVENT if sys.platform == 'win32' else signal.SIGINT
    process.send_signal(sig)
    # Poll up to 3 times (1 s apart) for a graceful shutdown.
    for i in range(3):
        time.sleep(1)
        is_running, _ = process_is_running(process)
        if not is_running:
            logger.info(f'Terminated: {process_tuple}\r')
            return
    # TODO: (For future) Experiment with this more. If stransmit will not
    # stop after several terminations, there is a problem, and kill() will
    # hide this problem in this case.
    # TODO: (!) There is a problem with tsp, it's actually not killed
    # however process_is_running(process) becomes False
    is_running, _ = process_is_running(process)
    if is_running:
        logger.info(f'Killing: {process_tuple}\r')
        process.kill()
        time.sleep(1)
    is_running, _ = process_is_running(process)
    if is_running:
        raise ProcessHasNotBeenKilled(f'{name}, id: {process.pid}')
    logger.info(f'Killed: {process_tuple}\r')
def start_tshark(
    interface: str,
    port: str,
    results_dir: pathlib.Path,
    filename: str,
    start_via_ssh: bool=False,
    ssh_username: typing.Optional[str]=None,
    ssh_host: typing.Optional[str]=None
):
    """
    Start tshark capturing UDP traffic on a port into results_dir/filename.

    Attributes:
        interface: network interface to capture on (tshark -i)
        port: UDP port to filter on
        results_dir: directory where the capture file is written
        filename: capture file name
        start_via_ssh: run tshark on a remote machine via SSH
        ssh_username: remote user, required when start_via_ssh is True
        ssh_host: remote host, required when start_via_ssh is True

    Raises:
        ValueError: start_via_ssh is True but ssh_username/ssh_host is None
        ProcessHasNotBeenCreated, ProcessHasNotBeenStartedSuccessfully:
            propagated from create_process

    Returns:
        A (name, process) tuple suitable for cleanup_process().
    """
    name = 'tshark'
    args = []
    if start_via_ssh:
        # Fail fast instead of building a bogus 'None@None' SSH target.
        if ssh_username is None or ssh_host is None:
            raise ValueError(
                'ssh_username and ssh_host are required when start_via_ssh is True'
            )
        # Fix: the log previously claimed "local machine" even for SSH runs.
        logger.info(f'Starting via SSH on {ssh_host}: {name}')
        args += SSH_COMMON_ARGS
        args += [f'{ssh_username}@{ssh_host}']
    else:
        logger.info(f'Starting on a local machine: {name}')
    filepath = results_dir / filename
    args += [
        'tshark',
        '-i', interface,
        '-f', f'udp port {port}',
        '-s', '1500',
        '-w', filepath
    ]
    process = create_process(name, args)
    logger.info(f'Started successfully: {name}')
    return (name, process)
def calculate_extra_time(sender_processes):
    """
    Wait until every sender process has finished streaming and count
    the time spent waiting.

    Attributes:
        sender_processes: list of (name, process) tuples.

    Returns:
        Number of one-second polling intervals spent waiting (int seconds).
    """
    waited = 0
    for _, process in sender_processes:
        while True:
            alive, _ = process_is_running(process)
            if not alive:
                break
            # Poll once per second until this sender terminates.
            time.sleep(1)
            waited += 1
    # logger.info(f'Extra time spent on streaming: {extra_time}')
    return waited