Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add time for each slt record in simulation test #8724

Merged
merged 5 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ itertools = "0.10"
lru = { git = "https://github.com/risingwavelabs/lru-rs.git", branch = "evict_by_timestamp" }
madsim = "0.2.17"
paste = "1"
pin-project = "1.0"
pretty_assertions = "1"
rand = "0.8"
rdkafka = { package = "madsim-rdkafka", version = "=0.2.14-alpha", features = ["cmake-build"] }
Expand Down
25 changes: 22 additions & 3 deletions src/tests/simulation/src/slt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use sqllogictest::ParallelTestError;

use crate::client::RisingWave;
use crate::cluster::{Cluster, KillOpts};
use crate::utils::TimedExt;

fn is_create_table_as(sql: &str) -> bool {
let parts: Vec<String> = sql
Expand Down Expand Up @@ -112,7 +113,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {

// For normal records.
if !kill {
match tester.run_async(record).await {
match tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
Comment on lines +116 to +122
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I came up with a simpler solution that is more specialized and can avoid TimedFuture utility.
But the current way also looks good to me for its universality.

Suggested change
match tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
match run_record(&mut tester, &record).await {

if we have:

async fn run_record(
    tester: &mut sqllogictest::Runner<RisingWave>,
    record: &sqllogictest::Record,
) -> Result<(), sqllogictest::TestError> {
    let t0 = std::time::Instant::now();
    let res = tester.run_async(record.clone()).await;
    println!("Record {:?} finished in {:?}", record, t0.elapsed());
    res
}

Ok(_) => continue,
Err(e) => panic!("{}", e),
}
Expand All @@ -128,7 +135,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
if cmd.ignore_kill() {
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
if let Err(err) = tester.run_async(record.clone()).await {
if let Err(err) = tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
// cluster could be still under recovering if killed before, retry if
// meets `no reader for dml in table with id {}`.
let should_retry =
Expand Down Expand Up @@ -162,7 +175,13 @@ pub async fn run_slt_task(cluster: Arc<Cluster>, glob: &str, opts: &KillOpts) {
// retry up to 5 times until it succeed
for i in 0usize.. {
let delay = Duration::from_secs(1 << i);
match tester.run_async(record.clone()).await {
match tester
.run_async(record.clone())
.timed(|_res, elapsed| {
println!("Record {:?} finished in {:?}", record, elapsed)
})
.await
{
Ok(_) => break,
// allow 'table exists' error when retry CREATE statement
Err(e)
Expand Down
19 changes: 19 additions & 0 deletions src/tests/simulation/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod assert_result;
pub use assert_result::*;

mod timed_future;
pub use timed_future::*;
76 changes: 76 additions & 0 deletions src/tests/simulation/src/utils/timed_future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::future::Future;
use std::pin::{pin, Pin};
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use pin_project::pin_project;

/// Inspired by https://stackoverflow.com/a/59935743/2990323
/// A wrapper around a Future which adds timing data.
#[pin_project]
pub struct Timed<Fut, F>
where
Fut: Future,
F: Fn(&Fut::Output, Duration),
{
#[pin]
inner: Fut,
f: F,
start: Option<Instant>,
}

impl<Fut, F> Future for Timed<Fut, F>
where
Fut: Future,
F: Fn(&Fut::Output, Duration),
{
type Output = Fut::Output;

fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
let this = self.project();
let start = this.start.get_or_insert_with(Instant::now);

match this.inner.poll(cx) {
// If the inner future is still pending, this wrapper is still pending.
Poll::Pending => Poll::Pending,

// If the inner future is done, measure the elapsed time and finish this wrapper future.
Poll::Ready(v) => {
let elapsed = start.elapsed();
(this.f)(&v, elapsed);

Poll::Ready(v)
}
}
}
}

pub trait TimedExt: Sized + Future {
fn timed<F>(self, f: F) -> Timed<Self, F>
where
F: Fn(&Self::Output, Duration),
{
Timed {
inner: self,
f,
start: None,
}
}
}

// All futures can use the `.timed` method defined above
impl<F: Future> TimedExt for F {}