Skip to content

Commit

Permalink
fix: avoid panic when upstream input is closed for lookup (risingwave…
Browse files Browse the repository at this point in the history
  • Loading branch information
yezizp2012 authored Mar 14, 2023
1 parent 338ace9 commit 0e93998
Showing 1 changed file with 5 additions and 4 deletions.
9 changes: 5 additions & 4 deletions src/stream/src/executor/lookup/sides.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use anyhow::Context;
use either::Either;
use futures::stream::PollNext;
use futures::StreamExt;
Expand Down Expand Up @@ -247,7 +248,7 @@ pub async fn stream_lookup_arrange_prev_epoch(
match input
.next()
.await
.expect("unexpected close of barrier aligner")?
.context("unexpected close of barrier aligner")??
{
Either::Left(Message::Watermark(_)) => {
todo!("https://github.com/risingwavelabs/risingwave/issues/6042")
Expand Down Expand Up @@ -298,7 +299,7 @@ pub async fn stream_lookup_arrange_this_epoch(
match input
.next()
.await
.expect("unexpected close of barrier aligner")?
.context("unexpected close of barrier aligner")??
{
Either::Left(Message::Chunk(msg)) => {
// Should wait until arrangement from this epoch is available.
Expand Down Expand Up @@ -333,7 +334,7 @@ pub async fn stream_lookup_arrange_this_epoch(
match input
.next()
.await
.expect("unexpected close of barrier aligner")?
.context("unexpected close of barrier aligner")??
{
Either::Left(Message::Chunk(msg)) => yield ArrangeMessage::Stream(msg),
Either::Left(Message::Barrier(b)) => {
Expand All @@ -355,7 +356,7 @@ pub async fn stream_lookup_arrange_this_epoch(
match input
.next()
.await
.expect("unexpected close of barrier aligner")?
.context("unexpected close of barrier aligner")??
{
Either::Left(_) => unreachable!(),
Either::Right(Message::Chunk(chunk)) => {
Expand Down

0 comments on commit 0e93998

Please sign in to comment.