1use crate::StructMode;
19use arrow_array::cast::AsArray;
20use arrow_array::types::*;
21use arrow_array::*;
22use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
23use arrow_cast::display::{ArrayFormatter, FormatOptions};
24use arrow_schema::{ArrowError, DataType, FieldRef};
25use half::f16;
26use lexical_core::FormattedSize;
27use serde::Serializer;
28use std::io::Write;
29
30#[derive(Debug, Clone, Default)]
31pub struct EncoderOptions {
32 pub explicit_nulls: bool,
33 pub struct_mode: StructMode,
34}
35
/// A type that can append the JSON representation of one array element to a byte buffer.
pub trait Encoder {
    /// Appends the JSON encoding of the value at `idx` to `out`.
    ///
    /// The encoders in this file expect `idx` to refer to a non-null slot: the
    /// container encoders check validity themselves and emit `null` without calling
    /// this method.
    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
}
45
46pub fn make_encoder<'a>(
47 array: &'a dyn Array,
48 options: &EncoderOptions,
49) -> Result<Box<dyn Encoder + 'a>, ArrowError> {
50 let (encoder, nulls) = make_encoder_impl(array, options)?;
51 assert!(nulls.is_none(), "root cannot be nullable");
52 Ok(encoder)
53}
54
55fn make_encoder_impl<'a>(
56 array: &'a dyn Array,
57 options: &EncoderOptions,
58) -> Result<(Box<dyn Encoder + 'a>, Option<NullBuffer>), ArrowError> {
59 macro_rules! primitive_helper {
60 ($t:ty) => {{
61 let array = array.as_primitive::<$t>();
62 let nulls = array.nulls().cloned();
63 (Box::new(PrimitiveEncoder::new(array)) as _, nulls)
64 }};
65 }
66
67 Ok(downcast_integer! {
68 array.data_type() => (primitive_helper),
69 DataType::Float16 => primitive_helper!(Float16Type),
70 DataType::Float32 => primitive_helper!(Float32Type),
71 DataType::Float64 => primitive_helper!(Float64Type),
72 DataType::Boolean => {
73 let array = array.as_boolean();
74 (Box::new(BooleanEncoder(array)), array.nulls().cloned())
75 }
76 DataType::Null => (Box::new(NullEncoder), array.logical_nulls()),
77 DataType::Utf8 => {
78 let array = array.as_string::<i32>();
79 (Box::new(StringEncoder(array)) as _, array.nulls().cloned())
80 }
81 DataType::LargeUtf8 => {
82 let array = array.as_string::<i64>();
83 (Box::new(StringEncoder(array)) as _, array.nulls().cloned())
84 }
85 DataType::Utf8View => {
86 let array = array.as_string_view();
87 (Box::new(StringViewEncoder(array)) as _, array.nulls().cloned())
88 }
89 DataType::List(_) => {
90 let array = array.as_list::<i32>();
91 (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned())
92 }
93 DataType::LargeList(_) => {
94 let array = array.as_list::<i64>();
95 (Box::new(ListEncoder::try_new(array, options)?) as _, array.nulls().cloned())
96 }
97 DataType::FixedSizeList(_, _) => {
98 let array = array.as_fixed_size_list();
99 (Box::new(FixedSizeListEncoder::try_new(array, options)?) as _, array.nulls().cloned())
100 }
101
102 DataType::Dictionary(_, _) => downcast_dictionary_array! {
103 array => (Box::new(DictionaryEncoder::try_new(array, options)?) as _, array.logical_nulls()),
104 _ => unreachable!()
105 }
106
107 DataType::Map(_, _) => {
108 let array = array.as_map();
109 (Box::new(MapEncoder::try_new(array, options)?) as _, array.nulls().cloned())
110 }
111
112 DataType::FixedSizeBinary(_) => {
113 let array = array.as_fixed_size_binary();
114 (Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
115 }
116
117 DataType::Binary => {
118 let array: &BinaryArray = array.as_binary();
119 (Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
120 }
121
122 DataType::LargeBinary => {
123 let array: &LargeBinaryArray = array.as_binary();
124 (Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
125 }
126
127 DataType::Struct(fields) => {
128 let array = array.as_struct();
129 let encoders = fields.iter().zip(array.columns()).map(|(field, array)| {
130 let (encoder, nulls) = make_encoder_impl(array, options)?;
131 Ok(FieldEncoder{
132 field: field.clone(),
133 encoder, nulls
134 })
135 }).collect::<Result<Vec<_>, ArrowError>>()?;
136
137 let encoder = StructArrayEncoder{
138 encoders,
139 explicit_nulls: options.explicit_nulls,
140 struct_mode: options.struct_mode,
141 };
142 (Box::new(encoder) as _, array.nulls().cloned())
143 }
144 DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
145 let options = FormatOptions::new().with_display_error(true);
146 let formatter = ArrayFormatter::try_new(array, &options)?;
147 (Box::new(RawArrayFormatter(formatter)) as _, array.nulls().cloned())
148 }
149 d => match d.is_temporal() {
150 true => {
151 let options = FormatOptions::new().with_display_error(true);
156 let formatter = ArrayFormatter::try_new(array, &options)?;
157 (Box::new(formatter) as _, array.nulls().cloned())
158 }
159 false => return Err(ArrowError::InvalidArgumentError(format!("JSON Writer does not support data type: {d}"))),
160 }
161 })
162}
163
164fn encode_string(s: &str, out: &mut Vec<u8>) {
165 let mut serializer = serde_json::Serializer::new(out);
166 serializer.serialize_str(s).unwrap();
167}
168
169struct FieldEncoder<'a> {
170 field: FieldRef,
171 encoder: Box<dyn Encoder + 'a>,
172 nulls: Option<NullBuffer>,
173}
174
175struct StructArrayEncoder<'a> {
176 encoders: Vec<FieldEncoder<'a>>,
177 explicit_nulls: bool,
178 struct_mode: StructMode,
179}
180
/// Returns `true` only when `opt` holds a value and `f` returns `true` for it;
/// `f` is not called for `None`.
#[inline(always)]
fn is_some_and<T>(opt: Option<T>, f: impl FnOnce(T) -> bool) -> bool {
    opt.map_or(false, f)
}
189
190impl Encoder for StructArrayEncoder<'_> {
191 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
192 match self.struct_mode {
193 StructMode::ObjectOnly => out.push(b'{'),
194 StructMode::ListOnly => out.push(b'['),
195 }
196 let mut is_first = true;
197 let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls;
199 for field_encoder in &mut self.encoders {
200 let is_null = is_some_and(field_encoder.nulls.as_ref(), |n| n.is_null(idx));
201 if drop_nulls && is_null {
202 continue;
203 }
204
205 if !is_first {
206 out.push(b',');
207 }
208 is_first = false;
209
210 if self.struct_mode == StructMode::ObjectOnly {
211 encode_string(field_encoder.field.name(), out);
212 out.push(b':');
213 }
214
215 match is_null {
216 true => out.extend_from_slice(b"null"),
217 false => field_encoder.encoder.encode(idx, out),
218 }
219 }
220 match self.struct_mode {
221 StructMode::ObjectOnly => out.push(b'}'),
222 StructMode::ListOnly => out.push(b']'),
223 }
224 }
225}
226
227trait PrimitiveEncode: ArrowNativeType {
228 type Buffer;
229
230 fn init_buffer() -> Self::Buffer;
232
233 fn encode(self, buf: &mut Self::Buffer) -> &[u8];
237}
238
239macro_rules! integer_encode {
240 ($($t:ty),*) => {
241 $(
242 impl PrimitiveEncode for $t {
243 type Buffer = [u8; Self::FORMATTED_SIZE];
244
245 fn init_buffer() -> Self::Buffer {
246 [0; Self::FORMATTED_SIZE]
247 }
248
249 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
250 lexical_core::write(self, buf)
251 }
252 }
253 )*
254 };
255}
256integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
257
258macro_rules! float_encode {
259 ($($t:ty),*) => {
260 $(
261 impl PrimitiveEncode for $t {
262 type Buffer = [u8; Self::FORMATTED_SIZE];
263
264 fn init_buffer() -> Self::Buffer {
265 [0; Self::FORMATTED_SIZE]
266 }
267
268 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
269 if self.is_infinite() || self.is_nan() {
270 b"null"
271 } else {
272 lexical_core::write(self, buf)
273 }
274 }
275 }
276 )*
277 };
278}
279float_encode!(f32, f64);
280
281impl PrimitiveEncode for f16 {
282 type Buffer = <f32 as PrimitiveEncode>::Buffer;
283
284 fn init_buffer() -> Self::Buffer {
285 f32::init_buffer()
286 }
287
288 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
289 self.to_f32().encode(buf)
290 }
291}
292
293struct PrimitiveEncoder<N: PrimitiveEncode> {
294 values: ScalarBuffer<N>,
295 buffer: N::Buffer,
296}
297
298impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
299 fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
300 Self {
301 values: array.values().clone(),
302 buffer: N::init_buffer(),
303 }
304 }
305}
306
307impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
308 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
309 out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
310 }
311}
312
313struct BooleanEncoder<'a>(&'a BooleanArray);
314
315impl Encoder for BooleanEncoder<'_> {
316 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
317 match self.0.value(idx) {
318 true => out.extend_from_slice(b"true"),
319 false => out.extend_from_slice(b"false"),
320 }
321 }
322}
323
324struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
325
326impl<O: OffsetSizeTrait> Encoder for StringEncoder<'_, O> {
327 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
328 encode_string(self.0.value(idx), out);
329 }
330}
331
332struct StringViewEncoder<'a>(&'a StringViewArray);
333
334impl Encoder for StringViewEncoder<'_> {
335 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
336 encode_string(self.0.value(idx), out);
337 }
338}
339
340struct ListEncoder<'a, O: OffsetSizeTrait> {
341 offsets: OffsetBuffer<O>,
342 nulls: Option<NullBuffer>,
343 encoder: Box<dyn Encoder + 'a>,
344}
345
346impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
347 fn try_new(
348 array: &'a GenericListArray<O>,
349 options: &EncoderOptions,
350 ) -> Result<Self, ArrowError> {
351 let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?;
352 Ok(Self {
353 offsets: array.offsets().clone(),
354 encoder,
355 nulls,
356 })
357 }
358}
359
360impl<O: OffsetSizeTrait> Encoder for ListEncoder<'_, O> {
361 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
362 let end = self.offsets[idx + 1].as_usize();
363 let start = self.offsets[idx].as_usize();
364 out.push(b'[');
365 match self.nulls.as_ref() {
366 Some(n) => (start..end).for_each(|idx| {
367 if idx != start {
368 out.push(b',')
369 }
370 match n.is_null(idx) {
371 true => out.extend_from_slice(b"null"),
372 false => self.encoder.encode(idx, out),
373 }
374 }),
375 None => (start..end).for_each(|idx| {
376 if idx != start {
377 out.push(b',')
378 }
379 self.encoder.encode(idx, out);
380 }),
381 }
382 out.push(b']');
383 }
384}
385
386struct FixedSizeListEncoder<'a> {
387 value_length: usize,
388 nulls: Option<NullBuffer>,
389 encoder: Box<dyn Encoder + 'a>,
390}
391
392impl<'a> FixedSizeListEncoder<'a> {
393 fn try_new(
394 array: &'a FixedSizeListArray,
395 options: &EncoderOptions,
396 ) -> Result<Self, ArrowError> {
397 let (encoder, nulls) = make_encoder_impl(array.values().as_ref(), options)?;
398 Ok(Self {
399 encoder,
400 nulls,
401 value_length: array.value_length().as_usize(),
402 })
403 }
404}
405
406impl Encoder for FixedSizeListEncoder<'_> {
407 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
408 let start = idx * self.value_length;
409 let end = start + self.value_length;
410 out.push(b'[');
411 match self.nulls.as_ref() {
412 Some(n) => (start..end).for_each(|idx| {
413 if idx != start {
414 out.push(b',');
415 }
416 if n.is_null(idx) {
417 out.extend_from_slice(b"null");
418 } else {
419 self.encoder.encode(idx, out);
420 }
421 }),
422 None => (start..end).for_each(|idx| {
423 if idx != start {
424 out.push(b',');
425 }
426 self.encoder.encode(idx, out);
427 }),
428 }
429 out.push(b']');
430 }
431}
432
433struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
434 keys: ScalarBuffer<K::Native>,
435 encoder: Box<dyn Encoder + 'a>,
436}
437
438impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
439 fn try_new(
440 array: &'a DictionaryArray<K>,
441 options: &EncoderOptions,
442 ) -> Result<Self, ArrowError> {
443 let (encoder, _) = make_encoder_impl(array.values().as_ref(), options)?;
444
445 Ok(Self {
446 keys: array.keys().values().clone(),
447 encoder,
448 })
449 }
450}
451
452impl<K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'_, K> {
453 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
454 self.encoder.encode(self.keys[idx].as_usize(), out)
455 }
456}
457
458impl Encoder for ArrayFormatter<'_> {
459 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
460 out.push(b'"');
461 let _ = write!(out, "{}", self.value(idx));
464 out.push(b'"')
465 }
466}
467
468struct RawArrayFormatter<'a>(ArrayFormatter<'a>);
470
471impl Encoder for RawArrayFormatter<'_> {
472 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
473 let _ = write!(out, "{}", self.0.value(idx));
474 }
475}
476
477struct NullEncoder;
478
479impl Encoder for NullEncoder {
480 fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
481 unreachable!()
482 }
483}
484
485struct MapEncoder<'a> {
486 offsets: OffsetBuffer<i32>,
487 keys: Box<dyn Encoder + 'a>,
488 values: Box<dyn Encoder + 'a>,
489 value_nulls: Option<NullBuffer>,
490 explicit_nulls: bool,
491}
492
493impl<'a> MapEncoder<'a> {
494 fn try_new(array: &'a MapArray, options: &EncoderOptions) -> Result<Self, ArrowError> {
495 let values = array.values();
496 let keys = array.keys();
497
498 if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) {
499 return Err(ArrowError::JsonError(format!(
500 "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
501 keys.data_type()
502 )));
503 }
504
505 let (keys, key_nulls) = make_encoder_impl(keys, options)?;
506 let (values, value_nulls) = make_encoder_impl(values, options)?;
507
508 if is_some_and(key_nulls, |x| x.null_count() != 0) {
510 return Err(ArrowError::InvalidArgumentError(
511 "Encountered nulls in MapArray keys".to_string(),
512 ));
513 }
514
515 if is_some_and(array.entries().nulls(), |x| x.null_count() != 0) {
516 return Err(ArrowError::InvalidArgumentError(
517 "Encountered nulls in MapArray entries".to_string(),
518 ));
519 }
520
521 Ok(Self {
522 offsets: array.offsets().clone(),
523 keys,
524 values,
525 value_nulls,
526 explicit_nulls: options.explicit_nulls,
527 })
528 }
529}
530
531impl Encoder for MapEncoder<'_> {
532 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
533 let end = self.offsets[idx + 1].as_usize();
534 let start = self.offsets[idx].as_usize();
535
536 let mut is_first = true;
537
538 out.push(b'{');
539 for idx in start..end {
540 let is_null = is_some_and(self.value_nulls.as_ref(), |n| n.is_null(idx));
541 if is_null && !self.explicit_nulls {
542 continue;
543 }
544
545 if !is_first {
546 out.push(b',');
547 }
548 is_first = false;
549
550 self.keys.encode(idx, out);
551 out.push(b':');
552
553 match is_null {
554 true => out.extend_from_slice(b"null"),
555 false => self.values.encode(idx, out),
556 }
557 }
558 out.push(b'}');
559 }
560}
561
562struct BinaryEncoder<B>(B);
565
566impl<'a, B> BinaryEncoder<B>
567where
568 B: ArrayAccessor<Item = &'a [u8]>,
569{
570 fn new(array: B) -> Self {
571 Self(array)
572 }
573}
574
575impl<'a, B> Encoder for BinaryEncoder<B>
576where
577 B: ArrayAccessor<Item = &'a [u8]>,
578{
579 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
580 out.push(b'"');
581 for byte in self.0.value(idx) {
582 write!(out, "{byte:02x}").unwrap();
584 }
585 out.push(b'"');
586 }
587}