1use std::io::Write;
18use std::sync::Arc;
19
20use crate::StructMode;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_array::*;
24use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow_cast::display::{ArrayFormatter, FormatOptions};
26use arrow_schema::{ArrowError, DataType, FieldRef};
27use half::f16;
28use lexical_core::FormattedSize;
29use serde_core::Serializer;
30
31#[derive(Debug, Clone, Default)]
33pub struct EncoderOptions {
34 explicit_nulls: bool,
36 struct_mode: StructMode,
38 encoder_factory: Option<Arc<dyn EncoderFactory>>,
40 date_format: Option<String>,
42 datetime_format: Option<String>,
44 timestamp_format: Option<String>,
46 timestamp_tz_format: Option<String>,
48 time_format: Option<String>,
50}
51
52impl EncoderOptions {
53 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
55 self.explicit_nulls = explicit_nulls;
56 self
57 }
58
59 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
61 self.struct_mode = struct_mode;
62 self
63 }
64
65 pub fn with_encoder_factory(mut self, encoder_factory: Arc<dyn EncoderFactory>) -> Self {
67 self.encoder_factory = Some(encoder_factory);
68 self
69 }
70
71 pub fn explicit_nulls(&self) -> bool {
73 self.explicit_nulls
74 }
75
76 pub fn struct_mode(&self) -> StructMode {
78 self.struct_mode
79 }
80
81 pub fn encoder_factory(&self) -> Option<&Arc<dyn EncoderFactory>> {
83 self.encoder_factory.as_ref()
84 }
85
86 pub fn with_date_format(mut self, format: String) -> Self {
88 self.date_format = Some(format);
89 self
90 }
91
92 pub fn date_format(&self) -> Option<&str> {
94 self.date_format.as_deref()
95 }
96
97 pub fn with_datetime_format(mut self, format: String) -> Self {
99 self.datetime_format = Some(format);
100 self
101 }
102
103 pub fn datetime_format(&self) -> Option<&str> {
105 self.datetime_format.as_deref()
106 }
107
108 pub fn with_time_format(mut self, format: String) -> Self {
110 self.time_format = Some(format);
111 self
112 }
113
114 pub fn time_format(&self) -> Option<&str> {
116 self.time_format.as_deref()
117 }
118
119 pub fn with_timestamp_format(mut self, format: String) -> Self {
121 self.timestamp_format = Some(format);
122 self
123 }
124
125 pub fn timestamp_format(&self) -> Option<&str> {
127 self.timestamp_format.as_deref()
128 }
129
130 pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
132 self.timestamp_tz_format = Some(tz_format);
133 self
134 }
135
136 pub fn timestamp_tz_format(&self) -> Option<&str> {
138 self.timestamp_tz_format.as_deref()
139 }
140}
141
142pub trait EncoderFactory: std::fmt::Debug + Send + Sync {
239 fn make_default_encoder<'a>(
247 &self,
248 _field: &'a FieldRef,
249 _array: &'a dyn Array,
250 _options: &'a EncoderOptions,
251 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
252 Ok(None)
253 }
254}
255
256pub struct NullableEncoder<'a> {
259 encoder: Box<dyn Encoder + 'a>,
260 nulls: Option<NullBuffer>,
261}
262
263impl<'a> NullableEncoder<'a> {
264 pub fn new(encoder: Box<dyn Encoder + 'a>, nulls: Option<NullBuffer>) -> Self {
266 Self { encoder, nulls }
267 }
268
269 pub fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
271 self.encoder.encode(idx, out)
272 }
273
274 pub fn is_null(&self, idx: usize) -> bool {
276 self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
277 }
278
279 pub fn has_nulls(&self) -> bool {
281 match self.nulls {
282 Some(ref nulls) => nulls.null_count() > 0,
283 None => false,
284 }
285 }
286}
287
288impl Encoder for NullableEncoder<'_> {
289 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
290 self.encoder.encode(idx, out)
291 }
292}
293
/// Writes the JSON representation of individual array elements.
pub trait Encoder {
    /// Appends the JSON for the element at `idx` to `out`.
    ///
    /// The container encoders in this module test for null before calling
    /// this, so implementations are not required to handle null slots.
    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
}
303
304pub fn make_encoder<'a>(
308 field: &'a FieldRef,
309 array: &'a dyn Array,
310 options: &'a EncoderOptions,
311) -> Result<NullableEncoder<'a>, ArrowError> {
312 macro_rules! primitive_helper {
313 ($t:ty) => {{
314 let array = array.as_primitive::<$t>();
315 let nulls = array.nulls().cloned();
316 NullableEncoder::new(Box::new(PrimitiveEncoder::new(array)), nulls)
317 }};
318 }
319
320 if let Some(factory) = options.encoder_factory() {
321 if let Some(encoder) = factory.make_default_encoder(field, array, options)? {
322 return Ok(encoder);
323 }
324 }
325
326 let nulls = array.nulls().cloned();
327 let encoder = downcast_integer! {
328 array.data_type() => (primitive_helper),
329 DataType::Float16 => primitive_helper!(Float16Type),
330 DataType::Float32 => primitive_helper!(Float32Type),
331 DataType::Float64 => primitive_helper!(Float64Type),
332 DataType::Boolean => {
333 let array = array.as_boolean();
334 NullableEncoder::new(Box::new(BooleanEncoder(array)), array.nulls().cloned())
335 }
336 DataType::Null => NullableEncoder::new(Box::new(NullEncoder), array.logical_nulls()),
337 DataType::Utf8 => {
338 let array = array.as_string::<i32>();
339 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
340 }
341 DataType::LargeUtf8 => {
342 let array = array.as_string::<i64>();
343 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
344 }
345 DataType::Utf8View => {
346 let array = array.as_string_view();
347 NullableEncoder::new(Box::new(StringViewEncoder(array)), array.nulls().cloned())
348 }
349 DataType::BinaryView => {
350 let array = array.as_binary_view();
351 NullableEncoder::new(Box::new(BinaryViewEncoder(array)), array.nulls().cloned())
352 }
353 DataType::List(_) => {
354 let array = array.as_list::<i32>();
355 NullableEncoder::new(Box::new(ListLikeEncoder::try_new(field, array, options)?), array.nulls().cloned())
356 }
357 DataType::LargeList(_) => {
358 let array = array.as_list::<i64>();
359 NullableEncoder::new(Box::new(ListLikeEncoder::try_new(field, array, options)?), array.nulls().cloned())
360 }
361 DataType::ListView(_) => {
362 let array = array.as_list_view::<i32>();
363 NullableEncoder::new(Box::new(ListLikeEncoder::try_new(field, array, options)?), array.nulls().cloned())
364 }
365 DataType::LargeListView(_) => {
366 let array = array.as_list_view::<i64>();
367 NullableEncoder::new(Box::new(ListLikeEncoder::try_new(field, array, options)?), array.nulls().cloned())
368 }
369 DataType::FixedSizeList(_, _) => {
370 let array = array.as_fixed_size_list();
371 NullableEncoder::new(Box::new(ListLikeEncoder::try_new(field, array, options)?), array.nulls().cloned())
372 }
373
374 DataType::Dictionary(_, _) => downcast_dictionary_array! {
375 array => {
376 NullableEncoder::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned())
377 },
378 _ => unreachable!()
379 }
380
381 DataType::RunEndEncoded(_, _) => downcast_run_array! {
382 array => {
383 NullableEncoder::new(
384 Box::new(RunEndEncodedEncoder::try_new(field, array, options)?),
385 array.logical_nulls(),
386 )
387 },
388 _ => unreachable!()
389 }
390
391 DataType::Map(_, _) => {
392 let array = array.as_map();
393 NullableEncoder::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned())
394 }
395
396 DataType::FixedSizeBinary(_) => {
397 let array = array.as_fixed_size_binary();
398 NullableEncoder::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
399 }
400
401 DataType::Binary => {
402 let array: &BinaryArray = array.as_binary();
403 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
404 }
405
406 DataType::LargeBinary => {
407 let array: &LargeBinaryArray = array.as_binary();
408 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
409 }
410
411 DataType::Struct(fields) => {
412 let array = array.as_struct();
413 let encoders = fields.iter().zip(array.columns()).map(|(field, array)| {
414 let encoder = make_encoder(field, array, options)?;
415 Ok(FieldEncoder{
416 field: field.clone(),
417 encoder,
418 })
419 }).collect::<Result<Vec<_>, ArrowError>>()?;
420
421 let encoder = StructArrayEncoder{
422 encoders,
423 explicit_nulls: options.explicit_nulls(),
424 struct_mode: options.struct_mode(),
425 };
426 let nulls = array.nulls().cloned();
427 NullableEncoder::new(Box::new(encoder) as Box<dyn Encoder + 'a>, nulls)
428 }
429 DataType::Decimal32(_, _) | DataType::Decimal64(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
430 let options = FormatOptions::new().with_display_error(true);
431 let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?);
432 NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box<dyn Encoder + 'a>, nulls)
433 }
434 d => match d.is_temporal() {
435 true => {
436 let fops = FormatOptions::new().with_display_error(true)
441 .with_date_format(options.date_format.as_deref())
442 .with_datetime_format(options.datetime_format.as_deref())
443 .with_timestamp_format(options.timestamp_format.as_deref())
444 .with_timestamp_tz_format(options.timestamp_tz_format.as_deref())
445 .with_time_format(options.time_format.as_deref());
446
447 let formatter = ArrayFormatter::try_new(array, &fops)?;
448 let formatter = JsonArrayFormatter::new(formatter);
449 NullableEncoder::new(Box::new(formatter) as Box<dyn Encoder + 'a>, nulls)
450 }
451 false => return Err(ArrowError::JsonError(format!(
452 "Unsupported data type for JSON encoding: {d:?}",
453 )))
454 }
455 };
456
457 Ok(encoder)
458}
459
460fn encode_string(s: &str, out: &mut Vec<u8>) {
461 let mut serializer = serde_json::Serializer::new(out);
462 serializer.serialize_str(s).unwrap();
463}
464
/// Appends `bytes` to `out` as a quoted, lowercase hexadecimal JSON string.
///
/// Each input byte becomes exactly two hex digits, so empty input yields `""`.
fn encode_binary(bytes: &[u8], out: &mut Vec<u8>) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    // Two digits per byte plus the surrounding quotes; reserve once up front.
    out.reserve(bytes.len() * 2 + 2);
    out.push(b'"');
    for &byte in bytes {
        // A table lookup avoids the formatting machinery and its fallible result.
        out.push(HEX_DIGITS[usize::from(byte >> 4)]);
        out.push(HEX_DIGITS[usize::from(byte & 0x0f)]);
    }
    out.push(b'"');
}
472
473struct FieldEncoder<'a> {
474 field: FieldRef,
475 encoder: NullableEncoder<'a>,
476}
477
478impl FieldEncoder<'_> {
479 fn is_null(&self, idx: usize) -> bool {
480 self.encoder.is_null(idx)
481 }
482}
483
484struct StructArrayEncoder<'a> {
485 encoders: Vec<FieldEncoder<'a>>,
486 explicit_nulls: bool,
487 struct_mode: StructMode,
488}
489
490impl Encoder for StructArrayEncoder<'_> {
491 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
492 match self.struct_mode {
493 StructMode::ObjectOnly => out.push(b'{'),
494 StructMode::ListOnly => out.push(b'['),
495 }
496 let mut is_first = true;
497 let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls;
499
500 for field_encoder in self.encoders.iter_mut() {
501 let is_null = field_encoder.is_null(idx);
502 if is_null && drop_nulls {
503 continue;
504 }
505
506 if !is_first {
507 out.push(b',');
508 }
509 is_first = false;
510
511 if self.struct_mode == StructMode::ObjectOnly {
512 encode_string(field_encoder.field.name(), out);
513 out.push(b':');
514 }
515
516 if is_null {
517 out.extend_from_slice(b"null");
518 } else {
519 field_encoder.encoder.encode(idx, out);
520 }
521 }
522 match self.struct_mode {
523 StructMode::ObjectOnly => out.push(b'}'),
524 StructMode::ListOnly => out.push(b']'),
525 }
526 }
527}
528
529trait PrimitiveEncode: ArrowNativeType {
530 type Buffer;
531
532 fn init_buffer() -> Self::Buffer;
534
535 fn encode(self, buf: &mut Self::Buffer) -> &[u8];
539}
540
541macro_rules! integer_encode {
542 ($($t:ty),*) => {
543 $(
544 impl PrimitiveEncode for $t {
545 type Buffer = [u8; Self::FORMATTED_SIZE];
546
547 fn init_buffer() -> Self::Buffer {
548 [0; Self::FORMATTED_SIZE]
549 }
550
551 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
552 lexical_core::write(self, buf)
553 }
554 }
555 )*
556 };
557}
558integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
559
560macro_rules! float_encode {
561 ($($t:ty),*) => {
562 $(
563 impl PrimitiveEncode for $t {
564 type Buffer = [u8; Self::FORMATTED_SIZE];
565
566 fn init_buffer() -> Self::Buffer {
567 [0; Self::FORMATTED_SIZE]
568 }
569
570 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
571 if self.is_infinite() || self.is_nan() {
572 b"null"
573 } else {
574 lexical_core::write(self, buf)
575 }
576 }
577 }
578 )*
579 };
580}
581float_encode!(f32, f64);
582
583impl PrimitiveEncode for f16 {
584 type Buffer = <f32 as PrimitiveEncode>::Buffer;
585
586 fn init_buffer() -> Self::Buffer {
587 f32::init_buffer()
588 }
589
590 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
591 self.to_f32().encode(buf)
592 }
593}
594
595struct PrimitiveEncoder<N: PrimitiveEncode> {
596 values: ScalarBuffer<N>,
597 buffer: N::Buffer,
598}
599
600impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
601 fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
602 Self {
603 values: array.values().clone(),
604 buffer: N::init_buffer(),
605 }
606 }
607}
608
609impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
610 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
611 out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
612 }
613}
614
615struct BooleanEncoder<'a>(&'a BooleanArray);
616
617impl Encoder for BooleanEncoder<'_> {
618 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
619 match self.0.value(idx) {
620 true => out.extend_from_slice(b"true"),
621 false => out.extend_from_slice(b"false"),
622 }
623 }
624}
625
626struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
627
628impl<O: OffsetSizeTrait> Encoder for StringEncoder<'_, O> {
629 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
630 encode_string(self.0.value(idx), out);
631 }
632}
633
634struct StringViewEncoder<'a>(&'a StringViewArray);
635
636impl Encoder for StringViewEncoder<'_> {
637 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
638 encode_string(self.0.value(idx), out);
639 }
640}
641
642struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
643
644impl Encoder for BinaryViewEncoder<'_> {
645 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
646 encode_binary(self.0.value(idx), out);
647 }
648}
649
650struct ListLikeEncoder<'a, L: ListLikeArray> {
651 list_array: &'a L,
652 encoder: NullableEncoder<'a>,
653}
654
655impl<'a, L: ListLikeArray> ListLikeEncoder<'a, L> {
656 fn try_new(
657 field: &'a FieldRef,
658 array: &'a L,
659 options: &'a EncoderOptions,
660 ) -> Result<Self, ArrowError> {
661 let encoder = make_encoder(field, array.values().as_ref(), options)?;
662 Ok(Self {
663 list_array: array,
664 encoder,
665 })
666 }
667}
668
669impl<L: ListLikeArray> Encoder for ListLikeEncoder<'_, L> {
670 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
671 let range = self.list_array.element_range(idx);
672 let start = range.start;
673 let end = range.end;
674 out.push(b'[');
675 if self.encoder.has_nulls() {
676 for idx in start..end {
677 if idx != start {
678 out.push(b',')
679 }
680 if self.encoder.is_null(idx) {
681 out.extend_from_slice(b"null");
682 } else {
683 self.encoder.encode(idx, out);
684 }
685 }
686 } else {
687 for idx in start..end {
688 if idx != start {
689 out.push(b',')
690 }
691 self.encoder.encode(idx, out);
692 }
693 }
694 out.push(b']');
695 }
696}
697
698struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
699 keys: ScalarBuffer<K::Native>,
700 encoder: NullableEncoder<'a>,
701}
702
703impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
704 fn try_new(
705 field: &'a FieldRef,
706 array: &'a DictionaryArray<K>,
707 options: &'a EncoderOptions,
708 ) -> Result<Self, ArrowError> {
709 let encoder = make_encoder(field, array.values().as_ref(), options)?;
710
711 Ok(Self {
712 keys: array.keys().values().clone(),
713 encoder,
714 })
715 }
716}
717
718impl<K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'_, K> {
719 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
720 self.encoder.encode(self.keys[idx].as_usize(), out)
721 }
722}
723
724struct RunEndEncodedEncoder<'a, R: RunEndIndexType> {
725 run_array: &'a RunArray<R>,
726 encoder: NullableEncoder<'a>,
727}
728
729impl<'a, R: RunEndIndexType> RunEndEncodedEncoder<'a, R> {
730 fn try_new(
731 field: &'a FieldRef,
732 array: &'a RunArray<R>,
733 options: &'a EncoderOptions,
734 ) -> Result<Self, ArrowError> {
735 let encoder = make_encoder(field, array.values().as_ref(), options)?;
736 Ok(Self {
737 run_array: array,
738 encoder,
739 })
740 }
741}
742
743impl<R: RunEndIndexType> Encoder for RunEndEncodedEncoder<'_, R> {
744 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
745 let physical_idx = self.run_array.get_physical_index(idx);
746 self.encoder.encode(physical_idx, out)
747 }
748}
749
750struct JsonArrayFormatter<'a> {
752 formatter: ArrayFormatter<'a>,
753}
754
755impl<'a> JsonArrayFormatter<'a> {
756 fn new(formatter: ArrayFormatter<'a>) -> Self {
757 Self { formatter }
758 }
759}
760
761impl Encoder for JsonArrayFormatter<'_> {
762 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
763 out.push(b'"');
764 let _ = write!(out, "{}", self.formatter.value(idx));
767 out.push(b'"')
768 }
769}
770
771struct RawArrayFormatter<'a>(JsonArrayFormatter<'a>);
773
774impl Encoder for RawArrayFormatter<'_> {
775 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
776 let _ = write!(out, "{}", self.0.formatter.value(idx));
777 }
778}
779
780struct NullEncoder;
781
782impl Encoder for NullEncoder {
783 fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
784 unreachable!()
785 }
786}
787
788struct MapEncoder<'a> {
789 offsets: OffsetBuffer<i32>,
790 keys: NullableEncoder<'a>,
791 values: NullableEncoder<'a>,
792 explicit_nulls: bool,
793}
794
795impl<'a> MapEncoder<'a> {
796 fn try_new(
797 field: &'a FieldRef,
798 array: &'a MapArray,
799 options: &'a EncoderOptions,
800 ) -> Result<Self, ArrowError> {
801 let values = array.values();
802 let keys = array.keys();
803
804 if !matches!(
805 keys.data_type(),
806 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
807 ) {
808 return Err(ArrowError::JsonError(format!(
809 "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
810 keys.data_type()
811 )));
812 }
813
814 let keys = make_encoder(field, keys, options)?;
815 let values = make_encoder(field, values, options)?;
816
817 if keys.has_nulls() {
819 return Err(ArrowError::InvalidArgumentError(
820 "Encountered nulls in MapArray keys".to_string(),
821 ));
822 }
823
824 if array.entries().nulls().is_some_and(|x| x.null_count() != 0) {
825 return Err(ArrowError::InvalidArgumentError(
826 "Encountered nulls in MapArray entries".to_string(),
827 ));
828 }
829
830 Ok(Self {
831 offsets: array.offsets().clone(),
832 keys,
833 values,
834 explicit_nulls: options.explicit_nulls(),
835 })
836 }
837}
838
839impl Encoder for MapEncoder<'_> {
840 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
841 let end = self.offsets[idx + 1].as_usize();
842 let start = self.offsets[idx].as_usize();
843
844 let mut is_first = true;
845
846 out.push(b'{');
847
848 for idx in start..end {
849 let is_null = self.values.is_null(idx);
850 if is_null && !self.explicit_nulls {
851 continue;
852 }
853
854 if !is_first {
855 out.push(b',');
856 }
857 is_first = false;
858
859 self.keys.encode(idx, out);
860 out.push(b':');
861
862 if is_null {
863 out.extend_from_slice(b"null");
864 } else {
865 self.values.encode(idx, out);
866 }
867 }
868 out.push(b'}');
869 }
870}
871
872struct BinaryEncoder<B>(B);
875
876impl<'a, B> BinaryEncoder<B>
877where
878 B: ArrayAccessor<Item = &'a [u8]>,
879{
880 fn new(array: B) -> Self {
881 Self(array)
882 }
883}
884
885impl<'a, B> Encoder for BinaryEncoder<B>
886where
887 B: ArrayAccessor<Item = &'a [u8]>,
888{
889 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
890 out.push(b'"');
891 for byte in self.0.value(idx) {
892 write!(out, "{byte:02x}").unwrap();
894 }
895 out.push(b'"');
896 }
897}