Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Channel: drain/drainUpTo include pending puts #980

Merged
merged 14 commits into from
Jan 9, 2025

Conversation

johnhungerford
Copy link
Contributor

closes #978

johnhungerford and others added 6 commits January 6, 2025 14:27
Co-authored-by: Adam Hearn <22334119+hearnadam@users.noreply.github.com>
Co-authored-by: Adam Hearn <22334119+hearnadam@users.noreply.github.com>
Co-authored-by: Adam Hearn <22334119+hearnadam@users.noreply.github.com>
@johnhungerford johnhungerford marked this pull request as draft January 7, 2025 17:00
@johnhungerford
Copy link
Contributor Author

@fwbrasil @hearnadam this update produces a potential issue: when streaming from a channel, there is no guarantee that you will consume elements in order. In drainUpTo, between draining from the queue and taking from the pending puts, elements in pending puts can be flushed into the queue. This has resulted in flakiness in some older tests that assert that putting Seq(0 to n) will come out Seq(0 to n).

So I guess the question is do we care that order is not preserved?

@fwbrasil
Copy link
Collaborator

fwbrasil commented Jan 7, 2025

Maintaining ordering would be ideal. What if we had a cycle of queue.drainUpTo/flush/queue.drainUpTo/flush/.. until drainUpTo returns empty? It wouldn't handle the case where the channel/queue has capacity 0 but it might be ok. I've been thinking that we should have a separate impl for channels without capacity

@johnhungerford
Copy link
Contributor Author

Good idea, that works nicely.

@johnhungerford johnhungerford marked this pull request as ready for review January 7, 2025 17:55
@johnhungerford
Copy link
Contributor Author

johnhungerford commented Jan 7, 2025

If maintaining order is required, our current batched put implementation is also problematic. When a full batch does not fit in the queue, the remainder is sent to the back of puts. Is this something we want to rethink?

The only solution I see to this is to add a stack of Puts that has priority over the puts Queue, so that if an offer to the main queue fails during flush() it can be thrown on the stack and processed next instead of going to the back of the line. I think even this could lead to mis-ordering with concurrent consumers in some cases, however.

@fwbrasil
Copy link
Collaborator

fwbrasil commented Jan 7, 2025

More relaxed ordering when the queue becomes full and there are multiple concurrent puts seems more reasonable given that concurrent puts can arrive in any order. To ensure ordering even in this scenario, I think we'd need to have a single atomic operation instead of having to deal with both the main queue and the pending takes/puts queues, which I think wouldn't allow some of the fast paths we currently have in the implementation like put succeeding immediately if there's space in the queue. I'd say it's ok. The flushing logic can also change ordering under high contention (but it's very rare)

@fwbrasil fwbrasil merged commit ff73419 into getkyo:main Jan 9, 2025
3 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Channel drain and drainUpTo should consider pending puts
3 participants