// arrow_json/reader/timestamp_array.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use std::marker::PhantomData;
19use std::sync::Arc;
20
21use arrow_array::ArrayRef;
22use arrow_array::builder::PrimitiveBuilder;
23use arrow_array::types::ArrowTimestampType;
24use arrow_cast::parse::string_to_datetime;
25use arrow_schema::{ArrowError, DataType, TimeUnit};
26use chrono::TimeZone;
27
28use crate::reader::tape::{Tape, TapeElement};
29use crate::reader::{ArrayDecoder, DecoderContext};
30
31/// A specialized [`ArrayDecoder`] for timestamps
32pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
33    data_type: DataType,
34    timezone: Tz,
35    ignore_type_conflicts: bool,
36    // Invariant and Send
37    phantom: PhantomData<fn(P) -> P>,
38}
39
40impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
41    pub fn new(ctx: &DecoderContext, data_type: &DataType, timezone: Tz) -> Self {
42        Self {
43            data_type: data_type.clone(),
44            timezone,
45            ignore_type_conflicts: ctx.ignore_type_conflicts(),
46            phantom: Default::default(),
47        }
48    }
49}
50
51impl<P, Tz> ArrayDecoder for TimestampArrayDecoder<P, Tz>
52where
53    P: ArrowTimestampType,
54    Tz: TimeZone + Send,
55{
56    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
57        let mut builder =
58            PrimitiveBuilder::<P>::with_capacity(pos.len()).with_data_type(self.data_type.clone());
59        for p in pos {
60            let value = match tape.get(*p) {
61                TapeElement::Null => {
62                    builder.append_null();
63                    continue;
64                }
65                TapeElement::String(idx) => {
66                    let s = tape.get_string(idx);
67                    let date = string_to_datetime(&self.timezone, s).map_err(|e| {
68                        ArrowError::JsonError(format!(
69                            "failed to parse \"{s}\" as {}: {}",
70                            self.data_type, e
71                        ))
72                    });
73
74                    date.and_then(|date| match P::UNIT {
75                        TimeUnit::Second => Ok(date.timestamp()),
76                        TimeUnit::Millisecond => Ok(date.timestamp_millis()),
77                        TimeUnit::Microsecond => Ok(date.timestamp_micros()),
78                        TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| {
79                            ArrowError::ParseError(format!(
80                                "{} would overflow 64-bit signed nanoseconds",
81                                date.to_rfc3339(),
82                            ))
83                        }),
84                    })
85                }
86                TapeElement::Number(idx) => {
87                    let s = tape.get_string(idx);
88                    let b = s.as_bytes();
89                    lexical_core::parse::<i64>(b)
90                        .or_else(|_| lexical_core::parse::<f64>(b).map(|x| x as i64))
91                        .map_err(|_| {
92                            ArrowError::JsonError(format!(
93                                "failed to parse {s} as {}",
94                                self.data_type
95                            ))
96                        })
97                }
98                TapeElement::I32(v) => Ok(v as i64),
99                TapeElement::I64(high) => match tape.get(p + 1) {
100                    TapeElement::I32(low) => Ok(((high as i64) << 32) | (low as u32) as i64),
101                    _ => unreachable!(),
102                },
103                _ => Err(tape.error(*p, "primitive")),
104            };
105
106            match value {
107                Ok(value) => builder.append_value(value),
108                Err(_) if self.ignore_type_conflicts => builder.append_null(),
109                Err(e) => return Err(e),
110            }
111        }
112
113        Ok(Arc::new(builder.finish()))
114    }
115}