diff --git a/src/js_stream.cc b/src/js_stream.cc index 7fcdfd9a94d9fd..09c4f58b96230e 100644 --- a/src/js_stream.cc +++ b/src/js_stream.cc @@ -71,6 +71,7 @@ int JSStream::DoShutdown(ShutdownWrap* req_wrap) { req_wrap->object() }; + req_wrap->Dispatched(); Local res = MakeCallback(env()->onshutdown_string(), ARRAY_SIZE(argv), argv); @@ -95,6 +96,7 @@ int JSStream::DoWrite(WriteWrap* w, bufs_arr }; + w->Dispatched(); Local res = MakeCallback(env()->onwrite_string(), ARRAY_SIZE(argv), argv); diff --git a/src/stream_base.cc b/src/stream_base.cc index 3a9f30f279bd2c..b2518404a8fe62 100644 --- a/src/stream_base.cc +++ b/src/stream_base.cc @@ -60,7 +60,6 @@ int StreamBase::Shutdown(const FunctionCallbackInfo& args) { AfterShutdown); int err = DoShutdown(req_wrap); - req_wrap->Dispatched(); if (err) delete req_wrap; return err; @@ -181,7 +180,6 @@ int StreamBase::Writev(const FunctionCallbackInfo& args) { if (bufs != bufs_) delete[] bufs; - req_wrap->Dispatched(); req_wrap->object()->Set(env->async(), True(env->isolate())); req_wrap->object()->Set(env->bytes_string(), Number::New(env->isolate(), bytes)); @@ -228,7 +226,6 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo& args) { req_wrap = WriteWrap::New(env, req_wrap_obj, this, AfterWrite); err = DoWrite(req_wrap, bufs, count, nullptr); - req_wrap->Dispatched(); req_wrap_obj->Set(env->async(), True(env->isolate())); if (err) @@ -347,7 +344,6 @@ int StreamBase::WriteString(const FunctionCallbackInfo& args) { reinterpret_cast(send_handle)); } - req_wrap->Dispatched(); req_wrap->object()->Set(env->async(), True(env->isolate())); if (err) diff --git a/src/stream_wrap.cc b/src/stream_wrap.cc index be5757d4a60bd1..540639d458050c 100644 --- a/src/stream_wrap.cc +++ b/src/stream_wrap.cc @@ -279,7 +279,10 @@ void StreamWrap::SetBlocking(const FunctionCallbackInfo& args) { int StreamWrap::DoShutdown(ShutdownWrap* req_wrap) { - return uv_shutdown(&req_wrap->req_, stream(), AfterShutdown); + int err; + err = uv_shutdown(&req_wrap->req_, stream(), AfterShutdown); + req_wrap->Dispatched(); + return err; } @@ -353,6 +356,7 @@ int StreamWrap::DoWrite(WriteWrap* w, } } + w->Dispatched(); UpdateWriteQueueSize(); return r; diff --git a/src/tls_wrap.cc b/src/tls_wrap.cc index c774a8490b54f2..b4d5a70d412573 100644 --- a/src/tls_wrap.cc +++ b/src/tls_wrap.cc @@ -109,7 +109,22 @@ bool TLSWrap::InvokeQueued(int status) { WriteItemList queue; pending_write_items_.MoveBack(&queue); while (WriteItem* wi = queue.PopFront()) { - wi->w_->Done(status); + if (wi->async_) { + wi->w_->Done(status); + } else { + CheckWriteItem* check = new CheckWriteItem(wi->w_, status); + int err = uv_check_init(env()->event_loop(), &check->check_); + check->check_.data = check; + if (err == 0) + err = uv_check_start(&check->check_, CheckWriteItem::CheckCb); + + // No luck today, do it on next InvokeQueued + if (err != 0) { + delete check; + pending_write_items_.PushBack(wi); + continue; + } + } delete wi; } @@ -117,6 +132,15 @@ bool TLSWrap::InvokeQueued(int status) { } +void TLSWrap::CheckWriteItem::CheckCb(uv_check_t* check) { + CheckWriteItem* c = reinterpret_cast(check->data); + + c->w_->Done(c->status_); + uv_close(reinterpret_cast(check), nullptr); + delete c; +} + + void TLSWrap::NewSessionDoneCb() { Cycle(); } @@ -306,7 +330,6 @@ void TLSWrap::EncOut() { for (size_t i = 0; i < count; i++) buf[i] = uv_buf_init(data[i], size[i]); int err = stream_->DoWrite(write_req, buf, count, nullptr); - write_req->Dispatched(); // Ignore errors, this should be already handled in js if (err) { @@ -557,7 +580,10 @@ int TLSWrap::DoWrite(WriteWrap* w, } // Queue callback to execute it on next tick - write_item_queue_.PushBack(new WriteItem(w)); + WriteItem* item = new WriteItem(w); + WriteItem::SyncScope item_async(item); + write_item_queue_.PushBack(item); + w->Dispatched(); // Write queued data if (empty) { diff --git a/src/tls_wrap.h b/src/tls_wrap.h index 9f095355bb58bd..65e270c6f15f89 100644 --- a/src/tls_wrap.h +++ b/src/tls_wrap.h @@ -65,16 +65,46 @@ class TLSWrap : public crypto::SSLWrap, // Write callback queue's item class WriteItem { public: - explicit WriteItem(WriteWrap* w) : w_(w) { + class SyncScope { + public: + explicit SyncScope(WriteItem* item) : item_(item) { + item_->async_ = false; + } + ~SyncScope() { + item_->async_ = true; + } + + private: + WriteItem* item_; + }; + + explicit WriteItem(WriteWrap* w) : w_(w), async_(false) { } ~WriteItem() { w_ = nullptr; } WriteWrap* w_; + bool async_; ListNode member_; }; + class CheckWriteItem { + public: + CheckWriteItem(WriteWrap* w, int status) : w_(w), status_(status) { + } + + ~CheckWriteItem() { + w_ = nullptr; + } + + static void CheckCb(uv_check_t* check); + + WriteWrap* w_; + int status_; + uv_check_t check_; + }; + TLSWrap(Environment* env, Kind kind, StreamBase* stream,