arrow_json/reader/
timestamp_array.rs1use std::marker::PhantomData;
19use std::sync::Arc;
20
21use arrow_array::ArrayRef;
22use arrow_array::builder::PrimitiveBuilder;
23use arrow_array::types::ArrowTimestampType;
24use arrow_cast::parse::string_to_datetime;
25use arrow_schema::{ArrowError, DataType, TimeUnit};
26use chrono::TimeZone;
27
28use crate::reader::tape::{Tape, TapeElement};
29use crate::reader::{ArrayDecoder, DecoderContext};
30
31pub struct TimestampArrayDecoder<P: ArrowTimestampType, Tz: TimeZone> {
33 data_type: DataType,
34 timezone: Tz,
35 ignore_type_conflicts: bool,
36 phantom: PhantomData<fn(P) -> P>,
38}
39
40impl<P: ArrowTimestampType, Tz: TimeZone> TimestampArrayDecoder<P, Tz> {
41 pub fn new(ctx: &DecoderContext, data_type: &DataType, timezone: Tz) -> Self {
42 Self {
43 data_type: data_type.clone(),
44 timezone,
45 ignore_type_conflicts: ctx.ignore_type_conflicts(),
46 phantom: Default::default(),
47 }
48 }
49}
50
51impl<P, Tz> ArrayDecoder for TimestampArrayDecoder<P, Tz>
52where
53 P: ArrowTimestampType,
54 Tz: TimeZone + Send,
55{
56 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
57 let mut builder =
58 PrimitiveBuilder::<P>::with_capacity(pos.len()).with_data_type(self.data_type.clone());
59 for p in pos {
60 let value = match tape.get(*p) {
61 TapeElement::Null => {
62 builder.append_null();
63 continue;
64 }
65 TapeElement::String(idx) => {
66 let s = tape.get_string(idx);
67 let date = string_to_datetime(&self.timezone, s).map_err(|e| {
68 ArrowError::JsonError(format!(
69 "failed to parse \"{s}\" as {}: {}",
70 self.data_type, e
71 ))
72 });
73
74 date.and_then(|date| match P::UNIT {
75 TimeUnit::Second => Ok(date.timestamp()),
76 TimeUnit::Millisecond => Ok(date.timestamp_millis()),
77 TimeUnit::Microsecond => Ok(date.timestamp_micros()),
78 TimeUnit::Nanosecond => date.timestamp_nanos_opt().ok_or_else(|| {
79 ArrowError::ParseError(format!(
80 "{} would overflow 64-bit signed nanoseconds",
81 date.to_rfc3339(),
82 ))
83 }),
84 })
85 }
86 TapeElement::Number(idx) => {
87 let s = tape.get_string(idx);
88 let b = s.as_bytes();
89 lexical_core::parse::<i64>(b)
90 .or_else(|_| lexical_core::parse::<f64>(b).map(|x| x as i64))
91 .map_err(|_| {
92 ArrowError::JsonError(format!(
93 "failed to parse {s} as {}",
94 self.data_type
95 ))
96 })
97 }
98 TapeElement::I32(v) => Ok(v as i64),
99 TapeElement::I64(high) => match tape.get(p + 1) {
100 TapeElement::I32(low) => Ok(((high as i64) << 32) | (low as u32) as i64),
101 _ => unreachable!(),
102 },
103 _ => Err(tape.error(*p, "primitive")),
104 };
105
106 match value {
107 Ok(value) => builder.append_value(value),
108 Err(_) if self.ignore_type_conflicts => builder.append_null(),
109 Err(e) => return Err(e),
110 }
111 }
112
113 Ok(Arc::new(builder.finish()))
114 }
115}