arrow_json/reader/
timestamp_array.rs1use chrono::TimeZone;
19use std::marker::PhantomData;
20
21use arrow_array::builder::PrimitiveBuilder;
22use arrow_array::types::ArrowTimestampType;
23use arrow_array::Array;
24use arrow_cast::parse::string_to_datetime;
25use arrow_data::ArrayData;
26use arrow_schema::{ArrowError, DataType, TimeUnit};
27
28use crate::reader::tape::{Tape, TapeElement};
29use crate::reader::ArrayDecoder;
30
31pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
33 data_type: DataType,
34 timezone: Tz,
35 phantom: PhantomData<fn(P) -> P>,
37}
38
39impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
40 pub fn new(data_type: DataType, timezone: Tz) -> Self {
41 Self {
42 data_type,
43 timezone,
44 phantom: Default::default(),
45 }
46 }
47}
48
49impl<P, Tz> ArrayDecoder for TimestampArrayDecoder<P, Tz>
50where
51 P: ArrowTimestampType,
52 Tz: TimeZone + Send,
53{
54 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayData, ArrowError> {
55 let mut builder =
56 PrimitiveBuilder::<P>::with_capacity(pos.len()).with_data_type(self.data_type.clone());
57
58 for p in pos {
59 match tape.get(*p) {
60 TapeElement::Null => builder.append_null(),
61 TapeElement::String(idx) => {
62 let s = tape.get_string(idx);
63 let date = string_to_datetime(&self.timezone, s).map_err(|e| {
64 ArrowError::JsonError(format!(
65 "failed to parse \"{s}\" as {}: {}",
66 self.data_type, e
67 ))
68 })?;
69
70 let value = match P::UNIT {
71 TimeUnit::Second => date.timestamp(),
72 TimeUnit::Millisecond => date.timestamp_millis(),
73 TimeUnit::Microsecond => date.timestamp_micros(),
74 TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| {
75 ArrowError::ParseError(format!(
76 "{} would overflow 64-bit signed nanoseconds",
77 date.to_rfc3339(),
78 ))
79 })?,
80 };
81 builder.append_value(value)
82 }
83 TapeElement::Number(idx) => {
84 let s = tape.get_string(idx);
85 let b = s.as_bytes();
86 let value = lexical_core::parse::<i64>(b)
87 .or_else(|_| lexical_core::parse::<f64>(b).map(|x| x as i64))
88 .map_err(|_| {
89 ArrowError::JsonError(format!(
90 "failed to parse {s} as {}",
91 self.data_type
92 ))
93 })?;
94
95 builder.append_value(value)
96 }
97 TapeElement::I32(v) => builder.append_value(v as i64),
98 TapeElement::I64(high) => match tape.get(p + 1) {
99 TapeElement::I32(low) => {
100 builder.append_value(((high as i64) << 32) | (low as u32) as i64)
101 }
102 _ => unreachable!(),
103 },
104 _ => return Err(tape.error(*p, "primitive")),
105 }
106 }
107
108 Ok(builder.finish().into_data())
109 }
110}