-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
internal/grpcsync: Provide an internal-only pub-sub type API #6167
Conversation
@my4-dev : Thanks for doing this. |
and rename it to pubsub
I've finished incorporating our ideas. |
Co-authored-by: Easwar Swaminathan <easwars@google.com>
Co-authored-by: Easwar Swaminathan <easwars@google.com>
@easwars : Thank you for your suggestion. I've corrected. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the code is quite close to what I had in mind initially. I'll start reviewing the tests soon.
@arvindbr8 is out of town until when @easwars gets back so reassigning to @easwars. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Just some minor nits this time around.
internal/grpcsync/pubsub_test.go
Outdated
for i := 0; i < numPublished; i++ { | ||
select { | ||
case <-ts1.onMsgCh: | ||
case <-time.After(defaultTestShortTimeout): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This one should probably use the defaultTestTimeout
instead.
internal/grpcsync/pubsub_test.go
Outdated
// Register another subscriber and ensure that it receives the last published message. | ||
ts2 := newTestSubscriber(numPublished) | ||
pubsub.Subscribe(ts2) | ||
wantMsgs2 := []int{numPublished - 1} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be wantMsgs2 := wantMsgs1[len(wantMsgs1)-1:]
internal/grpcsync/pubsub_test.go
Outdated
for i := 0; i < numPublished; i++ { | ||
select { | ||
case <-ts1.onMsgCh: | ||
case <-time.After(defaultTestShortTimeout): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be defaultTestTimeout
and the same applies to the next goroutine. We generally use defaultTestShortTimeout
only in places where we are waiting for events which we expect to not happen in the test. For example, if we are not expecting a message to arrive from the server, we block for this short timeout and if we receive a message from the server in that timeframe, then we declare an error.
@easwars : Thank you for approving this PR! I've fixed |
@my4-dev : Thanks for taking care of all the comments. @arvindbr8 : Could you please be the second reviewer on this one. Thanks. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A few comments. Could you also please add comments to the test cases. It would really help with readability for other users.
}() | ||
|
||
wg.Wait() | ||
if isTimeout { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we fail later in the case of a timeout? I believe its better to t.Fatal whenever the reader goroutine times out and fail fast.
Consider the worst case where onMessage() callback is never invoked. We would have to wait for 10 * 10 ms for the test to fail, where as it could have failed in the first run.
Let me know if there was a different consideration for why we need the isTimeout boolean to check this.
Alternatively, you could use a t.Errorf() when a reader loop times out and then do this after the wg.Wait()
if t.Failed() {
t.FailNow()
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whatever we choose, there are trade-offs.
t.Fatal
If we invoke t.Fatal()
in the goroutine which is called by the main goroutine, the main isn't cancelled.
So, we can't use this in sub goroutine.
t.Errof
& t.Failed()
It is more like Golang than using boolean flag. But many error messages would be displayed if there is a timeout in loops and it would make logs difficult to read.
Could you please share your opinion about that🙇♂️
go func() { | ||
for i := 0; i < numPublished; i++ { | ||
select { | ||
case <-ts1.onMsgCh: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
IIUC, we are trying to check the subscriber receieves the messages in the order they are published. And we know the message that is published in each iteration (which is i := range numPublished
). Why dont we just check for the message received here and fail if its not the one that is expected?
that way we can nix wantMsg1
and gotMsg1
variables and also fail faster.
I usually like the idea of always failing fast as possible in test assertions. Here lets say that the first message itself came out of order, we would have to wait until all the messages are received to fail the test.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case, the one sub goroutine is used to call Publish()
and the other is used to check to see if ts1.OnMessage()
function was called the correct number of times.
Both are processed asynchronously.
Also, if we tried to fail faster, it would be more complicated than before.
(I haven't come up with the idea to do yet.)
t.Fatalf("Received messages is %v, want %v", gotMsgs1, wantMsgs1) | ||
} | ||
|
||
// Register another subscriber and ensure that it receives the last published message. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
receives the last published message
-- which we know in this case if numPublished - 1
right? If you go by my comment above, we could also think about nixing wantMsgs2
right?
select { | ||
case <-ts1.onMsgCh: | ||
case <-time.After(defaultTestTimeout): | ||
errCh <- fmt.Errorf("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since errCh is initialized with size 1, Wouldn't this be a blocking write if there are multiple timeouts since the channel is only read after wg.Wait()
?
I would recommend not using an errCh
but instead use t.Error()
or t.Fatal()
in this case? using t.Error
might help you check if both subscribers failed in this case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't this be a blocking write if there are multiple timeouts since the channel is only read after wg.Wait()?
This is correct! So we have to fix this problem.
But, as I said the above comment, I'm hesitating using t.Errorf()
or t.Fatalf()
.
} | ||
} | ||
|
||
func (s) TestPubSub_PublishMsgs_BeforeRegisterSub(t *testing.T) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this not already tests in the previous test case here? Maybe remove that check from the previous test case if you would like to have separate test cases for both
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They have different preconditions.
Previous one is tested by calling pubsub.Subscribe()
at the status that some test subsribers have already added to pubsub
.
The other one is tested at the status that no subscriber has added to pubsub
.
Considering above, should we remove either?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since the test cases are WAI. I'm approving it and taking my comments as an action item.
This PR adds an internal-only generic pub-sub type API.
Please refer to this conversation in #6036 .
RELEASE NOTES: none