|
2 | 2 |
|
3 | 3 | use std::{mem, sync::Arc};
|
4 | 4 |
|
5 |
| -use arrow::{compute, compute::kernels::cast_utils::string_to_timestamp_nanos}; |
| 5 | +use arrow::{compute, compute::kernels::cast_utils::string_to_timestamp_nanos, error::ArrowError}; |
| 6 | +use chrono::{prelude::*, LocalResult}; |
6 | 7 | use datafusion::{
|
7 | 8 | arrow::datatypes::DataType,
|
8 | 9 | common::DFSchemaRef,
|
@@ -281,16 +282,59 @@ impl<'a> ExprRewriter for TypeRewriter<'a> {
|
281 | 282 | }
|
282 | 283 |
|
283 | 284 | fn string_to_timestamp_ms(string: &str) -> Result<ScalarValue> {
|
| 285 | + // TODO(lee): remove following codes after PR(https://github.com/apache/arrow-rs/pull/3787) merged |
| 286 | + // Because function `string_to_timestamp_nanos` returns a NaiveDateTime's |
| 287 | + // nanoseconds from a string without a specify time zone, We need to convert |
| 288 | + // it to local timestamp. |
| 289 | + |
| 290 | + // without a timezone specifier as a local time, using 'T' as a separator |
| 291 | + // Example: 2020-09-08T13:42:29.190855 |
| 292 | + if let Ok(ts) = NaiveDateTime::parse_from_str(string, "%Y-%m-%dT%H:%M:%S%.f") { |
| 293 | + let mills = naive_datetime_to_timestamp(string, ts).map_err(DataFusionError::from)?; |
| 294 | + return Ok(ScalarValue::TimestampMillisecond(Some(mills), None)); |
| 295 | + } |
| 296 | + |
| 297 | + // without a timezone specifier as a local time, using ' ' as a separator |
| 298 | + // Example: 2020-09-08 13:42:29.190855 |
| 299 | + if let Ok(ts) = NaiveDateTime::parse_from_str(string, "%Y-%m-%d %H:%M:%S%.f") { |
| 300 | + let mills = naive_datetime_to_timestamp(string, ts).map_err(DataFusionError::from)?; |
| 301 | + return Ok(ScalarValue::TimestampMillisecond(Some(mills), None)); |
| 302 | + } |
| 303 | + |
| 304 | + let result = string_to_timestamp_nanos(string); |
284 | 305 | Ok(ScalarValue::TimestampMillisecond(
|
285 | 306 | Some(
|
286 |
| - string_to_timestamp_nanos(string) |
| 307 | + result |
287 | 308 | .map(|t| t / 1_000_000)
|
288 | 309 | .map_err(DataFusionError::from)?,
|
289 | 310 | ),
|
290 | 311 | None,
|
291 | 312 | ))
|
292 | 313 | }
|
293 | 314 |
|
| 315 | +/// Converts the naive datetime (which has no specific timezone) to a |
| 316 | +/// nanosecond epoch timestamp relative to UTC. |
| 317 | +fn naive_datetime_to_timestamp(s: &str, datetime: NaiveDateTime) -> Result<i64, ArrowError> { |
| 318 | + let l = Local {}; |
| 319 | + |
| 320 | + match l.from_local_datetime(&datetime) { |
| 321 | + LocalResult::None => Err(ArrowError::CastError(format!( |
| 322 | + "Error parsing '{s}' as timestamp: local time representation is invalid" |
| 323 | + ))), |
| 324 | + LocalResult::Single(local_datetime) => { |
| 325 | + Ok(local_datetime.with_timezone(&Utc).timestamp_nanos() / 1_000_000) |
| 326 | + } |
| 327 | + // Ambiguous times can happen if the timestamp is exactly when |
| 328 | + // a daylight savings time transition occurs, for example, and |
| 329 | + // so the datetime could validly be said to be in two |
| 330 | + // potential offsets. However, since we are about to convert |
| 331 | + // to UTC anyways, we can pick one arbitrarily |
| 332 | + LocalResult::Ambiguous(local_datetime, _) => { |
| 333 | + Ok(local_datetime.with_timezone(&Utc).timestamp_nanos() / 1_000_000) |
| 334 | + } |
| 335 | + } |
| 336 | +} |
| 337 | + |
294 | 338 | enum TimestampType {
|
295 | 339 | Second,
|
296 | 340 | #[allow(dead_code)]
|
@@ -321,6 +365,7 @@ mod tests {
|
321 | 365 | };
|
322 | 366 |
|
323 | 367 | use super::*;
|
| 368 | + use crate::logical_optimizer::type_conversion; |
324 | 369 |
|
325 | 370 | fn expr_test_schema() -> DFSchemaRef {
|
326 | 371 | Arc::new(
|
@@ -445,7 +490,7 @@ mod tests {
|
445 | 490 |
|
446 | 491 | #[test]
|
447 | 492 | fn test_type_conversion_timestamp() {
|
448 |
| - let date_string = "2021-09-07 16:00:00".to_string(); |
| 493 | + let date_string = "2021-09-07T16:00:00Z".to_string(); |
449 | 494 | let schema = expr_test_schema();
|
450 | 495 | let mut rewriter = TypeRewriter {
|
451 | 496 | schemas: vec![&schema],
|
@@ -498,7 +543,7 @@ mod tests {
|
498 | 543 | );
|
499 | 544 |
|
500 | 545 | // Timestamp c6 between "2021-09-07 16:00:00" and "2021-09-07 17:00:00"
|
501 |
| - let date_string2 = "2021-09-07 17:00:00".to_string(); |
| 546 | + let date_string2 = "2021-09-07T17:00:00Z".to_string(); |
502 | 547 | let exp = Expr::Between(Between {
|
503 | 548 | expr: Box::new(col("c6")),
|
504 | 549 | negated: false,
|
@@ -530,4 +575,32 @@ mod tests {
|
530 | 575 | })
|
531 | 576 | );
|
532 | 577 | }
|
| 578 | + |
| 579 | + #[test] |
| 580 | + fn test_string_to_timestamp_ms() { |
| 581 | + let date_string = [ |
| 582 | + "2021-09-07T16:00:00+08:00", |
| 583 | + "2021-09-07 16:00:00+08:00", |
| 584 | + "2021-09-07T16:00:00Z", |
| 585 | + "2021-09-07 16:00:00Z", |
| 586 | + ]; |
| 587 | + let expects: [i64; 4] = [1631001600000, 1631001600000, 1631030400000, 1631030400000]; |
| 588 | + for (index, &string) in date_string.iter().enumerate() { |
| 589 | + let result = type_conversion::string_to_timestamp_ms(string); |
| 590 | + if let Ok(ScalarValue::TimestampMillisecond(Some(mills), _)) = result { |
| 591 | + let expect = *expects.get(index).unwrap(); |
| 592 | + assert_eq!(mills, expect) |
| 593 | + } |
| 594 | + } |
| 595 | + |
| 596 | + let date_string = "2021-09-07 16:00:00".to_string(); |
| 597 | + let d = NaiveDate::from_ymd_opt(2021, 9, 7).unwrap(); |
| 598 | + let t = NaiveTime::from_hms_milli_opt(16, 0, 0, 0).unwrap(); |
| 599 | + let dt = NaiveDateTime::new(d, t); |
| 600 | + let expect = naive_datetime_to_timestamp(&date_string, dt).unwrap(); |
| 601 | + let result = type_conversion::string_to_timestamp_ms(&date_string); |
| 602 | + if let Ok(ScalarValue::TimestampMillisecond(Some(mills), _)) = result { |
| 603 | + assert_eq!(mills, expect) |
| 604 | + } |
| 605 | + } |
533 | 606 | }
|
0 commit comments