diff --git a/src/common/src/field_generator/mod.rs b/src/common/src/field_generator/mod.rs index d313097cfe5ca..bfa56d24ddbfd 100644 --- a/src/common/src/field_generator/mod.rs +++ b/src/common/src/field_generator/mod.rs @@ -19,6 +19,7 @@ mod varchar; use std::time::Duration; use anyhow::Result; +use chrono::{DateTime, FixedOffset}; pub use numeric::*; use serde_json::Value; pub use timestamp::*; @@ -155,11 +156,13 @@ impl FieldGeneratorImpl { } pub fn with_timestamp( + base: Option>, max_past: Option, max_past_mode: Option, seed: u64, ) -> Result { Ok(FieldGeneratorImpl::Timestamp(TimestampField::new( + base, max_past, max_past_mode, seed, @@ -293,7 +296,7 @@ mod tests { let mut generator = match data_type { DataType::Varchar => FieldGeneratorImpl::with_varchar(None, seed).unwrap(), DataType::Timestamp => { - FieldGeneratorImpl::with_timestamp(None, None, seed).unwrap() + FieldGeneratorImpl::with_timestamp(None, None, None, seed).unwrap() } _ => FieldGeneratorImpl::with_number_random(data_type, None, None, seed).unwrap(), }; @@ -321,4 +324,16 @@ mod tests { assert_eq!(datum2_new, datum2); } } + + #[test] + fn test_deterministic_timestamp() { + let seed = 1234; + let base_time: DateTime = + DateTime::parse_from_rfc3339("2020-01-01T00:00:00+00:00").unwrap(); + let mut generator = + FieldGeneratorImpl::with_timestamp(Some(base_time), None, None, seed).unwrap(); + let val1 = generator.generate_json(1); + let val2 = generator.generate_json(1); + assert_eq!(val1, val2); + } } diff --git a/src/common/src/field_generator/timestamp.rs b/src/common/src/field_generator/timestamp.rs index 2165e541bb5af..5b582ba55b4d8 100644 --- a/src/common/src/field_generator/timestamp.rs +++ b/src/common/src/field_generator/timestamp.rs @@ -31,6 +31,7 @@ enum LocalNow { } pub struct TimestampField { + base: Option>, max_past: Duration, local_now: LocalNow, seed: u64, @@ -38,6 +39,7 @@ pub struct TimestampField { impl TimestampField { pub fn new( + base: Option>, max_past_option: Option, max_past_mode: Option, seed: u64, @@ -61,6 +63,7 @@ impl TimestampField { }; debug!(?local_now, ?max_past, "parse timestamp field option"); Ok(Self { + base, // convert to chrono::Duration max_past: chrono::Duration::from_std(max_past)?, local_now, @@ -72,12 +75,15 @@ impl TimestampField { let milliseconds = self.max_past.num_milliseconds(); let mut rng = StdRng::seed_from_u64(offset ^ self.seed); let max_milliseconds = rng.gen_range(0..=milliseconds); - let now = match self.local_now { - LocalNow::Relative => Local::now() - .naive_local() - .duration_round(Duration::microseconds(1)) - .unwrap(), - LocalNow::Absolute(now) => now, + let now = match self.base { + Some(base) => base.naive_local(), + None => match self.local_now { + LocalNow::Relative => Local::now() + .naive_local() + .duration_round(Duration::microseconds(1)) + .unwrap(), + LocalNow::Absolute(now) => now, + }, }; now - Duration::milliseconds(max_milliseconds) } diff --git a/src/connector/src/source/datagen/source/reader.rs b/src/connector/src/source/datagen/source/reader.rs index a3f17c1ca4b35..67b6b112fb0c8 100644 --- a/src/connector/src/source/datagen/source/reader.rs +++ b/src/connector/src/source/datagen/source/reader.rs @@ -14,7 +14,7 @@ use std::collections::HashMap; -use anyhow::Result; +use anyhow::{anyhow, Result}; use async_trait::async_trait; use futures::{StreamExt, TryStreamExt}; use futures_async_stream::try_stream; @@ -204,8 +204,22 @@ fn generator_from_data_type( let max_past_mode_value = fields_option_map .get(&max_past_mode_key) .map(|s| s.to_lowercase()); + let basetime = match fields_option_map.get(format!("fields.{}.basetime", name).as_str()) + { + Some(base) => { + Some(chrono::DateTime::parse_from_rfc3339(base).map_err(|e| { + anyhow!("cannot parse {:?} to rfc3339 due to {:?}", base, e) + })?) + } + None => None, + }; - FieldGeneratorImpl::with_timestamp(max_past_value, max_past_mode_value, random_seed) + FieldGeneratorImpl::with_timestamp( + basetime, + max_past_value, + max_past_mode_value, + random_seed, + ) } DataType::Varchar => { let length_key = format!("fields.{}.length", name);