Skip to content

Commit 5b8021a

Browse files
committed
fix batch create stream and make SetHostSocket thread safe
1 parent 3d16b5b commit 5b8021a

File tree

3 files changed

+14
-18
lines changed

3 files changed

+14
-18
lines changed

src/brpc/controller.cpp

+2-2
Original file line numberDiff line numberDiff line change
@@ -1437,12 +1437,12 @@ void Controller::HandleStreamConnection(Socket *host_socket) {
14371437
Stream* s = (Stream*)ptrs[0]->conn();
14381438
s->SetConnected(_remote_stream_settings);
14391439
if (stream_num > 1) {
1440-
const auto& extra_stream_ids = _remote_stream_settings->extra_stream_ids();
1440+
auto extra_stream_ids = std::move(*_remote_stream_settings->mutable_extra_stream_ids());
14411441
_remote_stream_settings->clear_extra_stream_ids();
14421442
for (size_t i = 1; i < stream_num; ++i) {
14431443
Stream* extra_stream = (Stream *) ptrs[i]->conn();
14441444
_remote_stream_settings->set_stream_id(extra_stream_ids[i - 1]);
1445-
s->ShareHostSocket(*extra_stream);
1445+
s->SetHostSocket(host_socket);
14461446
extra_stream->SetConnected(_remote_stream_settings);
14471447
}
14481448
}

src/brpc/stream.cpp

+10-15
Original file line numberDiff line numberDiff line change
@@ -640,17 +640,16 @@ void Stream::SendFeedback() {
640640
}
641641

642642
int Stream::SetHostSocket(Socket *host_socket) {
643-
if (_host_socket != NULL) {
644-
CHECK(false) << "SetHostSocket has already been called";
645-
return -1;
646-
}
647-
SocketUniquePtr ptr;
648-
host_socket->ReAddress(&ptr);
649-
// TODO add *this to host socke
650-
if (ptr->AddStream(id()) != 0) {
651-
return -1;
652-
}
653-
_host_socket = ptr.release();
643+
std::call_once(set_host_socket_flag, [this, host_socket]() {
644+
SocketUniquePtr ptr;
645+
host_socket->ReAddress(&ptr);
646+
// TODO add *this to host socke
647+
if (ptr->AddStream(id()) != 0) {
648+
CHECK(false) << id() << " fail to add stream to host socket";
649+
return -1;
650+
}
651+
_host_socket = ptr.release();
652+
});
654653
return 0;
655654
}
656655

@@ -710,10 +709,6 @@ void Stream::Close(int error_code, const char* reason_fmt, ...) {
710709
return TriggerOnConnectIfNeed();
711710
}
712711

713-
int Stream::ShareHostSocket(Stream& other_stream) {
714-
return other_stream.SetHostSocket(_host_socket);
715-
}
716-
717712
int Stream::SetFailed(StreamId id, int error_code, const char* reason_fmt, ...) {
718713
SocketUniquePtr ptr;
719714
if (Socket::AddressFailedAsWell(id, &ptr) == -1) {

src/brpc/stream_impl.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
#ifndef BRPC_STREAM_IMPL_H
2020
#define BRPC_STREAM_IMPL_H
2121

22+
#include <mutex>
2223
#include "bthread/bthread.h"
2324
#include "bthread/execution_queue.h"
2425
#include "brpc/socket.h"
@@ -67,7 +68,6 @@ class BAIDU_CACHELINE_ALIGNMENT Stream : public SocketConnection {
6768
__attribute__ ((__format__ (__printf__, 3, 4)));
6869
void Close(int error_code, const char* reason_fmt, ...)
6970
__attribute__ ((__format__ (__printf__, 3, 4)));
70-
int ShareHostSocket(Stream& other_stream);
7171

7272
private:
7373
friend void StreamWait(StreamId stream_id, const timespec *due_time,
@@ -134,6 +134,7 @@ friend struct butil::DefaultDeleter<Stream>;
134134
butil::IOBuf *_pending_buf;
135135
int64_t _start_idle_timer_us;
136136
bthread_timer_t _idle_timer;
137+
std::once_flag set_host_socket_flag;
137138
};
138139

139140
} // namespace brpc

0 commit comments

Comments
 (0)