Skip to content

Commit faf6ffd

Browse files
committed
Revert "msgq: refactor blocking recv for improved robustness and performance (#616)"
This reverts commit 5b3f2cd.
1 parent 5b3f2cd commit faf6ffd

File tree

5 files changed

+68
-78
lines changed

5 files changed

+68
-78
lines changed

SConscript

+2-3
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ msgq_objects = env.SharedObject([
1515
'msgq/msgq.cc',
1616
])
1717
msgq = env.Library('msgq', msgq_objects)
18-
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", 'pthread', common])
18+
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])
1919

2020
# Build Vision IPC
2121
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
@@ -31,7 +31,7 @@ visionipc = env.Library('visionipc', vipc_objects)
3131

3232

3333
vipc_frameworks = []
34-
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq", 'pthread']
34+
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"]
3535
if arch == "Darwin":
3636
vipc_frameworks.append('OpenCL')
3737
else:
@@ -45,5 +45,4 @@ if GetOption('extras'):
4545
[f'{visionipc_dir.abspath}/test_runner.cc', f'{visionipc_dir.abspath}/visionipc_tests.cc'],
4646
LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks)
4747

48-
msgq = [msgq, 'pthread']
4948
Export('visionipc', 'msgq', 'msgq_python')

msgq/impl_msgq.cc

+59-45
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,20 @@
11
#include <cassert>
22
#include <cstring>
33
#include <iostream>
4-
#include <chrono>
4+
#include <cstdlib>
55
#include <csignal>
6+
#include <cerrno>
67

78
#include "msgq/impl_msgq.h"
89

9-
using namespace std::chrono;
10+
11+
volatile sig_atomic_t msgq_do_exit = 0;
12+
13+
void sig_handler(int signal) {
14+
assert(signal == SIGINT || signal == SIGTERM);
15+
msgq_do_exit = 1;
16+
}
17+
1018

1119
MSGQContext::MSGQContext() {
1220
}
@@ -62,55 +70,61 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a
6270
return 0;
6371
}
6472

65-
Message *MSGQSubSocket::receive(bool non_blocking) {
66-
msgq_msg_t msg{};
73+
74+
Message * MSGQSubSocket::receive(bool non_blocking){
75+
msgq_do_exit = 0;
76+
77+
void (*prev_handler_sigint)(int);
78+
void (*prev_handler_sigterm)(int);
79+
if (!non_blocking){
80+
prev_handler_sigint = std::signal(SIGINT, sig_handler);
81+
prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
82+
}
83+
84+
msgq_msg_t msg;
85+
86+
MSGQMessage *r = NULL;
87+
6788
int rc = msgq_msg_recv(&msg, q);
6889

69-
if (rc == 0 && !non_blocking) {
70-
sigset_t mask;
71-
sigset_t old_mask;
72-
sigemptyset(&mask);
73-
sigaddset(&mask, SIGINT);
74-
sigaddset(&mask, SIGTERM);
75-
sigaddset(&mask, SIGUSR2); // notification from publisher
76-
77-
pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
78-
79-
int64_t timeout_ns = ((timeout != -1) ? timeout : 100) * 1000000;
80-
auto start = steady_clock::now();
81-
82-
// Continue receiving messages until timeout or interruption by SIGINT or SIGTERM
83-
while (rc == 0 && timeout_ns > 0) {
84-
struct timespec ts {
85-
timeout_ns / 1000000000,
86-
timeout_ns % 1000000000,
87-
};
88-
89-
int ret = sigtimedwait(&mask, nullptr, &ts);
90-
if (ret == SIGINT || ret == SIGTERM) {
91-
// Ensure signal handling is not missed
92-
raise(ret);
93-
break;
94-
} else if (ret == -1 && errno == EAGAIN && timeout != -1) {
95-
break; // Timed out
96-
}
97-
98-
rc = msgq_msg_recv(&msg, q);
99-
100-
if (timeout != -1) {
101-
timeout_ns -= duration_cast<nanoseconds>(steady_clock::now() - start).count();
102-
start = steady_clock::now(); // Update start time
103-
}
90+
// Hack to implement blocking read with a poller. Don't use this
91+
while (!non_blocking && rc == 0 && msgq_do_exit == 0){
92+
msgq_pollitem_t items[1];
93+
items[0].q = q;
94+
95+
int t = (timeout != -1) ? timeout : 100;
96+
97+
int n = msgq_poll(items, 1, t);
98+
rc = msgq_msg_recv(&msg, q);
99+
100+
// The poll indicated a message was ready, but the receive failed. Try again
101+
if (n == 1 && rc == 0){
102+
continue;
103+
}
104+
105+
if (timeout != -1){
106+
break;
104107
}
105-
pthread_sigmask(SIG_SETMASK, &old_mask, nullptr);
106108
}
107109

108-
if (rc > 0) {
109-
MSGQMessage *r = new MSGQMessage;
110-
r->takeOwnership(msg.data, msg.size);
111-
return r;
110+
111+
if (!non_blocking){
112+
std::signal(SIGINT, prev_handler_sigint);
113+
std::signal(SIGTERM, prev_handler_sigterm);
112114
}
113-
return nullptr;
115+
116+
errno = msgq_do_exit ? EINTR : 0;
117+
118+
if (rc > 0){
119+
if (msgq_do_exit){
120+
msgq_msg_close(&msg); // Free unused message on exit
121+
} else {
122+
r = new MSGQMessage;
123+
r->takeOwnership(msg.data, msg.size);
124+
}
125+
}
126+
127+
return (Message*)r;
114128
}
115129

116130
void MSGQSubSocket::setTimeout(int t){

msgq/ipc.pxd

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ cdef extern from "msgq/ipc.h":
5050
@staticmethod
5151
SubSocket * create()
5252
int connect(Context *, string, string, bool)
53-
Message * receive(bool) nogil
53+
Message * receive(bool)
5454
void setTimeout(int)
5555

5656
cdef cppclass PubSocket:

msgq/ipc_pyx.pyx

+6-3
Original file line numberDiff line numberDiff line change
@@ -196,11 +196,14 @@ cdef class SubSocket:
196196
self.socket.setTimeout(timeout)
197197

198198
def receive(self, bool non_blocking=False):
199-
cdef cppMessage *msg
200-
with nogil:
201-
msg = self.socket.receive(non_blocking)
199+
msg = self.socket.receive(non_blocking)
202200

203201
if msg == NULL:
202+
# If a blocking read returns no message check errno if SIGINT was caught in the C++ code
203+
if errno.errno == errno.EINTR:
204+
print("SIGINT received, exiting")
205+
sys.exit(1)
206+
204207
return None
205208
else:
206209
sz = msg.getSize()

msgq/tests/test_messaging.py

-26
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
import os
2-
import pytest
32
import random
4-
import signal
5-
import threading
63
import time
74
import string
85
import msgq
@@ -70,26 +67,3 @@ def test_receive_timeout(self):
7067
recvd = sub_sock.receive()
7168
assert (time.monotonic() - start_time) < 0.2
7269
assert recvd is None
73-
74-
def test_receive_interrupts_on_sigint(self):
75-
sock = random_sock()
76-
sub_sock = msgq.sub_sock(sock)
77-
sub_sock.setTimeout(1000)
78-
79-
# Send SIGINT after a short delay
80-
pid = os.getpid()
81-
def send_sigint():
82-
time.sleep(.5)
83-
os.kill(pid, signal.SIGINT)
84-
85-
# Start a thread to send SIGINT
86-
thread = threading.Thread(target=send_sigint)
87-
thread.start()
88-
89-
with pytest.raises(KeyboardInterrupt):
90-
start_time = time.monotonic()
91-
recvd = sub_sock.receive()
92-
assert (time.monotonic() - start_time) < 0.5
93-
assert recvd is None
94-
95-
thread.join()

0 commit comments

Comments
 (0)