Skip to content

Commit 3a64e80

Browse files
committed
Merge remote-tracking branch 'upstream/master' into cereal_fix_signal
2 parents 65b48b2 + 5bb86f8 commit 3a64e80

10 files changed

+62
-97
lines changed

msgq/impl_zmq.cc

+11-2
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,19 @@
77

88
#include "msgq/impl_zmq.h"
99

10+
static size_t fnv1a_hash(const std::string &str) {
11+
const size_t fnv_prime = 0x100000001b3;
12+
size_t hash_value = 0xcbf29ce484222325;
13+
for (char c : str) {
14+
hash_value ^= (unsigned char)c;
15+
hash_value *= fnv_prime;
16+
}
17+
return hash_value;
18+
}
19+
1020
//FIXME: This is a hack to get the port number from the socket name, might have collisions
1121
static int get_port(std::string endpoint) {
12-
std::hash<std::string> hasher;
13-
size_t hash_value = hasher(endpoint);
22+
size_t hash_value = fnv1a_hash(endpoint);
1423
int start_port = 8023;
1524
int max_port = 65535;
1625
int port = start_port + (hash_value % (max_port - start_port));

msgq/visionipc/tests/test_visionipc.py

+2-11
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,10 @@ def zmq_sleep(t=1):
1111

1212
class TestVisionIpc:
1313

14-
def setup_vipc(self, name, *stream_types, num_buffers=1, rgb=False, width=100, height=100, conflate=False):
14+
def setup_vipc(self, name, *stream_types, num_buffers=1, width=100, height=100, conflate=False):
1515
self.server = VisionIpcServer(name)
1616
for stream_type in stream_types:
17-
self.server.create_buffers(stream_type, num_buffers, rgb, width, height)
17+
self.server.create_buffers(stream_type, num_buffers, width, height)
1818
self.server.start_listener()
1919

2020
if len(stream_types):
@@ -51,15 +51,6 @@ def test_buffers(self):
5151
del self.client
5252
del self.server
5353

54-
def test_yuv_rgb(self):
55-
_, client_yuv = self.setup_vipc("camerad", VisionStreamType.VISION_STREAM_ROAD, rgb=False)
56-
_, client_rgb = self.setup_vipc("navd", VisionStreamType.VISION_STREAM_MAP, rgb=True)
57-
assert client_rgb.rgb
58-
assert not client_yuv.rgb
59-
del client_yuv
60-
del client_rgb
61-
del self.server
62-
6354
def test_send_single_buffer(self):
6455
self.setup_vipc("camerad", VisionStreamType.VISION_STREAM_ROAD)
6556

msgq/visionipc/visionbuf.cc

-15
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,6 @@
11
#include "msgq/visionipc/visionbuf.h"
22

3-
#define ALIGN(x, align) (((x) + (align)-1) & ~((align)-1))
4-
5-
void visionbuf_compute_aligned_width_and_height(int width, int height, int *aligned_w, int *aligned_h) {
6-
*aligned_w = width;
7-
*aligned_h = height;
8-
}
9-
10-
void VisionBuf::init_rgb(size_t init_width, size_t init_height, size_t init_stride) {
11-
this->rgb = true;
12-
this->width = init_width;
13-
this->height = init_height;
14-
this->stride = init_stride;
15-
}
16-
173
void VisionBuf::init_yuv(size_t init_width, size_t init_height, size_t init_stride, size_t init_uv_offset){
18-
this->rgb = false;
194
this->width = init_width;
205
this->height = init_height;
216
this->stride = init_stride;

msgq/visionipc/visionbuf.h

-4
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ class VisionBuf {
2929
uint64_t *frame_id;
3030
int fd = 0;
3131

32-
bool rgb = false;
3332
size_t width = 0;
3433
size_t height = 0;
3534
size_t stride = 0;
@@ -54,13 +53,10 @@ class VisionBuf {
5453
void allocate(size_t len);
5554
void import();
5655
void init_cl(cl_device_id device_id, cl_context ctx);
57-
void init_rgb(size_t width, size_t height, size_t stride);
5856
void init_yuv(size_t width, size_t height, size_t stride, size_t uv_offset);
5957
int sync(int dir);
6058
int free();
6159

6260
void set_frame_id(uint64_t id);
6361
uint64_t get_frame_id();
6462
};
65-
66-
void visionbuf_compute_aligned_width_and_height(int width, int height, int *aligned_w, int *aligned_h);

msgq/visionipc/visionipc.pxd

+2-3
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@ cdef extern from "msgq/visionipc/visionbuf.h":
2121

2222
cdef cppclass VisionBuf:
2323
void * addr
24-
bool rgb
2524
size_t len
2625
size_t width
2726
size_t height
@@ -42,8 +41,8 @@ cdef extern from "msgq/visionipc/visionipc_server.h":
4241

4342
cdef cppclass VisionIpcServer:
4443
VisionIpcServer(string, void*, void*)
45-
void create_buffers(VisionStreamType, size_t, bool, size_t, size_t)
46-
void create_buffers_with_sizes(VisionStreamType, size_t, bool, size_t, size_t, size_t, size_t, size_t)
44+
void create_buffers(VisionStreamType, size_t, size_t, size_t)
45+
void create_buffers_with_sizes(VisionStreamType, size_t, size_t, size_t, size_t, size_t, size_t)
4746
VisionBuf * get_buffer(VisionStreamType)
4847
void send(VisionBuf *, VisionIpcBufExtra *, bool)
4948
void start_listener()

msgq/visionipc/visionipc_client.cc

+17-8
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77
#include "msgq/visionipc/visionipc.h"
88
#include "msgq/visionipc/visionipc_client.h"
99
#include "msgq/visionipc/visionipc_server.h"
10-
#include "logger/logger.h"
11-
#include "logger/logger.h"
10+
#include "msgq/logger/logger.h"
11+
#include "msgq/logger/logger.h"
1212

1313
static int connect_to_vipc_server(const std::string &name, bool blocking) {
1414
const std::string ipc_path = get_ipc_path(name);
@@ -54,6 +54,12 @@ bool VisionIpcClient::connect(bool blocking){
5454
int fds[VISIONIPC_MAX_FDS];
5555
VisionBuf bufs[VISIONIPC_MAX_FDS];
5656
r = ipc_sendrecv_with_fds(false, socket_fd, &bufs, sizeof(bufs), fds, VISIONIPC_MAX_FDS, &num_buffers);
57+
if (r < 0) {
58+
// only expected error is server shutting down
59+
assert(errno == ECONNRESET);
60+
close(socket_fd);
61+
return false;
62+
}
5763

5864
assert(num_buffers >= 0);
5965
assert(r == sizeof(VisionBuf) * num_buffers);
@@ -63,11 +69,7 @@ bool VisionIpcClient::connect(bool blocking){
6369
buffers[i] = bufs[i];
6470
buffers[i].fd = fds[i];
6571
buffers[i].import();
66-
if (buffers[i].rgb) {
67-
buffers[i].init_rgb(buffers[i].width, buffers[i].height, buffers[i].stride);
68-
} else {
69-
buffers[i].init_yuv(buffers[i].width, buffers[i].height, buffers[i].stride, buffers[i].uv_offset);
70-
}
72+
buffers[i].init_yuv(buffers[i].width, buffers[i].height, buffers[i].stride, buffers[i].uv_offset);
7173

7274
if (device_id) buffers[i].init_cl(device_id, ctx);
7375
}
@@ -126,7 +128,14 @@ std::set<VisionStreamType> VisionIpcClient::getAvailableStreams(const std::strin
126128

127129
VisionStreamType available_streams[VISION_STREAM_MAX] = {};
128130
r = ipc_sendrecv_with_fds(false, socket_fd, &available_streams, sizeof(available_streams), nullptr, 0, nullptr);
129-
assert((r >= 0) && (r % sizeof(VisionStreamType) == 0));
131+
if (r < 0) {
132+
// only expected error is server shutting down
133+
assert(errno == ECONNRESET);
134+
close(socket_fd);
135+
return {};
136+
}
137+
138+
assert(r % sizeof(VisionStreamType) == 0);
130139
close(socket_fd);
131140
return std::set<VisionStreamType>(available_streams, available_streams + r / sizeof(VisionStreamType));
132141
}

msgq/visionipc/visionipc_pyx.pyx

+4-12
Original file line numberDiff line numberDiff line change
@@ -55,22 +55,18 @@ cdef class VisionBuf:
5555
def uv_offset(self):
5656
return self.buf.uv_offset
5757

58-
@property
59-
def rgb(self):
60-
return self.buf.rgb
61-
6258

6359
cdef class VisionIpcServer:
6460
cdef cppVisionIpcServer * server
6561

6662
def __init__(self, string name):
6763
self.server = new cppVisionIpcServer(name, NULL, NULL)
6864

69-
def create_buffers(self, VisionStreamType tp, size_t num_buffers, bool rgb, size_t width, size_t height):
70-
self.server.create_buffers(tp, num_buffers, rgb, width, height)
65+
def create_buffers(self, VisionStreamType tp, size_t num_buffers, size_t width, size_t height):
66+
self.server.create_buffers(tp, num_buffers, width, height)
7167

72-
def create_buffers_with_sizes(self, VisionStreamType tp, size_t num_buffers, bool rgb, size_t width, size_t height, size_t size, size_t stride, size_t uv_offset):
73-
self.server.create_buffers_with_sizes(tp, num_buffers, rgb, width, height, size, stride, uv_offset)
68+
def create_buffers_with_sizes(self, VisionStreamType tp, size_t num_buffers, size_t width, size_t height, size_t size, size_t stride, size_t uv_offset):
69+
self.server.create_buffers_with_sizes(tp, num_buffers, width, height, size, stride, uv_offset)
7470

7571
def send(self, VisionStreamType tp, const unsigned char[:] data, uint32_t frame_id=0, uint64_t timestamp_sof=0, uint64_t timestamp_eof=0):
7672
cdef cppVisionBuf * buf = self.server.get_buffer(tp)
@@ -123,10 +119,6 @@ cdef class VisionIpcClient:
123119
def uv_offset(self):
124120
return self.client.buffers[0].uv_offset if self.client.num_buffers else None
125121

126-
@property
127-
def rgb(self):
128-
return self.client.buffers[0].rgb if self.client.num_buffers else None
129-
130122
@property
131123
def buffer_len(self):
132124
return self.client.buffers[0].len if self.client.num_buffers else None

msgq/visionipc/visionipc_server.cc

+16-17
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
#include "msgq/ipc.h"
1212
#include "msgq/visionipc/visionipc.h"
1313
#include "msgq/visionipc/visionipc_server.h"
14-
#include "logger/logger.h"
14+
#include "msgq/logger/logger.h"
1515

1616
std::string get_endpoint_name(std::string name, VisionStreamType type){
1717
if (messaging_use_zmq()){
@@ -38,29 +38,22 @@ VisionIpcServer::VisionIpcServer(std::string name, cl_device_id device_id, cl_co
3838
server_id = distribution(rd);
3939
}
4040

41-
void VisionIpcServer::create_buffers(VisionStreamType type, size_t num_buffers, bool rgb, size_t width, size_t height){
41+
void VisionIpcServer::create_buffers(VisionStreamType type, size_t num_buffers, size_t width, size_t height){
4242
// TODO: assert that this type is not created yet
4343
assert(num_buffers < VISIONIPC_MAX_FDS);
44-
int aligned_w = 0, aligned_h = 0;
4544

4645
size_t size = 0;
4746
size_t stride = 0;
4847
size_t uv_offset = 0;
4948

50-
if (rgb) {
51-
visionbuf_compute_aligned_width_and_height(width, height, &aligned_w, &aligned_h);
52-
size = (size_t)aligned_w * (size_t)aligned_h * 3;
53-
stride = aligned_w * 3;
54-
} else {
55-
size = width * height * 3 / 2;
56-
stride = width;
57-
uv_offset = width * height;
58-
}
49+
size = width * height * 3 / 2;
50+
stride = width;
51+
uv_offset = width * height;
5952

60-
create_buffers_with_sizes(type, num_buffers, rgb, width, height, size, stride, uv_offset);
53+
create_buffers_with_sizes(type, num_buffers, width, height, size, stride, uv_offset);
6154
}
6255

63-
void VisionIpcServer::create_buffers_with_sizes(VisionStreamType type, size_t num_buffers, bool rgb, size_t width, size_t height, size_t size, size_t stride, size_t uv_offset) {
56+
void VisionIpcServer::create_buffers_with_sizes(VisionStreamType type, size_t num_buffers, size_t width, size_t height, size_t size, size_t stride, size_t uv_offset) {
6457
// Create map + alloc requested buffers
6558
for (size_t i = 0; i < num_buffers; i++){
6659
VisionBuf* buf = new VisionBuf();
@@ -70,7 +63,7 @@ void VisionIpcServer::create_buffers_with_sizes(VisionStreamType type, size_t nu
7063

7164
if (device_id) buf->init_cl(device_id, ctx);
7265

73-
rgb ? buf->init_rgb(width, height, stride) : buf->init_yuv(width, height, stride, uv_offset);
66+
buf->init_yuv(width, height, stride, uv_offset);
7467

7568
buffers[type].push_back(buf);
7669
}
@@ -167,11 +160,17 @@ void VisionIpcServer::listener(){
167160

168161

169162

170-
VisionBuf * VisionIpcServer::get_buffer(VisionStreamType type){
163+
VisionBuf * VisionIpcServer::get_buffer(VisionStreamType type, int idx){
171164
// Do we want to keep track if the buffer has been sent out yet and warn user?
172165
assert(buffers.count(type));
173166
auto b = buffers[type];
174-
return b[cur_idx[type]++ % b.size()];
167+
if (idx < 0) {
168+
idx = cur_idx[type]++ % b.size();
169+
} else {
170+
assert(idx < b.size() && idx >= 0);
171+
cur_idx[type] = idx;
172+
}
173+
return b[idx];
175174
}
176175

177176
void VisionIpcServer::send(VisionBuf * buf, VisionIpcBufExtra * extra, bool sync){

msgq/visionipc/visionipc_server.h

+3-3
Original file line numberDiff line numberDiff line change
@@ -33,10 +33,10 @@ class VisionIpcServer {
3333
VisionIpcServer(std::string name, cl_device_id device_id=nullptr, cl_context ctx=nullptr);
3434
~VisionIpcServer();
3535

36-
VisionBuf * get_buffer(VisionStreamType type);
36+
VisionBuf * get_buffer(VisionStreamType type, int idx = -1);
3737

38-
void create_buffers(VisionStreamType type, size_t num_buffers, bool rgb, size_t width, size_t height);
39-
void create_buffers_with_sizes(VisionStreamType type, size_t num_buffers, bool rgb, size_t width, size_t height, size_t size, size_t stride, size_t uv_offset);
38+
void create_buffers(VisionStreamType type, size_t num_buffers, size_t width, size_t height);
39+
void create_buffers_with_sizes(VisionStreamType type, size_t num_buffers, size_t width, size_t height, size_t size, size_t stride, size_t uv_offset);
4040
void send(VisionBuf * buf, VisionIpcBufExtra * extra, bool sync=true);
4141
void start_listener();
4242
};

msgq/visionipc/visionipc_tests.cc

+7-22
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ static void zmq_sleep(int milliseconds=1000){
1515

1616
TEST_CASE("Connecting"){
1717
VisionIpcServer server("camerad");
18-
server.create_buffers(VISION_STREAM_ROAD, 1, false, 100, 100);
18+
server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100);
1919
server.start_listener();
2020

2121
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
@@ -26,8 +26,8 @@ TEST_CASE("Connecting"){
2626

2727
TEST_CASE("getAvailableStreams"){
2828
VisionIpcServer server("camerad");
29-
server.create_buffers(VISION_STREAM_ROAD, 1, false, 100, 100);
30-
server.create_buffers(VISION_STREAM_WIDE_ROAD, 1, false, 100, 100);
29+
server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100);
30+
server.create_buffers(VISION_STREAM_WIDE_ROAD, 1, 100, 100);
3131
server.start_listener();
3232
auto available_streams = VisionIpcClient::getAvailableStreams("camerad");
3333
REQUIRE(available_streams.size() == 2);
@@ -38,7 +38,7 @@ TEST_CASE("getAvailableStreams"){
3838
TEST_CASE("Check buffers"){
3939
size_t width = 100, height = 200, num_buffers = 5;
4040
VisionIpcServer server("camerad");
41-
server.create_buffers(VISION_STREAM_ROAD, num_buffers, false, width, height);
41+
server.create_buffers(VISION_STREAM_ROAD, num_buffers, width, height);
4242
server.start_listener();
4343

4444
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
@@ -50,24 +50,9 @@ TEST_CASE("Check buffers"){
5050
REQUIRE(client.num_buffers == num_buffers);
5151
}
5252

53-
TEST_CASE("Check yuv/rgb"){
54-
VisionIpcServer server("camerad");
55-
server.create_buffers(VISION_STREAM_ROAD, 1, false, 100, 100);
56-
server.create_buffers(VISION_STREAM_MAP, 1, true, 100, 100);
57-
server.start_listener();
58-
59-
VisionIpcClient client_yuv = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
60-
VisionIpcClient client_rgb = VisionIpcClient("camerad", VISION_STREAM_MAP, false);
61-
client_yuv.connect();
62-
client_rgb.connect();
63-
64-
REQUIRE(client_rgb.buffers[0].rgb == true);
65-
REQUIRE(client_yuv.buffers[0].rgb == false);
66-
}
67-
6853
TEST_CASE("Send single buffer"){
6954
VisionIpcServer server("camerad");
70-
server.create_buffers(VISION_STREAM_ROAD, 1, true, 100, 100);
55+
server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100);
7156
server.start_listener();
7257

7358
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
@@ -96,7 +81,7 @@ TEST_CASE("Send single buffer"){
9681

9782
TEST_CASE("Test no conflate"){
9883
VisionIpcServer server("camerad");
99-
server.create_buffers(VISION_STREAM_ROAD, 1, true, 100, 100);
84+
server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100);
10085
server.start_listener();
10186

10287
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, false);
@@ -124,7 +109,7 @@ TEST_CASE("Test no conflate"){
124109

125110
TEST_CASE("Test conflate"){
126111
VisionIpcServer server("camerad");
127-
server.create_buffers(VISION_STREAM_ROAD, 1, true, 100, 100);
112+
server.create_buffers(VISION_STREAM_ROAD, 1, 100, 100);
128113
server.start_listener();
129114

130115
VisionIpcClient client = VisionIpcClient("camerad", VISION_STREAM_ROAD, true);

0 commit comments

Comments
 (0)