1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
1718use chrono::TimeZone;
19use std::marker::PhantomData;
2021use arrow_array::builder::PrimitiveBuilder;
22use arrow_array::types::ArrowTimestampType;
23use arrow_array::Array;
24use arrow_cast::parse::string_to_datetime;
25use arrow_data::ArrayData;
26use arrow_schema::{ArrowError, DataType, TimeUnit};
2728use crate::reader::tape::{Tape, TapeElement};
29use crate::reader::ArrayDecoder;
3031/// A specialized [`ArrayDecoder`] for timestamps
32pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
33 data_type: DataType,
34 timezone: Tz,
35// Invariant and Send
36phantom: PhantomData<fn(P) -> P>,
37}
3839impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
40pub fn new(data_type: DataType, timezone: Tz) -> Self {
41Self {
42 data_type,
43 timezone,
44 phantom: Default::default(),
45 }
46 }
47}
4849impl<P, Tz> ArrayDecoder for TimestampArrayDecoder<P, Tz>
50where
51P: ArrowTimestampType,
52 Tz: TimeZone + Send,
53{
54fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
55let mut builder =
56 PrimitiveBuilder::<P>::with_capacity(pos.len()).with_data_type(self.data_type.clone());
5758for p in pos {
59match tape.get(*p) {
60 TapeElement::Null => builder.append_null(),
61 TapeElement::String(idx) => {
62let s = tape.get_string(idx);
63let date = string_to_datetime(&self.timezone, s).map_err(|e| {
64 ArrowError::JsonError(format!(
65"failed to parse \"{s}\" as {}: {}",
66self.data_type, e
67 ))
68 })?;
6970let value = match P::UNIT {
71 TimeUnit::Second => date.timestamp(),
72 TimeUnit::Millisecond => date.timestamp_millis(),
73 TimeUnit::Microsecond => date.timestamp_micros(),
74 TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| {
75 ArrowError::ParseError(format!(
76"{} would overflow 64-bit signed nanoseconds",
77 date.to_rfc3339(),
78 ))
79 })?,
80 };
81 builder.append_value(value)
82 }
83 TapeElement::Number(idx) => {
84let s = tape.get_string(idx);
85let b = s.as_bytes();
86let value = lexical_core::parse::<i64>(b)
87 .or_else(|_| lexical_core::parse::<f64>(b).map(|x| x as i64))
88 .map_err(|_| {
89 ArrowError::JsonError(format!(
90"failed to parse {s} as {}",
91self.data_type
92 ))
93 })?;
9495 builder.append_value(value)
96 }
97 TapeElement::I32(v) => builder.append_value(v as i64),
98 TapeElement::I64(high) => match tape.get(p + 1) {
99 TapeElement::I32(low) => {
100 builder.append_value(((high as i64) << 32) | (low as u32) as i64)
101 }
102_ => unreachable!(),
103 },
104_ => return Err(tape.error(*p, "primitive")),
105 }
106 }
107108Ok(builder.finish().into_data())
109 }
110}