1use std::io::Write;
18use std::sync::Arc;
19
20use crate::StructMode;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_array::*;
24use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow_cast::display::{ArrayFormatter, FormatOptions};
26use arrow_schema::{ArrowError, DataType, FieldRef};
27use half::f16;
28use lexical_core::FormattedSize;
29use serde_core::Serializer;
30
31#[derive(Debug, Clone, Default)]
33pub struct EncoderOptions {
34 explicit_nulls: bool,
36 struct_mode: StructMode,
38 encoder_factory: Option<Arc<dyn EncoderFactory>>,
40 date_format: Option<String>,
42 datetime_format: Option<String>,
44 timestamp_format: Option<String>,
46 timestamp_tz_format: Option<String>,
48 time_format: Option<String>,
50}
51
52impl EncoderOptions {
53 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
55 self.explicit_nulls = explicit_nulls;
56 self
57 }
58
59 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
61 self.struct_mode = struct_mode;
62 self
63 }
64
65 pub fn with_encoder_factory(mut self, encoder_factory: Arc<dyn EncoderFactory>) -> Self {
67 self.encoder_factory = Some(encoder_factory);
68 self
69 }
70
71 pub fn explicit_nulls(&self) -> bool {
73 self.explicit_nulls
74 }
75
76 pub fn struct_mode(&self) -> StructMode {
78 self.struct_mode
79 }
80
81 pub fn encoder_factory(&self) -> Option<&Arc<dyn EncoderFactory>> {
83 self.encoder_factory.as_ref()
84 }
85
86 pub fn with_date_format(mut self, format: String) -> Self {
88 self.date_format = Some(format);
89 self
90 }
91
92 pub fn date_format(&self) -> Option<&str> {
94 self.date_format.as_deref()
95 }
96
97 pub fn with_datetime_format(mut self, format: String) -> Self {
99 self.datetime_format = Some(format);
100 self
101 }
102
103 pub fn datetime_format(&self) -> Option<&str> {
105 self.datetime_format.as_deref()
106 }
107
108 pub fn with_time_format(mut self, format: String) -> Self {
110 self.time_format = Some(format);
111 self
112 }
113
114 pub fn time_format(&self) -> Option<&str> {
116 self.time_format.as_deref()
117 }
118
119 pub fn with_timestamp_format(mut self, format: String) -> Self {
121 self.timestamp_format = Some(format);
122 self
123 }
124
125 pub fn timestamp_format(&self) -> Option<&str> {
127 self.timestamp_format.as_deref()
128 }
129
130 pub fn with_timestamp_tz_format(mut self, tz_format: String) -> Self {
132 self.timestamp_tz_format = Some(tz_format);
133 self
134 }
135
136 pub fn timestamp_tz_format(&self) -> Option<&str> {
138 self.timestamp_tz_format.as_deref()
139 }
140}
141
142pub trait EncoderFactory: std::fmt::Debug + Send + Sync {
239 fn make_default_encoder<'a>(
247 &self,
248 _field: &'a FieldRef,
249 _array: &'a dyn Array,
250 _options: &'a EncoderOptions,
251 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
252 Ok(None)
253 }
254}
255
256pub struct NullableEncoder<'a> {
259 encoder: Box<dyn Encoder + 'a>,
260 nulls: Option<NullBuffer>,
261}
262
263impl<'a> NullableEncoder<'a> {
264 pub fn new(encoder: Box<dyn Encoder + 'a>, nulls: Option<NullBuffer>) -> Self {
266 Self { encoder, nulls }
267 }
268
269 pub fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
271 self.encoder.encode(idx, out)
272 }
273
274 pub fn is_null(&self, idx: usize) -> bool {
276 self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
277 }
278
279 pub fn has_nulls(&self) -> bool {
281 match self.nulls {
282 Some(ref nulls) => nulls.null_count() > 0,
283 None => false,
284 }
285 }
286}
287
288impl Encoder for NullableEncoder<'_> {
289 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
290 self.encoder.encode(idx, out)
291 }
292}
293
/// Writes the JSON text for individual elements of an array.
pub trait Encoder {
    /// Appends the JSON representation of the element at `idx` to `out`.
    ///
    /// The container encoders in this file test for nulls before calling this
    /// and write `null` themselves.
    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
}
303
304pub fn make_encoder<'a>(
308 field: &'a FieldRef,
309 array: &'a dyn Array,
310 options: &'a EncoderOptions,
311) -> Result<NullableEncoder<'a>, ArrowError> {
312 macro_rules! primitive_helper {
313 ($t:ty) => {{
314 let array = array.as_primitive::<$t>();
315 let nulls = array.nulls().cloned();
316 NullableEncoder::new(Box::new(PrimitiveEncoder::new(array)), nulls)
317 }};
318 }
319
320 if let Some(factory) = options.encoder_factory() {
321 if let Some(encoder) = factory.make_default_encoder(field, array, options)? {
322 return Ok(encoder);
323 }
324 }
325
326 let nulls = array.nulls().cloned();
327 let encoder = downcast_integer! {
328 array.data_type() => (primitive_helper),
329 DataType::Float16 => primitive_helper!(Float16Type),
330 DataType::Float32 => primitive_helper!(Float32Type),
331 DataType::Float64 => primitive_helper!(Float64Type),
332 DataType::Boolean => {
333 let array = array.as_boolean();
334 NullableEncoder::new(Box::new(BooleanEncoder(array)), array.nulls().cloned())
335 }
336 DataType::Null => NullableEncoder::new(Box::new(NullEncoder), array.logical_nulls()),
337 DataType::Utf8 => {
338 let array = array.as_string::<i32>();
339 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
340 }
341 DataType::LargeUtf8 => {
342 let array = array.as_string::<i64>();
343 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
344 }
345 DataType::Utf8View => {
346 let array = array.as_string_view();
347 NullableEncoder::new(Box::new(StringViewEncoder(array)), array.nulls().cloned())
348 }
349 DataType::BinaryView => {
350 let array = array.as_binary_view();
351 NullableEncoder::new(Box::new(BinaryViewEncoder(array)), array.nulls().cloned())
352 }
353 DataType::List(_) => {
354 let array = array.as_list::<i32>();
355 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
356 }
357 DataType::LargeList(_) => {
358 let array = array.as_list::<i64>();
359 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
360 }
361 DataType::FixedSizeList(_, _) => {
362 let array = array.as_fixed_size_list();
363 NullableEncoder::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned())
364 }
365
366 DataType::Dictionary(_, _) => downcast_dictionary_array! {
367 array => {
368 NullableEncoder::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned())
369 },
370 _ => unreachable!()
371 }
372
373 DataType::Map(_, _) => {
374 let array = array.as_map();
375 NullableEncoder::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned())
376 }
377
378 DataType::FixedSizeBinary(_) => {
379 let array = array.as_fixed_size_binary();
380 NullableEncoder::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
381 }
382
383 DataType::Binary => {
384 let array: &BinaryArray = array.as_binary();
385 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
386 }
387
388 DataType::LargeBinary => {
389 let array: &LargeBinaryArray = array.as_binary();
390 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
391 }
392
393 DataType::Struct(fields) => {
394 let array = array.as_struct();
395 let encoders = fields.iter().zip(array.columns()).map(|(field, array)| {
396 let encoder = make_encoder(field, array, options)?;
397 Ok(FieldEncoder{
398 field: field.clone(),
399 encoder,
400 })
401 }).collect::<Result<Vec<_>, ArrowError>>()?;
402
403 let encoder = StructArrayEncoder{
404 encoders,
405 explicit_nulls: options.explicit_nulls(),
406 struct_mode: options.struct_mode(),
407 };
408 let nulls = array.nulls().cloned();
409 NullableEncoder::new(Box::new(encoder) as Box<dyn Encoder + 'a>, nulls)
410 }
411 DataType::Decimal32(_, _) | DataType::Decimal64(_, _) | DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
412 let options = FormatOptions::new().with_display_error(true);
413 let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?);
414 NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box<dyn Encoder + 'a>, nulls)
415 }
416 d => match d.is_temporal() {
417 true => {
418 let fops = FormatOptions::new().with_display_error(true)
423 .with_date_format(options.date_format.as_deref())
424 .with_datetime_format(options.datetime_format.as_deref())
425 .with_timestamp_format(options.timestamp_format.as_deref())
426 .with_timestamp_tz_format(options.timestamp_tz_format.as_deref())
427 .with_time_format(options.time_format.as_deref());
428
429 let formatter = ArrayFormatter::try_new(array, &fops)?;
430 let formatter = JsonArrayFormatter::new(formatter);
431 NullableEncoder::new(Box::new(formatter) as Box<dyn Encoder + 'a>, nulls)
432 }
433 false => return Err(ArrowError::JsonError(format!(
434 "Unsupported data type for JSON encoding: {d:?}",
435 )))
436 }
437 };
438
439 Ok(encoder)
440}
441
442fn encode_string(s: &str, out: &mut Vec<u8>) {
443 let mut serializer = serde_json::Serializer::new(out);
444 serializer.serialize_str(s).unwrap();
445}
446
/// Appends `bytes` to `out` as a double-quoted, lowercase hexadecimal string
/// with two digits per byte.
fn encode_binary(bytes: &[u8], out: &mut Vec<u8>) {
    const HEX_DIGITS: &[u8; 16] = b"0123456789abcdef";

    out.push(b'"');
    for &byte in bytes {
        // High nibble first, then low nibble.
        out.push(HEX_DIGITS[usize::from(byte >> 4)]);
        out.push(HEX_DIGITS[usize::from(byte & 0x0f)]);
    }
    out.push(b'"');
}
454
455struct FieldEncoder<'a> {
456 field: FieldRef,
457 encoder: NullableEncoder<'a>,
458}
459
460impl FieldEncoder<'_> {
461 fn is_null(&self, idx: usize) -> bool {
462 self.encoder.is_null(idx)
463 }
464}
465
466struct StructArrayEncoder<'a> {
467 encoders: Vec<FieldEncoder<'a>>,
468 explicit_nulls: bool,
469 struct_mode: StructMode,
470}
471
472impl Encoder for StructArrayEncoder<'_> {
473 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
474 match self.struct_mode {
475 StructMode::ObjectOnly => out.push(b'{'),
476 StructMode::ListOnly => out.push(b'['),
477 }
478 let mut is_first = true;
479 let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls;
481
482 for field_encoder in self.encoders.iter_mut() {
483 let is_null = field_encoder.is_null(idx);
484 if is_null && drop_nulls {
485 continue;
486 }
487
488 if !is_first {
489 out.push(b',');
490 }
491 is_first = false;
492
493 if self.struct_mode == StructMode::ObjectOnly {
494 encode_string(field_encoder.field.name(), out);
495 out.push(b':');
496 }
497
498 if is_null {
499 out.extend_from_slice(b"null");
500 } else {
501 field_encoder.encoder.encode(idx, out);
502 }
503 }
504 match self.struct_mode {
505 StructMode::ObjectOnly => out.push(b'}'),
506 StructMode::ListOnly => out.push(b']'),
507 }
508 }
509}
510
511trait PrimitiveEncode: ArrowNativeType {
512 type Buffer;
513
514 fn init_buffer() -> Self::Buffer;
516
517 fn encode(self, buf: &mut Self::Buffer) -> &[u8];
521}
522
523macro_rules! integer_encode {
524 ($($t:ty),*) => {
525 $(
526 impl PrimitiveEncode for $t {
527 type Buffer = [u8; Self::FORMATTED_SIZE];
528
529 fn init_buffer() -> Self::Buffer {
530 [0; Self::FORMATTED_SIZE]
531 }
532
533 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
534 lexical_core::write(self, buf)
535 }
536 }
537 )*
538 };
539}
540integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
541
542macro_rules! float_encode {
543 ($($t:ty),*) => {
544 $(
545 impl PrimitiveEncode for $t {
546 type Buffer = [u8; Self::FORMATTED_SIZE];
547
548 fn init_buffer() -> Self::Buffer {
549 [0; Self::FORMATTED_SIZE]
550 }
551
552 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
553 if self.is_infinite() || self.is_nan() {
554 b"null"
555 } else {
556 lexical_core::write(self, buf)
557 }
558 }
559 }
560 )*
561 };
562}
563float_encode!(f32, f64);
564
565impl PrimitiveEncode for f16 {
566 type Buffer = <f32 as PrimitiveEncode>::Buffer;
567
568 fn init_buffer() -> Self::Buffer {
569 f32::init_buffer()
570 }
571
572 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
573 self.to_f32().encode(buf)
574 }
575}
576
577struct PrimitiveEncoder<N: PrimitiveEncode> {
578 values: ScalarBuffer<N>,
579 buffer: N::Buffer,
580}
581
582impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
583 fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
584 Self {
585 values: array.values().clone(),
586 buffer: N::init_buffer(),
587 }
588 }
589}
590
591impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
592 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
593 out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
594 }
595}
596
597struct BooleanEncoder<'a>(&'a BooleanArray);
598
599impl Encoder for BooleanEncoder<'_> {
600 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
601 match self.0.value(idx) {
602 true => out.extend_from_slice(b"true"),
603 false => out.extend_from_slice(b"false"),
604 }
605 }
606}
607
608struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
609
610impl<O: OffsetSizeTrait> Encoder for StringEncoder<'_, O> {
611 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
612 encode_string(self.0.value(idx), out);
613 }
614}
615
616struct StringViewEncoder<'a>(&'a StringViewArray);
617
618impl Encoder for StringViewEncoder<'_> {
619 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
620 encode_string(self.0.value(idx), out);
621 }
622}
623
624struct BinaryViewEncoder<'a>(&'a BinaryViewArray);
625
626impl Encoder for BinaryViewEncoder<'_> {
627 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
628 encode_binary(self.0.value(idx), out);
629 }
630}
631
632struct ListEncoder<'a, O: OffsetSizeTrait> {
633 offsets: OffsetBuffer<O>,
634 encoder: NullableEncoder<'a>,
635}
636
637impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
638 fn try_new(
639 field: &'a FieldRef,
640 array: &'a GenericListArray<O>,
641 options: &'a EncoderOptions,
642 ) -> Result<Self, ArrowError> {
643 let encoder = make_encoder(field, array.values().as_ref(), options)?;
644 Ok(Self {
645 offsets: array.offsets().clone(),
646 encoder,
647 })
648 }
649}
650
651impl<O: OffsetSizeTrait> Encoder for ListEncoder<'_, O> {
652 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
653 let end = self.offsets[idx + 1].as_usize();
654 let start = self.offsets[idx].as_usize();
655 out.push(b'[');
656
657 if self.encoder.has_nulls() {
658 for idx in start..end {
659 if idx != start {
660 out.push(b',')
661 }
662 if self.encoder.is_null(idx) {
663 out.extend_from_slice(b"null");
664 } else {
665 self.encoder.encode(idx, out);
666 }
667 }
668 } else {
669 for idx in start..end {
670 if idx != start {
671 out.push(b',')
672 }
673 self.encoder.encode(idx, out);
674 }
675 }
676 out.push(b']');
677 }
678}
679
680struct FixedSizeListEncoder<'a> {
681 value_length: usize,
682 encoder: NullableEncoder<'a>,
683}
684
685impl<'a> FixedSizeListEncoder<'a> {
686 fn try_new(
687 field: &'a FieldRef,
688 array: &'a FixedSizeListArray,
689 options: &'a EncoderOptions,
690 ) -> Result<Self, ArrowError> {
691 let encoder = make_encoder(field, array.values().as_ref(), options)?;
692 Ok(Self {
693 encoder,
694 value_length: array.value_length().as_usize(),
695 })
696 }
697}
698
699impl Encoder for FixedSizeListEncoder<'_> {
700 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
701 let start = idx * self.value_length;
702 let end = start + self.value_length;
703 out.push(b'[');
704 if self.encoder.has_nulls() {
705 for idx in start..end {
706 if idx != start {
707 out.push(b',')
708 }
709 if self.encoder.is_null(idx) {
710 out.extend_from_slice(b"null");
711 } else {
712 self.encoder.encode(idx, out);
713 }
714 }
715 } else {
716 for idx in start..end {
717 if idx != start {
718 out.push(b',')
719 }
720 self.encoder.encode(idx, out);
721 }
722 }
723 out.push(b']');
724 }
725}
726
727struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
728 keys: ScalarBuffer<K::Native>,
729 encoder: NullableEncoder<'a>,
730}
731
732impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
733 fn try_new(
734 field: &'a FieldRef,
735 array: &'a DictionaryArray<K>,
736 options: &'a EncoderOptions,
737 ) -> Result<Self, ArrowError> {
738 let encoder = make_encoder(field, array.values().as_ref(), options)?;
739
740 Ok(Self {
741 keys: array.keys().values().clone(),
742 encoder,
743 })
744 }
745}
746
747impl<K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'_, K> {
748 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
749 self.encoder.encode(self.keys[idx].as_usize(), out)
750 }
751}
752
753struct JsonArrayFormatter<'a> {
755 formatter: ArrayFormatter<'a>,
756}
757
758impl<'a> JsonArrayFormatter<'a> {
759 fn new(formatter: ArrayFormatter<'a>) -> Self {
760 Self { formatter }
761 }
762}
763
764impl Encoder for JsonArrayFormatter<'_> {
765 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
766 out.push(b'"');
767 let _ = write!(out, "{}", self.formatter.value(idx));
770 out.push(b'"')
771 }
772}
773
774struct RawArrayFormatter<'a>(JsonArrayFormatter<'a>);
776
777impl Encoder for RawArrayFormatter<'_> {
778 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
779 let _ = write!(out, "{}", self.0.formatter.value(idx));
780 }
781}
782
783struct NullEncoder;
784
785impl Encoder for NullEncoder {
786 fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
787 unreachable!()
788 }
789}
790
791struct MapEncoder<'a> {
792 offsets: OffsetBuffer<i32>,
793 keys: NullableEncoder<'a>,
794 values: NullableEncoder<'a>,
795 explicit_nulls: bool,
796}
797
798impl<'a> MapEncoder<'a> {
799 fn try_new(
800 field: &'a FieldRef,
801 array: &'a MapArray,
802 options: &'a EncoderOptions,
803 ) -> Result<Self, ArrowError> {
804 let values = array.values();
805 let keys = array.keys();
806
807 if !matches!(
808 keys.data_type(),
809 DataType::Utf8 | DataType::LargeUtf8 | DataType::Utf8View
810 ) {
811 return Err(ArrowError::JsonError(format!(
812 "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
813 keys.data_type()
814 )));
815 }
816
817 let keys = make_encoder(field, keys, options)?;
818 let values = make_encoder(field, values, options)?;
819
820 if keys.has_nulls() {
822 return Err(ArrowError::InvalidArgumentError(
823 "Encountered nulls in MapArray keys".to_string(),
824 ));
825 }
826
827 if array.entries().nulls().is_some_and(|x| x.null_count() != 0) {
828 return Err(ArrowError::InvalidArgumentError(
829 "Encountered nulls in MapArray entries".to_string(),
830 ));
831 }
832
833 Ok(Self {
834 offsets: array.offsets().clone(),
835 keys,
836 values,
837 explicit_nulls: options.explicit_nulls(),
838 })
839 }
840}
841
842impl Encoder for MapEncoder<'_> {
843 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
844 let end = self.offsets[idx + 1].as_usize();
845 let start = self.offsets[idx].as_usize();
846
847 let mut is_first = true;
848
849 out.push(b'{');
850
851 for idx in start..end {
852 let is_null = self.values.is_null(idx);
853 if is_null && !self.explicit_nulls {
854 continue;
855 }
856
857 if !is_first {
858 out.push(b',');
859 }
860 is_first = false;
861
862 self.keys.encode(idx, out);
863 out.push(b':');
864
865 if is_null {
866 out.extend_from_slice(b"null");
867 } else {
868 self.values.encode(idx, out);
869 }
870 }
871 out.push(b'}');
872 }
873}
874
875struct BinaryEncoder<B>(B);
878
879impl<'a, B> BinaryEncoder<B>
880where
881 B: ArrayAccessor<Item = &'a [u8]>,
882{
883 fn new(array: B) -> Self {
884 Self(array)
885 }
886}
887
888impl<'a, B> Encoder for BinaryEncoder<B>
889where
890 B: ArrayAccessor<Item = &'a [u8]>,
891{
892 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
893 out.push(b'"');
894 for byte in self.0.value(idx) {
895 write!(out, "{byte:02x}").unwrap();
897 }
898 out.push(b'"');
899 }
900}