feat(meta): do configuration change in single barrier #20493

Merged
merged 1 commit on Feb 28, 2025
10 changes: 0 additions & 10 deletions proto/meta.proto
@@ -169,16 +169,6 @@ message FlushResponse {
uint64 hummock_version_id = 2;
}

// The reason why the data sources in the cluster are paused.
enum PausedReason {
PAUSED_REASON_UNSPECIFIED = 0;
// The cluster is paused due to configuration change, e.g. altering table schema and scaling.
PAUSED_REASON_CONFIG_CHANGE = 1;
// The cluster is paused due to manual operation, e.g. `risectl` command or the
// `pause_on_next_bootstrap` system variable.
PAUSED_REASON_MANUAL = 2;
}

message PauseRequest {}

message PauseResponse {}
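With `PausedReason` gone from the proto, pause state degenerates to a plain paused/not-paused flag: configuration changes now fit in a single barrier and never pause the cluster, leaving manual pause as the only source. A minimal sketch of the resulting state, assuming a hypothetical `PauseState` type (the diff itself only shows call sites moving from `paused_reason().is_some()` to `is_paused()`):

```rust
// Sketch only; `PauseState` is a hypothetical stand-in, not a RisingWave type.
struct PauseState {
    // Before: paused_reason: Option<PausedReason> (ConfigChange or Manual).
    // After: a bool suffices, since only manual pause remains.
    paused: bool,
}

impl PauseState {
    fn is_paused(&self) -> bool {
        self.paused
    }
}
```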
10 changes: 0 additions & 10 deletions proto/stream_service.proto
@@ -70,15 +70,6 @@ message BarrierCompleteResponse {
uint32 database_id = 10;
}

message WaitEpochCommitRequest {
uint64 epoch = 1;
uint32 table_id = 2;
}

message WaitEpochCommitResponse {
common.Status status = 1;
}

message StreamingControlStreamRequest {
message InitialPartialGraph {
uint32 partial_graph_id = 1;
@@ -152,7 +143,6 @@ message GetMinUncommittedSstIdResponse {
}

service StreamService {
rpc WaitEpochCommit(WaitEpochCommitRequest) returns (WaitEpochCommitResponse);
rpc StreamingControlStream(stream StreamingControlStreamRequest) returns (stream StreamingControlStreamResponse);
rpc GetMinUncommittedSstId(GetMinUncommittedSstIdRequest) returns (GetMinUncommittedSstIdResponse);
}
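Deleting `WaitEpochCommit` removes the round trip meta used to make a compute node block until an epoch was committed in its state store. A hedged sketch of the caller-side call that disappears, following tonic's generated-client conventions (the exact client paths are assumptions):

```rust
use risingwave_pb::stream_service::WaitEpochCommitRequest;
use risingwave_pb::stream_service::stream_service_client::StreamServiceClient;

// Hypothetical pre-PR caller; after this change no such call exists,
// because a configuration change completes within its own barrier.
async fn wait_committed(
    client: &mut StreamServiceClient<tonic::transport::Channel>,
    epoch: u64,
    table_id: u32,
) -> Result<(), tonic::Status> {
    client
        .wait_epoch_commit(WaitEpochCommitRequest { epoch, table_id })
        .await?;
    Ok(())
}
```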
31 changes: 0 additions & 31 deletions src/compute/src/rpc/service/stream_service.rs
@@ -12,14 +12,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use await_tree::InstrumentAwait;
use futures::{Stream, StreamExt, TryStreamExt};
use risingwave_hummock_sdk::HummockSstableObjectId;
use risingwave_pb::stream_service::stream_service_server::StreamService;
use risingwave_pb::stream_service::*;
use risingwave_storage::dispatch_state_store;
use risingwave_storage::store::TryWaitEpochOptions;
use risingwave_stream::error::StreamError;
use risingwave_stream::task::{LocalStreamManager, StreamEnvironment};
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;
@@ -42,33 +38,6 @@ impl StreamService for StreamServiceImpl {
type StreamingControlStreamStream =
impl Stream<Item = std::result::Result<StreamingControlStreamResponse, tonic::Status>>;

#[cfg_attr(coverage, coverage(off))]
async fn wait_epoch_commit(
&self,
request: Request<WaitEpochCommitRequest>,
) -> Result<Response<WaitEpochCommitResponse>, Status> {
let request = request.into_inner();
let epoch = request.epoch;

dispatch_state_store!(self.env.state_store(), store, {
use risingwave_hummock_sdk::HummockReadEpoch;
use risingwave_storage::StateStore;

store
.try_wait_epoch(
HummockReadEpoch::Committed(epoch),
TryWaitEpochOptions {
table_id: request.table_id.into(),
},
)
.instrument_await(format!("wait_epoch_commit (epoch {})", epoch))
.await
.map_err(StreamError::from)?;
});

Ok(Response::new(WaitEpochCommitResponse { status: None }))
}

async fn streaming_control_stream(
&self,
request: Request<Streaming<StreamingControlStreamRequest>>,
12 changes: 0 additions & 12 deletions src/ctl/src/cmd_impl/meta/pause_resume.rs
@@ -12,20 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_pb::meta::PausedReason;

use crate::CtlContext;

pub fn desc(reason: PausedReason) -> &'static str {
// Method on optional enums derived from `prost` will use `Unspecified` if unset. So we treat
// `Unspecified` as not paused here.
match reason {
PausedReason::Unspecified => "not paused",
PausedReason::ConfigChange => "paused due to configuration change",
PausedReason::Manual => "paused manually",
}
}

pub async fn pause(context: &CtlContext) -> anyhow::Result<()> {
let meta_client = context.meta_client().await?;

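The `desc` helper goes away with the enum (note the file has 0 additions, so nothing replaces it). If `risectl` still needed a human-readable status, an equivalent over a boolean might look like this purely hypothetical sketch:

```rust
// Hypothetical equivalent of the deleted helper over a plain flag;
// not part of this PR.
pub fn desc(paused: bool) -> &'static str {
    if paused { "paused manually" } else { "not paused" }
}
```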
2 changes: 1 addition & 1 deletion src/dml/src/dml_manager.rs
@@ -40,7 +40,7 @@ pub struct TableReader {
/// interface).
#[derive(Debug)]
pub struct DmlManager {
pub table_readers: RwLock<HashMap<TableId, TableReader>>,
table_readers: RwLock<HashMap<TableId, TableReader>>,
txn_id_generator: TxnIdGenerator,
dml_channel_initial_permits: usize,
}
4 changes: 2 additions & 2 deletions src/meta/service/src/stream_service.rs
@@ -87,7 +87,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn pause(&self, _: Request<PauseRequest>) -> Result<Response<PauseResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(database_id, Command::pause(PausedReason::Manual))
.run_command(database_id, Command::pause())
.await?;
}
Ok(Response::new(PauseResponse {}))
@@ -97,7 +97,7 @@ impl StreamManagerService for StreamServiceImpl {
async fn resume(&self, _: Request<ResumeRequest>) -> Result<Response<ResumeResponse>, Status> {
for database_id in self.metadata_manager.list_active_database_ids().await? {
self.barrier_scheduler
.run_command(database_id, Command::resume(PausedReason::Manual))
.run_command(database_id, Command::resume())
.await?;
}
Ok(Response::new(ResumeResponse {}))
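The call sites above drop the `PausedReason::Manual` argument. Inferred from these calls and from the `Command::Flush | Command::Pause | Command::Resume` match arm later in this diff (where the variants carry no payload), the constructors presumably reduce to something like the following sketch; the real definition may differ:

```rust
// Sketch of the reason-less command variants and constructors.
enum Command {
    Pause,
    Resume,
    // ...other variants elided
}

impl Command {
    pub fn pause() -> Self {
        Command::Pause
    }

    pub fn resume() -> Self {
        Command::Resume
    }
}
```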
65 changes: 8 additions & 57 deletions src/meta/src/barrier/checkpoint/control.rs
@@ -25,7 +25,6 @@ use risingwave_common::catalog::{DatabaseId, TableId};
use risingwave_meta_model::WorkerId;
use risingwave_pb::ddl_service::DdlProgress;
use risingwave_pb::hummock::HummockVersionStats;
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_service::BarrierCompleteResponse;
use risingwave_pb::stream_service::streaming_control_stream_response::ResetDatabaseResponse;
use tracing::{debug, warn};
@@ -48,7 +47,6 @@ use crate::barrier::{
BarrierKind, Command, CreateStreamingJobCommandInfo, CreateStreamingJobType,
InflightSubscriptionInfo, SnapshotBackfillInfo, TracedEpoch,
};
use crate::manager::ActiveStreamingWorkerNodes;
use crate::rpc::metrics::GLOBAL_META_METRICS;
use crate::stream::fill_snapshot_backfill_epoch;
use crate::{MetaError, MetaResult};
@@ -147,7 +145,6 @@ impl CheckpointControl {
&mut self,
new_barrier: NewBarrier,
control_stream_manager: &mut ControlStreamManager,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
) -> MetaResult<()> {
let NewBarrier {
command,
@@ -228,9 +225,7 @@
max_prev_epoch,
)
}
Command::Flush
| Command::Pause(PausedReason::Manual)
| Command::Resume(PausedReason::Manual) => {
Command::Flush | Command::Pause | Command::Resume => {
for mut notifier in notifiers {
notifier.notify_started();
notifier.notify_collected();
@@ -254,7 +249,6 @@
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
@@ -270,7 +264,6 @@
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
@@ -289,7 +282,6 @@
checkpoint,
span.clone(),
control_stream_manager,
active_streaming_nodes,
&self.hummock_version_stats,
curr_epoch.clone(),
)?;
@@ -507,8 +499,8 @@ pub(crate) struct DatabaseCheckpointControl {
/// Key is the `prev_epoch`.
command_ctx_queue: BTreeMap<u64, EpochNode>,
/// The barrier that are completing.
/// Some((`prev_epoch`, `should_pause_inject_barrier`))
completing_barrier: Option<(u64, bool)>,
/// Some(`prev_epoch`)
completing_barrier: Option<u64>,

committed_epoch: Option<u64>,
creating_streaming_job_controls: HashMap<TableId, CreatingStreamingJobControl>,
@@ -680,39 +672,7 @@ impl DatabaseCheckpointControl {
.count()
< in_flight_barrier_nums;

// Whether some command requires pausing concurrent barrier. If so, it must be the last one.
let should_pause = self
.command_ctx_queue
.last_key_value()
.and_then(|(_, x)| {
x.command_ctx
.command
.as_ref()
.map(Command::should_pause_inject_barrier)
})
.or(self
.completing_barrier
.map(|(_, should_pause)| should_pause))
.unwrap_or(false);
debug_assert_eq!(
self.command_ctx_queue
.values()
.filter_map(|node| {
node.command_ctx
.command
.as_ref()
.map(Command::should_pause_inject_barrier)
})
.chain(
self.completing_barrier
.map(|(_, should_pause)| should_pause)
.into_iter()
)
.any(|should_pause| should_pause),
should_pause
);

in_flight_not_full && !should_pause
in_flight_not_full
}

/// Return the earliest command waiting on the `worker_id`.
@@ -868,14 +828,7 @@ impl DatabaseCheckpointControl {
take(&mut node.state.resps),
self.collect_backfill_pinned_upstream_log_epoch(),
);
self.completing_barrier = Some((
node.command_ctx.barrier_info.prev_epoch(),
node.command_ctx
.command
.as_ref()
.map(|c| c.should_pause_inject_barrier())
.unwrap_or(false),
));
self.completing_barrier = Some(node.command_ctx.barrier_info.prev_epoch());
task.finished_jobs.extend(finished_jobs);
task.notifiers.extend(node.notifiers);
task.epoch_infos
@@ -914,7 +867,7 @@ impl DatabaseCheckpointControl {
creating_job_epochs: Vec<(TableId, u64)>,
) {
{
if let Some((prev_epoch, _)) = self.completing_barrier.take() {
if let Some(prev_epoch) = self.completing_barrier.take() {
assert_eq!(command_prev_epoch, Some(prev_epoch));
self.committed_epoch = Some(prev_epoch);
} else {
@@ -970,7 +923,6 @@ impl DatabaseCheckpointControl {
checkpoint: bool,
span: tracing::Span,
control_stream_manager: &mut ControlStreamManager,
active_streaming_nodes: &ActiveStreamingWorkerNodes,
hummock_version_stats: &HummockVersionStats,
curr_epoch: TracedEpoch,
) -> MetaResult<()> {
@@ -1047,7 +999,7 @@
}
}
CreateStreamingJobType::SnapshotBackfill(snapshot_backfill_info) => {
if self.state.paused_reason().is_some() {
if self.state.is_paused() {
warn!("cannot create streaming job with snapshot backfill when paused");
for notifier in notifiers {
notifier.notify_start_failed(
@@ -1082,7 +1034,7 @@
let mutation = command
.as_ref()
.expect("checked Some")
.to_mutation(None)
.to_mutation(false)
.expect("should have some mutation in `CreateStreamingJob` command");

control_stream_manager.add_partial_graph(self.database_id, Some(job_id))?;
@@ -1149,7 +1101,6 @@
notifiers.iter_mut().for_each(|n| n.notify_started());

let command_ctx = CommandContext::new(
active_streaming_nodes.current().clone(),
barrier_info,
pre_applied_subscription_info,
table_ids_to_commit.clone(),
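Two simplifications in this file are worth spelling out. First, `can_inject_barrier` loses its `should_pause` branch: a command that paused injection (a configuration change) used to have to be the last in-flight barrier, which is why `completing_barrier` carried a `bool` and why the `debug_assert_eq!` cross-checked the queue. With configuration changes done in a single barrier, only the in-flight count matters. Second, `to_mutation(None)` becomes `to_mutation(false)`, suggesting that parameter changed from `Option<PausedReason>` to a plain bool. A standalone sketch of the simplified gating check, with the per-barrier in-flight predicate abstracted to a bool (an assumption; the diff elides it):

```rust
// Sketch of the post-PR injection gate. `in_flight` stands in for the
// "node is still in flight" predicate applied to command_ctx_queue.
fn can_inject_barrier(in_flight: &[bool], in_flight_barrier_nums: usize) -> bool {
    // Before: `in_flight_not_full && !should_pause`, where should_pause was
    // derived from the last queued command or the completing barrier.
    let in_flight_not_full =
        in_flight.iter().filter(|x| **x).count() < in_flight_barrier_nums;
    in_flight_not_full
}
```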
2 changes: 1 addition & 1 deletion src/meta/src/barrier/checkpoint/recovery.rs
@@ -360,7 +360,7 @@ impl DatabaseStatusAction<'_, EnterInitializing> {
&mut source_splits,
&mut background_jobs,
subscription_info,
None,
false,
&self.control.hummock_version_stats,
)?
};