Skip to content

Commit 5b3f2cd

Browse files
deanleesshane
andauthored
msgq: refactor blocking recv for improved robustness and performance (#616)
* improve blocking receive * Update msgq/impl_msgq.cc * 1s timeout * comment * typo --------- Co-authored-by: Shane Smiskol <shane@smiskol.com>
1 parent 5bb86f8 commit 5b3f2cd

File tree

5 files changed

+78
-68
lines changed

5 files changed

+78
-68
lines changed

SConscript

+3-2
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ msgq_objects = env.SharedObject([
1515
'msgq/msgq.cc',
1616
])
1717
msgq = env.Library('msgq', msgq_objects)
18-
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", common])
18+
msgq_python = envCython.Program('msgq/ipc_pyx.so', 'msgq/ipc_pyx.pyx', LIBS=envCython["LIBS"]+[msgq, "zmq", 'pthread', common])
1919

2020
# Build Vision IPC
2121
vipc_files = ['visionipc.cc', 'visionipc_server.cc', 'visionipc_client.cc', 'visionbuf.cc']
@@ -31,7 +31,7 @@ visionipc = env.Library('visionipc', vipc_objects)
3131

3232

3333
vipc_frameworks = []
34-
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq"]
34+
vipc_libs = envCython["LIBS"] + [visionipc, msgq, common, "zmq", 'pthread']
3535
if arch == "Darwin":
3636
vipc_frameworks.append('OpenCL')
3737
else:
@@ -45,4 +45,5 @@ if GetOption('extras'):
4545
[f'{visionipc_dir.abspath}/test_runner.cc', f'{visionipc_dir.abspath}/visionipc_tests.cc'],
4646
LIBS=['pthread'] + vipc_libs, FRAMEWORKS=vipc_frameworks)
4747

48+
msgq = [msgq, 'pthread']
4849
Export('visionipc', 'msgq', 'msgq_python')

msgq/impl_msgq.cc

+45-59
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,12 @@
11
#include <cassert>
22
#include <cstring>
33
#include <iostream>
4-
#include <cstdlib>
4+
#include <chrono>
55
#include <csignal>
6-
#include <cerrno>
76

87
#include "msgq/impl_msgq.h"
98

10-
11-
volatile sig_atomic_t msgq_do_exit = 0;
12-
13-
void sig_handler(int signal) {
14-
assert(signal == SIGINT || signal == SIGTERM);
15-
msgq_do_exit = 1;
16-
}
17-
9+
using namespace std::chrono;
1810

1911
MSGQContext::MSGQContext() {
2012
}
@@ -70,61 +62,55 @@ int MSGQSubSocket::connect(Context *context, std::string endpoint, std::string a
7062
return 0;
7163
}
7264

73-
74-
Message * MSGQSubSocket::receive(bool non_blocking){
75-
msgq_do_exit = 0;
76-
77-
void (*prev_handler_sigint)(int);
78-
void (*prev_handler_sigterm)(int);
79-
if (!non_blocking){
80-
prev_handler_sigint = std::signal(SIGINT, sig_handler);
81-
prev_handler_sigterm = std::signal(SIGTERM, sig_handler);
82-
}
83-
84-
msgq_msg_t msg;
85-
86-
MSGQMessage *r = NULL;
87-
65+
Message *MSGQSubSocket::receive(bool non_blocking) {
66+
msgq_msg_t msg{};
8867
int rc = msgq_msg_recv(&msg, q);
8968

90-
// Hack to implement blocking read with a poller. Don't use this
91-
while (!non_blocking && rc == 0 && msgq_do_exit == 0){
92-
msgq_pollitem_t items[1];
93-
items[0].q = q;
94-
95-
int t = (timeout != -1) ? timeout : 100;
96-
97-
int n = msgq_poll(items, 1, t);
98-
rc = msgq_msg_recv(&msg, q);
99-
100-
// The poll indicated a message was ready, but the receive failed. Try again
101-
if (n == 1 && rc == 0){
102-
continue;
103-
}
104-
105-
if (timeout != -1){
106-
break;
69+
if (rc == 0 && !non_blocking) {
70+
sigset_t mask;
71+
sigset_t old_mask;
72+
sigemptyset(&mask);
73+
sigaddset(&mask, SIGINT);
74+
sigaddset(&mask, SIGTERM);
75+
sigaddset(&mask, SIGUSR2); // notification from publisher
76+
77+
pthread_sigmask(SIG_BLOCK, &mask, &old_mask);
78+
79+
int64_t timeout_ns = ((timeout != -1) ? timeout : 100) * 1000000;
80+
auto start = steady_clock::now();
81+
82+
// Continue receiving messages until timeout or interruption by SIGINT or SIGTERM
83+
while (rc == 0 && timeout_ns > 0) {
84+
struct timespec ts {
85+
timeout_ns / 1000000000,
86+
timeout_ns % 1000000000,
87+
};
88+
89+
int ret = sigtimedwait(&mask, nullptr, &ts);
90+
if (ret == SIGINT || ret == SIGTERM) {
91+
// Ensure signal handling is not missed
92+
raise(ret);
93+
break;
94+
} else if (ret == -1 && errno == EAGAIN && timeout != -1) {
95+
break; // Timed out
96+
}
97+
98+
rc = msgq_msg_recv(&msg, q);
99+
100+
if (timeout != -1) {
101+
timeout_ns -= duration_cast<nanoseconds>(steady_clock::now() - start).count();
102+
start = steady_clock::now(); // Update start time
103+
}
107104
}
105+
pthread_sigmask(SIG_SETMASK, &old_mask, nullptr);
108106
}
109107

110-
111-
if (!non_blocking){
112-
std::signal(SIGINT, prev_handler_sigint);
113-
std::signal(SIGTERM, prev_handler_sigterm);
114-
}
115-
116-
errno = msgq_do_exit ? EINTR : 0;
117-
118-
if (rc > 0){
119-
if (msgq_do_exit){
120-
msgq_msg_close(&msg); // Free unused message on exit
121-
} else {
122-
r = new MSGQMessage;
123-
r->takeOwnership(msg.data, msg.size);
124-
}
108+
if (rc > 0) {
109+
MSGQMessage *r = new MSGQMessage;
110+
r->takeOwnership(msg.data, msg.size);
111+
return r;
125112
}
126-
127-
return (Message*)r;
113+
return nullptr;
128114
}
129115

130116
void MSGQSubSocket::setTimeout(int t){

msgq/ipc.pxd

+1-1
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ cdef extern from "msgq/ipc.h":
5050
@staticmethod
5151
SubSocket * create()
5252
int connect(Context *, string, string, bool)
53-
Message * receive(bool)
53+
Message * receive(bool) nogil
5454
void setTimeout(int)
5555

5656
cdef cppclass PubSocket:

msgq/ipc_pyx.pyx

+3-6
Original file line numberDiff line numberDiff line change
@@ -196,14 +196,11 @@ cdef class SubSocket:
196196
self.socket.setTimeout(timeout)
197197

198198
def receive(self, bool non_blocking=False):
199-
msg = self.socket.receive(non_blocking)
199+
cdef cppMessage *msg
200+
with nogil:
201+
msg = self.socket.receive(non_blocking)
200202

201203
if msg == NULL:
202-
# If a blocking read returns no message check errno if SIGINT was caught in the C++ code
203-
if errno.errno == errno.EINTR:
204-
print("SIGINT received, exiting")
205-
sys.exit(1)
206-
207204
return None
208205
else:
209206
sz = msg.getSize()

msgq/tests/test_messaging.py

+26
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import os
2+
import pytest
23
import random
4+
import signal
5+
import threading
36
import time
47
import string
58
import msgq
@@ -67,3 +70,26 @@ def test_receive_timeout(self):
6770
recvd = sub_sock.receive()
6871
assert (time.monotonic() - start_time) < 0.2
6972
assert recvd is None
73+
74+
def test_receive_interrupts_on_sigint(self):
75+
sock = random_sock()
76+
sub_sock = msgq.sub_sock(sock)
77+
sub_sock.setTimeout(1000)
78+
79+
# Send SIGINT after a short delay
80+
pid = os.getpid()
81+
def send_sigint():
82+
time.sleep(.5)
83+
os.kill(pid, signal.SIGINT)
84+
85+
# Start a thread to send SIGINT
86+
thread = threading.Thread(target=send_sigint)
87+
thread.start()
88+
89+
with pytest.raises(KeyboardInterrupt):
90+
start_time = time.monotonic()
91+
recvd = sub_sock.receive()
92+
assert (time.monotonic() - start_time) < 0.5
93+
assert recvd is None
94+
95+
thread.join()

0 commit comments

Comments
 (0)