Skip to content

Commit

Permalink
feat(meta): do configuration change in single barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 27, 2025
1 parent 659960e commit 7709bfc
Show file tree
Hide file tree
Showing 18 changed files with 77 additions and 308 deletions.
10 changes: 0 additions & 10 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -169,16 +169,6 @@ message FlushResponse {
uint64 hummock_version_id = 2;
}

// The reason why the data sources in the cluster are paused.
enum PausedReason {
  // Default value; treated as "not paused" by consumers of this enum
  // (prost-derived accessors on unset optional fields yield this variant).
  PAUSED_REASON_UNSPECIFIED = 0;
  // The cluster is paused due to configuration change, e.g. altering table schema and scaling.
  PAUSED_REASON_CONFIG_CHANGE = 1;
  // The cluster is paused due to manual operation, e.g. `risectl` command or the
  // `pause_on_next_bootstrap` system variable.
  PAUSED_REASON_MANUAL = 2;
}

message PauseRequest {}

message PauseResponse {}
Expand Down
10 changes: 0 additions & 10 deletions proto/stream_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,6 @@ message BarrierCompleteResponse {
uint32 database_id = 10;
}

// Request to block until a given epoch has been committed in the state store
// for a specific table (see `StreamService.WaitEpochCommit`).
message WaitEpochCommitRequest {
  // The epoch to wait for. Presumably a Hummock epoch — confirm against callers.
  uint64 epoch = 1;
  // The table whose committed data the caller wants to observe.
  uint32 table_id = 2;
}

// Response for `WaitEpochCommit`. The server leaves `status` unset on success;
// errors are reported via the RPC status instead (see the compute-node impl).
message WaitEpochCommitResponse {
  common.Status status = 1;
}

message StreamingControlStreamRequest {
message InitialPartialGraph {
uint32 partial_graph_id = 1;
Expand Down Expand Up @@ -152,7 +143,6 @@ message GetMinUncommittedSstIdResponse {
}

// RPCs served by compute nodes for streaming control and inspection.
service StreamService {
  // Blocks until the requested epoch is committed for the given table.
  rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
  // Bidirectional control stream between meta and the compute node.
  rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
  // Returns the minimum uncommitted SST id on this node.
  rpc GetMinUncommittedSstId(GetMinUncommittedSstIdRequest) returns (GetMinUncommittedSstIdResponse);
}
Expand Down
31 changes: 0 additions & 31 deletions src/compute/src/rpc/service/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use await_tree::InstrumentAwait;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_storage::store::TryWaitEpochOptions;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
Expand All @@ -42,33 +38,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

/// Handler for the `WaitEpochCommit` RPC: block until `request.epoch` is
/// committed in the state store for `request.table_id`, then reply.
///
/// Dispatches on the concrete state-store backend via `dispatch_state_store!`
/// and waits for the *committed* version of the epoch, so that a caller can
/// subsequently read data up to that epoch.
#[cfg_attr(coverage, coverage(off))] // exclude RPC glue from coverage instrumentation
async fn wait_epoch_commit(
    &self,
    request: Request<WaitEpochCommitRequest>,
) -> Result<Response<WaitEpochCommitResponse>, Status> {
    let request = request.into_inner();
    let epoch = request.epoch;

    dispatch_state_store!(self.env.state_store(), store, {
        use risingwave_hummock_sdk::HummockReadEpoch;
        use risingwave_storage::StateStore;

        // `HummockReadEpoch::Committed` waits for the committed (not merely
        // sealed) version of `epoch`; storage errors are mapped to
        // `StreamError` and propagated via `?` as the RPC error.
        store
            .try_wait_epoch(
                HummockReadEpoch::Committed(epoch),
                TryWaitEpochOptions {
                    table_id: request.table_id.into(),
                },
            )
            // Label the await point so it shows up in await-tree dumps.
            .instrument_await(format!("wait_epoch_commit (epoch {})", epoch))
            .await
            .map_err(StreamError::from)?;
    });

    // Success: `status` is left unset; failures were returned above.
    Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

async fn streaming_control_stream(
&self,
request: Request<Streaming<StreamingControlStreamRequest>>,
Expand Down
12 changes: 0 additions & 12 deletions src/ctl/src/cmd_impl/meta/pause_resume.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::meta::PausedReason;

use crate::CtlContext;

/// Render a [`PausedReason`] as a short, human-readable status string.
///
/// Accessors derived by `prost` for optional enum fields fall back to
/// `Unspecified` when the field is unset, so `Unspecified` is rendered
/// here as the not-paused state.
pub fn desc(reason: PausedReason) -> &'static str {
    match reason {
        PausedReason::Manual => "paused manually",
        PausedReason::ConfigChange => "paused due to configuration change",
        PausedReason::Unspecified => "not paused",
    }
}

pub async fn pause(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

Expand Down
2 changes: 1 addition & 1 deletion src/dml/src/dml_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct TableReader {
/// interface).
#[derive(Debug)]
pub struct DmlManager {
pub table_readers: RwLock<HashMap<TableId, TableReader>>,
table_readers: RwLock<HashMap<TableId, TableReader>>,
txn_id_generator: TxnIdGenerator,
dml_channel_initial_permits: usize,
}
Expand Down
4 changes: 2 additions & 2 deletions src/meta/service/src/stream_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(database_id, Command::pause(PausedReason::Manual))
.run_command(database_id, Command::pause())
.await?;
}
Ok(Response::new(PauseResponse {}))
Expand All @@ -97,7 +97,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(database_id, Command::resume(PausedReason::Manual))
.run_command(database_id, Command::resume())
.await?;
}
Ok(Response::new(ResumeResponse {}))
Expand Down
65 changes: 8 additions & 57 deletions src/meta/src/barrier/checkpoint/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use tracing::{debug, warn};
Expand All @@ -48,7 +47,6 @@ use crate::barrier::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -147,7 +145,6 @@ impl CheckpointControl {
&mut self,
new_barrier: NewBarrier,
control_stream_manager: &mut ControlStreamManager,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
) -> MetaResult<()> {
let NewBarrier {
command,
Expand Down Expand Up @@ -228,9 +225,7 @@ impl CheckpointControl {
max_prev_epoch,
)
}
Command::Flush
| Command::Pause(PausedReason::Manual)
| Command::Resume(PausedReason::Manual) => {
Command::Flush | Command::Pause | Command::Resume => {
for mut notifier in notifiers {
notifier.notify_started();
notifier.notify_collected();
Expand All @@ -254,7 +249,6 @@ impl CheckpointControl {
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
Expand All @@ -270,7 +264,6 @@ impl CheckpointControl {
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
Expand All @@ -289,7 +282,6 @@ impl CheckpointControl {
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
Expand Down Expand Up @@ -507,8 +499,8 @@ pub(crate) struct DatabaseCheckpointControl {
/// Key is the `prev_epoch`.
command_ctx_queue: BTreeMap<u64, EpochNode>,
/// The barrier that are completing.
/// Some((`prev_epoch`, `should_pause_inject_barrier`))
completing_barrier: Option<(u64, bool)>,
/// Some(`prev_epoch`)
completing_barrier: Option<u64>,

committed_epoch: Option<u64>,
creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
Expand Down Expand Up @@ -680,39 +672,7 @@ impl DatabaseCheckpointControl {
.count()
< in_flight_barrier_nums;

// Whether some command requires pausing concurrent barrier. If so, it must be the last one.
let should_pause = self
.command_ctx_queue
.last_key_value()
.and_then(|(_, x)| {
x.command_ctx
.command
.as_ref()
.map(Command::should_pause_inject_barrier)
})
.or(self
.completing_barrier
.map(|(_, should_pause)| should_pause))
.unwrap_or(false);
debug_assert_eq!(
self.command_ctx_queue
.values()
.filter_map(|node| {
node.command_ctx
.command
.as_ref()
.map(Command::should_pause_inject_barrier)
})
.chain(
self.completing_barrier
.map(|(_, should_pause)| should_pause)
.into_iter()
)
.any(|should_pause| should_pause),
should_pause
);

in_flight_not_full && !should_pause
in_flight_not_full
}

/// Return the earliest command waiting on the `worker_id`.
Expand Down Expand Up @@ -868,14 +828,7 @@ impl DatabaseCheckpointControl {
take(&mut node.state.resps),
self.collect_backfill_pinned_upstream_log_epoch(),
);
self.completing_barrier = Some((
node.command_ctx.barrier_info.prev_epoch(),
node.command_ctx
.command
.as_ref()
.map(|c| c.should_pause_inject_barrier())
.unwrap_or(false),
));
self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
task.finished_jobs.extend(finished_jobs);
task.notifiers.extend(node.notifiers);
task.epoch_infos
Expand Down Expand Up @@ -914,7 +867,7 @@ impl DatabaseCheckpointControl {
creating_job_epochs: Vec<(TableId, u64)>,
) {
{
if let Some((prev_epoch, _)) = self.completing_barrier.take() {
if let Some(prev_epoch) = self.completing_barrier.take() {
assert_eq!(command_prev_epoch, Some(prev_epoch));
self.committed_epoch = Some(prev_epoch);
} else {
Expand Down Expand Up @@ -970,7 +923,6 @@ impl DatabaseCheckpointControl {
checkpoint: bool,
span: tracing::Span,
control_stream_manager: &mut ControlStreamManager,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
hummock_version_stats: &HummockVersionStats,
curr_epoch: TracedEpoch,
) -> MetaResult<()> {
Expand Down Expand Up @@ -1047,7 +999,7 @@ impl DatabaseCheckpointControl {
}
}
CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
if self.state.paused_reason().is_some() {
if self.state.is_paused() {
warn!("cannot create streaming job with snapshot backfill when paused");
for notifier in notifiers {
notifier.notify_start_failed(
Expand Down Expand Up @@ -1082,7 +1034,7 @@ impl DatabaseCheckpointControl {
let mutation = command
.as_ref()
.expect("checked Some")
.to_mutation(None)
.to_mutation(false)
.expect("should have some mutation in `CreateStreamingJob` command");

control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
Expand Down Expand Up @@ -1149,7 +1101,6 @@ impl DatabaseCheckpointControl {
notifiers.iter_mut().for_each(|n| n.notify_started());

let command_ctx = CommandContext::new(
active_streaming_nodes.current().clone(),
barrier_info,
pre_applied_subscription_info,
table_ids_to_commit.clone(),
Expand Down
2 changes: 1 addition & 1 deletion src/meta/src/barrier/checkpoint/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ impl DatabaseStatusAction<'_, EnterInitializing> {
&mut source_splits,
&mut background_jobs,
subscription_info,
None,
false,
&self.control.hummock_version_stats,
)?
};
Expand Down
Loading

0 comments on commit 7709bfc

Please sign in to comment.