Skip to content

Commit f0e6da1

Browse files
committed
fix: fix fully flushed region open in kakfa wal (#1061)
## Rationale Now when opening a fully flushed region, it may hang forever... This pr fix this problem on wal level, but it is the problem in kafka client that is planned to fix in later. ## Detailed Changes See title. ## Test Plan Test manually.
1 parent dbf5542 commit f0e6da1

File tree

1 file changed

+43
-5
lines changed

1 file changed

+43
-5
lines changed

wal/src/message_queue_impl/region.rs

+43-5
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,17 @@ impl<M: MessageQueue> Region<M> {
224224
namespace, region_id
225225
);
226226

227-
// Fetch high watermark and check.
227+
// Fetch earliest, high watermark and check.
228+
let earliest = message_queue
229+
.fetch_offset(meta_topic, OffsetType::EarliestOffset)
230+
.await
231+
.box_err()
232+
.context(OpenWithCause {
233+
namespace,
234+
region_id,
235+
msg: "failed while recover from meta",
236+
})?;
237+
228238
let high_watermark = message_queue
229239
.fetch_offset(meta_topic, OffsetType::HighWaterMark)
230240
.await
@@ -235,9 +245,19 @@ impl<M: MessageQueue> Region<M> {
235245
msg: "failed while recover from meta",
236246
})?;
237247

238-
if high_watermark == 0 {
239-
debug!("Meta topic is empty, it just needs to recover from log topic, namespace:{}, region id:{}", namespace, region_id);
240-
return Ok(None);
248+
if earliest == high_watermark {
249+
if high_watermark == 0 {
250+
info!("Recover region meta from meta, found empty meta topic, just need to recover from log topic, namespace:{}, region id:{}",
251+
namespace, region_id);
252+
return Ok(None);
253+
}
254+
255+
return OpenNoCause {
256+
namespace,
257+
region_id,
258+
msg: "region meta impossible to be empty when having written logs",
259+
}
260+
.fail();
241261
}
242262

243263
// Fetch snapshot from meta topic(just fetch the last snapshot).
@@ -313,6 +333,9 @@ impl<M: MessageQueue> Region<M> {
313333
});
314334

315335
let region_safe_delete_offset = if min_safe_delete_offset == i64::MAX {
336+
info!("Recover region meta from meta, min_safe_delete_offset not exist, region_meta_snapshot:{:?}, namespace:{}, region id:{}",
337+
value, namespace, region_id);
338+
316339
None
317340
} else {
318341
Some(min_safe_delete_offset)
@@ -352,6 +375,17 @@ impl<M: MessageQueue> Region<M> {
352375
// FIXME: should not judge whether topic is empty or not by caller.
353376
// The consumer iterator should return immediately rather than hanging when
354377
// topic empty.
378+
// Fetch earliest, high watermark and check.
379+
let earliest = message_queue
380+
.fetch_offset(log_topic, OffsetType::EarliestOffset)
381+
.await
382+
.box_err()
383+
.context(OpenWithCause {
384+
namespace,
385+
region_id,
386+
msg: "failed while recover from log",
387+
})?;
388+
355389
let high_watermark = message_queue
356390
.fetch_offset(log_topic, OffsetType::HighWaterMark)
357391
.await
@@ -361,7 +395,11 @@ impl<M: MessageQueue> Region<M> {
361395
region_id,
362396
msg: "failed while recover from log",
363397
})?;
364-
if high_watermark == 0 {
398+
399+
if earliest == high_watermark {
400+
info!("Recover region meta from log, found empty log topic, namespace:{}, region_id:{}, earliest:{}, high_watermark:{}",
401+
namespace, region_id, earliest, high_watermark
402+
);
365403
return Ok(());
366404
}
367405

0 commit comments

Comments
 (0)