// arrow_json/reader/timestamp_array.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use chrono::TimeZone;
use std::marker::PhantomData;
use arrow_array::builder::PrimitiveBuilder;
use arrow_array::types::ArrowTimestampType;
use arrow_array::Array;
use arrow_cast::parse::string_to_datetime;
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, TimeUnit};
use crate::reader::tape::{Tape, TapeElement};
use crate::reader::ArrayDecoder;
/// A specialized [`ArrayDecoder`] for timestamps
///
/// `P` is the Arrow timestamp type being produced; its `UNIT` decides how a
/// parsed datetime is scaled (seconds, milliseconds, microseconds or
/// nanoseconds). `Tz` is the timezone handed to the string parser.
pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
/// Data type applied to the output builder and named in parse error messages
data_type: DataType,
/// Timezone passed to `string_to_datetime` when decoding string values
/// (presumably used for strings without an explicit offset; confirm against `arrow_cast`)
timezone: Tz,
// Invariant and Send: `fn(P) -> P` makes the marker invariant in `P`, and a
// function pointer is `Send` regardless of `P`, so no `P` value is stored or owned
phantom: PhantomData<fn(P) -> P>,
}
impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
pub fn new(data_type: DataType, timezone: Tz) -> Self {
Self {
data_type,
timezone,
phantom: Default::default(),
}
}
}
impl<P, Tz> ArrayDecoder for TimestampArrayDecoder<P, Tz>
where
    P: ArrowTimestampType,
    Tz: TimeZone + Send,
{
    /// Decodes the tape elements at `pos` into timestamp [`ArrayData`].
    ///
    /// Each element is handled according to its kind:
    /// * `Null` becomes a null slot.
    /// * `String` is parsed with `string_to_datetime` and converted to the
    ///   unit given by `P::UNIT`.
    /// * `Number` is parsed as an `i64`, falling back to an `f64` that is then
    ///   cast with `as` (truncating, saturating).
    /// * `I32` / `I64` are appended as the raw timestamp value.
    ///
    /// # Errors
    ///
    /// * `ArrowError::JsonError` if a string or number cannot be parsed.
    /// * `ArrowError::ParseError` if a parsed datetime does not fit in 64-bit
    ///   signed nanoseconds.
    /// * The error built by `tape.error` for any other element kind.
    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
        let mut builder = PrimitiveBuilder::<P>::with_capacity(pos.len())
            .with_data_type(self.data_type.clone());

        for &tape_idx in pos {
            // `None` marks a null slot; `Some` carries the raw timestamp value.
            let slot: Option<i64> = match tape.get(tape_idx) {
                TapeElement::Null => None,
                TapeElement::String(idx) => {
                    let s = tape.get_string(idx);
                    let date = match string_to_datetime(&self.timezone, s) {
                        Ok(parsed) => parsed,
                        Err(e) => {
                            return Err(ArrowError::JsonError(format!(
                                "failed to parse \"{s}\" as {}: {}",
                                self.data_type, e
                            )));
                        }
                    };
                    let scaled = match P::UNIT {
                        TimeUnit::Second => date.timestamp(),
                        TimeUnit::Millisecond => date.timestamp_millis(),
                        TimeUnit::Microsecond => date.timestamp_micros(),
                        // Only the nanosecond conversion is fallible here.
                        TimeUnit::Nanosecond => match date.timestamp_nanos_opt() {
                            Some(nanos) => nanos,
                            None => {
                                return Err(ArrowError::ParseError(format!(
                                    "{} would overflow 64-bit signed nanoseconds",
                                    date.to_rfc3339(),
                                )));
                            }
                        },
                    };
                    Some(scaled)
                }
                TapeElement::Number(idx) => {
                    let s = tape.get_string(idx);
                    let bytes = s.as_bytes();
                    // Try an exact integer first; otherwise accept a float and
                    // cast it to an integer with `as`.
                    let attempt = match lexical_core::parse::<i64>(bytes) {
                        Ok(int) => Ok(int),
                        Err(_) => lexical_core::parse::<f64>(bytes).map(|float| float as i64),
                    };
                    match attempt {
                        Ok(number) => Some(number),
                        Err(_) => {
                            return Err(ArrowError::JsonError(format!(
                                "failed to parse {s} as {}",
                                self.data_type
                            )));
                        }
                    }
                }
                TapeElement::I32(v) => Some(v as i64),
                // A 64-bit integer spans two tape elements: this one holds the
                // high 32 bits and the following `I32` holds the low 32 bits.
                TapeElement::I64(high) => match tape.get(tape_idx + 1) {
                    TapeElement::I32(low) => Some((high as i64) << 32 | (low as u32) as i64),
                    _ => unreachable!(),
                },
                _ => return Err(tape.error(tape_idx, "primitive")),
            };

            match slot {
                Some(value) => builder.append_value(value),
                None => builder.append_null(),
            }
        }

        Ok(builder.finish().into_data())
    }
}