1use std::io::Write;
18use std::sync::Arc;
19
20use crate::StructMode;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_array::*;
24use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow_cast::display::{ArrayFormatter, FormatOptions};
26use arrow_schema::{ArrowError, DataType, FieldRef};
27use half::f16;
28use lexical_core::FormattedSize;
29use serde::Serializer;
30
31#[derive(Debug, Clone, Default)]
33pub struct EncoderOptions {
34 explicit_nulls: bool,
36 struct_mode: StructMode,
38 encoder_factory: Option<Arc<dyn EncoderFactory>>,
40}
41
42impl EncoderOptions {
43 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
45 self.explicit_nulls = explicit_nulls;
46 self
47 }
48
49 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
51 self.struct_mode = struct_mode;
52 self
53 }
54
55 pub fn with_encoder_factory(mut self, encoder_factory: Arc<dyn EncoderFactory>) -> Self {
57 self.encoder_factory = Some(encoder_factory);
58 self
59 }
60
61 pub fn explicit_nulls(&self) -> bool {
63 self.explicit_nulls
64 }
65
66 pub fn struct_mode(&self) -> StructMode {
68 self.struct_mode
69 }
70
71 pub fn encoder_factory(&self) -> Option<&Arc<dyn EncoderFactory>> {
73 self.encoder_factory.as_ref()
74 }
75}
76
77pub trait EncoderFactory: std::fmt::Debug + Send + Sync {
174 fn make_default_encoder<'a>(
182 &self,
183 _field: &'a FieldRef,
184 _array: &'a dyn Array,
185 _options: &'a EncoderOptions,
186 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
187 Ok(None)
188 }
189}
190
191pub struct NullableEncoder<'a> {
194 encoder: Box<dyn Encoder + 'a>,
195 nulls: Option<NullBuffer>,
196}
197
198impl<'a> NullableEncoder<'a> {
199 pub fn new(encoder: Box<dyn Encoder + 'a>, nulls: Option<NullBuffer>) -> Self {
201 Self { encoder, nulls }
202 }
203
204 pub fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
206 self.encoder.encode(idx, out)
207 }
208
209 pub fn is_null(&self, idx: usize) -> bool {
211 self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
212 }
213
214 pub fn has_nulls(&self) -> bool {
216 match self.nulls {
217 Some(ref nulls) => nulls.null_count() > 0,
218 None => false,
219 }
220 }
221}
222
223impl Encoder for NullableEncoder<'_> {
224 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
225 self.encoder.encode(idx, out)
226 }
227}
228
/// Something that can append the JSON text of one array element to a byte buffer.
pub trait Encoder {
    /// Appends the encoding of the element at `idx` to `out`.
    ///
    /// Callers in this module check nullness separately (see [`NullableEncoder`])
    /// and write `null` themselves rather than calling this for null slots.
    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
}
238
239pub fn make_encoder<'a>(
243 field: &'a FieldRef,
244 array: &'a dyn Array,
245 options: &'a EncoderOptions,
246) -> Result<NullableEncoder<'a>, ArrowError> {
247 macro_rules! primitive_helper {
248 ($t:ty) => {{
249 let array = array.as_primitive::<$t>();
250 let nulls = array.nulls().cloned();
251 NullableEncoder::new(Box::new(PrimitiveEncoder::new(array)), nulls)
252 }};
253 }
254
255 if let Some(factory) = options.encoder_factory() {
256 if let Some(encoder) = factory.make_default_encoder(field, array, options)? {
257 return Ok(encoder);
258 }
259 }
260
261 let nulls = array.nulls().cloned();
262 let encoder = downcast_integer! {
263 array.data_type() => (primitive_helper),
264 DataType::Float16 => primitive_helper!(Float16Type),
265 DataType::Float32 => primitive_helper!(Float32Type),
266 DataType::Float64 => primitive_helper!(Float64Type),
267 DataType::Boolean => {
268 let array = array.as_boolean();
269 NullableEncoder::new(Box::new(BooleanEncoder(array)), array.nulls().cloned())
270 }
271 DataType::Null => NullableEncoder::new(Box::new(NullEncoder), array.logical_nulls()),
272 DataType::Utf8 => {
273 let array = array.as_string::<i32>();
274 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
275 }
276 DataType::LargeUtf8 => {
277 let array = array.as_string::<i64>();
278 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
279 }
280 DataType::Utf8View => {
281 let array = array.as_string_view();
282 NullableEncoder::new(Box::new(StringViewEncoder(array)), array.nulls().cloned())
283 }
284 DataType::List(_) => {
285 let array = array.as_list::<i32>();
286 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
287 }
288 DataType::LargeList(_) => {
289 let array = array.as_list::<i64>();
290 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
291 }
292 DataType::FixedSizeList(_, _) => {
293 let array = array.as_fixed_size_list();
294 NullableEncoder::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned())
295 }
296
297 DataType::Dictionary(_, _) => downcast_dictionary_array! {
298 array => {
299 NullableEncoder::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned())
300 },
301 _ => unreachable!()
302 }
303
304 DataType::Map(_, _) => {
305 let array = array.as_map();
306 NullableEncoder::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned())
307 }
308
309 DataType::FixedSizeBinary(_) => {
310 let array = array.as_fixed_size_binary();
311 NullableEncoder::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
312 }
313
314 DataType::Binary => {
315 let array: &BinaryArray = array.as_binary();
316 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
317 }
318
319 DataType::LargeBinary => {
320 let array: &LargeBinaryArray = array.as_binary();
321 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
322 }
323
324 DataType::Struct(fields) => {
325 let array = array.as_struct();
326 let encoders = fields.iter().zip(array.columns()).map(|(field, array)| {
327 let encoder = make_encoder(field, array, options)?;
328 Ok(FieldEncoder{
329 field: field.clone(),
330 encoder,
331 })
332 }).collect::<Result<Vec<_>, ArrowError>>()?;
333
334 let encoder = StructArrayEncoder{
335 encoders,
336 explicit_nulls: options.explicit_nulls(),
337 struct_mode: options.struct_mode(),
338 };
339 let nulls = array.nulls().cloned();
340 NullableEncoder::new(Box::new(encoder) as Box<dyn Encoder + 'a>, nulls)
341 }
342 DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
343 let options = FormatOptions::new().with_display_error(true);
344 let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?);
345 NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box<dyn Encoder + 'a>, nulls)
346 }
347 d => match d.is_temporal() {
348 true => {
349 let options = FormatOptions::new().with_display_error(true);
354 let formatter = ArrayFormatter::try_new(array, &options)?;
355 let formatter = JsonArrayFormatter::new(formatter);
356 NullableEncoder::new(Box::new(formatter) as Box<dyn Encoder + 'a>, nulls)
357 }
358 false => return Err(ArrowError::JsonError(format!(
359 "Unsupported data type for JSON encoding: {d:?}",
360 )))
361 }
362 };
363
364 Ok(encoder)
365}
366
367fn encode_string(s: &str, out: &mut Vec<u8>) {
368 let mut serializer = serde_json::Serializer::new(out);
369 serializer.serialize_str(s).unwrap();
370}
371
372struct FieldEncoder<'a> {
373 field: FieldRef,
374 encoder: NullableEncoder<'a>,
375}
376
377impl FieldEncoder<'_> {
378 fn is_null(&self, idx: usize) -> bool {
379 self.encoder.is_null(idx)
380 }
381}
382
383struct StructArrayEncoder<'a> {
384 encoders: Vec<FieldEncoder<'a>>,
385 explicit_nulls: bool,
386 struct_mode: StructMode,
387}
388
389impl Encoder for StructArrayEncoder<'_> {
390 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
391 match self.struct_mode {
392 StructMode::ObjectOnly => out.push(b'{'),
393 StructMode::ListOnly => out.push(b'['),
394 }
395 let mut is_first = true;
396 let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls;
398
399 for field_encoder in self.encoders.iter_mut() {
400 let is_null = field_encoder.is_null(idx);
401 if is_null && drop_nulls {
402 continue;
403 }
404
405 if !is_first {
406 out.push(b',');
407 }
408 is_first = false;
409
410 if self.struct_mode == StructMode::ObjectOnly {
411 encode_string(field_encoder.field.name(), out);
412 out.push(b':');
413 }
414
415 if is_null {
416 out.extend_from_slice(b"null");
417 } else {
418 field_encoder.encoder.encode(idx, out);
419 }
420 }
421 match self.struct_mode {
422 StructMode::ObjectOnly => out.push(b'}'),
423 StructMode::ListOnly => out.push(b']'),
424 }
425 }
426}
427
428trait PrimitiveEncode: ArrowNativeType {
429 type Buffer;
430
431 fn init_buffer() -> Self::Buffer;
433
434 fn encode(self, buf: &mut Self::Buffer) -> &[u8];
438}
439
440macro_rules! integer_encode {
441 ($($t:ty),*) => {
442 $(
443 impl PrimitiveEncode for $t {
444 type Buffer = [u8; Self::FORMATTED_SIZE];
445
446 fn init_buffer() -> Self::Buffer {
447 [0; Self::FORMATTED_SIZE]
448 }
449
450 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
451 lexical_core::write(self, buf)
452 }
453 }
454 )*
455 };
456}
457integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
458
459macro_rules! float_encode {
460 ($($t:ty),*) => {
461 $(
462 impl PrimitiveEncode for $t {
463 type Buffer = [u8; Self::FORMATTED_SIZE];
464
465 fn init_buffer() -> Self::Buffer {
466 [0; Self::FORMATTED_SIZE]
467 }
468
469 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
470 if self.is_infinite() || self.is_nan() {
471 b"null"
472 } else {
473 lexical_core::write(self, buf)
474 }
475 }
476 }
477 )*
478 };
479}
480float_encode!(f32, f64);
481
482impl PrimitiveEncode for f16 {
483 type Buffer = <f32 as PrimitiveEncode>::Buffer;
484
485 fn init_buffer() -> Self::Buffer {
486 f32::init_buffer()
487 }
488
489 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
490 self.to_f32().encode(buf)
491 }
492}
493
494struct PrimitiveEncoder<N: PrimitiveEncode> {
495 values: ScalarBuffer<N>,
496 buffer: N::Buffer,
497}
498
499impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
500 fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
501 Self {
502 values: array.values().clone(),
503 buffer: N::init_buffer(),
504 }
505 }
506}
507
508impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
509 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
510 out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
511 }
512}
513
514struct BooleanEncoder<'a>(&'a BooleanArray);
515
516impl Encoder for BooleanEncoder<'_> {
517 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
518 match self.0.value(idx) {
519 true => out.extend_from_slice(b"true"),
520 false => out.extend_from_slice(b"false"),
521 }
522 }
523}
524
525struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
526
527impl<O: OffsetSizeTrait> Encoder for StringEncoder<'_, O> {
528 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
529 encode_string(self.0.value(idx), out);
530 }
531}
532
533struct StringViewEncoder<'a>(&'a StringViewArray);
534
535impl Encoder for StringViewEncoder<'_> {
536 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
537 encode_string(self.0.value(idx), out);
538 }
539}
540
541struct ListEncoder<'a, O: OffsetSizeTrait> {
542 offsets: OffsetBuffer<O>,
543 encoder: NullableEncoder<'a>,
544}
545
546impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
547 fn try_new(
548 field: &'a FieldRef,
549 array: &'a GenericListArray<O>,
550 options: &'a EncoderOptions,
551 ) -> Result<Self, ArrowError> {
552 let encoder = make_encoder(field, array.values().as_ref(), options)?;
553 Ok(Self {
554 offsets: array.offsets().clone(),
555 encoder,
556 })
557 }
558}
559
560impl<O: OffsetSizeTrait> Encoder for ListEncoder<'_, O> {
561 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
562 let end = self.offsets[idx + 1].as_usize();
563 let start = self.offsets[idx].as_usize();
564 out.push(b'[');
565
566 if self.encoder.has_nulls() {
567 for idx in start..end {
568 if idx != start {
569 out.push(b',')
570 }
571 if self.encoder.is_null(idx) {
572 out.extend_from_slice(b"null");
573 } else {
574 self.encoder.encode(idx, out);
575 }
576 }
577 } else {
578 for idx in start..end {
579 if idx != start {
580 out.push(b',')
581 }
582 self.encoder.encode(idx, out);
583 }
584 }
585 out.push(b']');
586 }
587}
588
589struct FixedSizeListEncoder<'a> {
590 value_length: usize,
591 encoder: NullableEncoder<'a>,
592}
593
594impl<'a> FixedSizeListEncoder<'a> {
595 fn try_new(
596 field: &'a FieldRef,
597 array: &'a FixedSizeListArray,
598 options: &'a EncoderOptions,
599 ) -> Result<Self, ArrowError> {
600 let encoder = make_encoder(field, array.values().as_ref(), options)?;
601 Ok(Self {
602 encoder,
603 value_length: array.value_length().as_usize(),
604 })
605 }
606}
607
608impl Encoder for FixedSizeListEncoder<'_> {
609 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
610 let start = idx * self.value_length;
611 let end = start + self.value_length;
612 out.push(b'[');
613 if self.encoder.has_nulls() {
614 for idx in start..end {
615 if idx != start {
616 out.push(b',')
617 }
618 if self.encoder.is_null(idx) {
619 out.extend_from_slice(b"null");
620 } else {
621 self.encoder.encode(idx, out);
622 }
623 }
624 } else {
625 for idx in start..end {
626 if idx != start {
627 out.push(b',')
628 }
629 self.encoder.encode(idx, out);
630 }
631 }
632 out.push(b']');
633 }
634}
635
636struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
637 keys: ScalarBuffer<K::Native>,
638 encoder: NullableEncoder<'a>,
639}
640
641impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
642 fn try_new(
643 field: &'a FieldRef,
644 array: &'a DictionaryArray<K>,
645 options: &'a EncoderOptions,
646 ) -> Result<Self, ArrowError> {
647 let encoder = make_encoder(field, array.values().as_ref(), options)?;
648
649 Ok(Self {
650 keys: array.keys().values().clone(),
651 encoder,
652 })
653 }
654}
655
656impl<K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'_, K> {
657 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
658 self.encoder.encode(self.keys[idx].as_usize(), out)
659 }
660}
661
662struct JsonArrayFormatter<'a> {
664 formatter: ArrayFormatter<'a>,
665}
666
667impl<'a> JsonArrayFormatter<'a> {
668 fn new(formatter: ArrayFormatter<'a>) -> Self {
669 Self { formatter }
670 }
671}
672
673impl Encoder for JsonArrayFormatter<'_> {
674 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
675 out.push(b'"');
676 let _ = write!(out, "{}", self.formatter.value(idx));
679 out.push(b'"')
680 }
681}
682
683struct RawArrayFormatter<'a>(JsonArrayFormatter<'a>);
685
686impl Encoder for RawArrayFormatter<'_> {
687 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
688 let _ = write!(out, "{}", self.0.formatter.value(idx));
689 }
690}
691
692struct NullEncoder;
693
694impl Encoder for NullEncoder {
695 fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
696 unreachable!()
697 }
698}
699
700struct MapEncoder<'a> {
701 offsets: OffsetBuffer<i32>,
702 keys: NullableEncoder<'a>,
703 values: NullableEncoder<'a>,
704 explicit_nulls: bool,
705}
706
707impl<'a> MapEncoder<'a> {
708 fn try_new(
709 field: &'a FieldRef,
710 array: &'a MapArray,
711 options: &'a EncoderOptions,
712 ) -> Result<Self, ArrowError> {
713 let values = array.values();
714 let keys = array.keys();
715
716 if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) {
717 return Err(ArrowError::JsonError(format!(
718 "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
719 keys.data_type()
720 )));
721 }
722
723 let keys = make_encoder(field, keys, options)?;
724 let values = make_encoder(field, values, options)?;
725
726 if keys.has_nulls() {
728 return Err(ArrowError::InvalidArgumentError(
729 "Encountered nulls in MapArray keys".to_string(),
730 ));
731 }
732
733 if array.entries().nulls().is_some_and(|x| x.null_count() != 0) {
734 return Err(ArrowError::InvalidArgumentError(
735 "Encountered nulls in MapArray entries".to_string(),
736 ));
737 }
738
739 Ok(Self {
740 offsets: array.offsets().clone(),
741 keys,
742 values,
743 explicit_nulls: options.explicit_nulls(),
744 })
745 }
746}
747
748impl Encoder for MapEncoder<'_> {
749 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
750 let end = self.offsets[idx + 1].as_usize();
751 let start = self.offsets[idx].as_usize();
752
753 let mut is_first = true;
754
755 out.push(b'{');
756
757 for idx in start..end {
758 let is_null = self.values.is_null(idx);
759 if is_null && !self.explicit_nulls {
760 continue;
761 }
762
763 if !is_first {
764 out.push(b',');
765 }
766 is_first = false;
767
768 self.keys.encode(idx, out);
769 out.push(b':');
770
771 if is_null {
772 out.extend_from_slice(b"null");
773 } else {
774 self.values.encode(idx, out);
775 }
776 }
777 out.push(b'}');
778 }
779}
780
781struct BinaryEncoder<B>(B);
784
785impl<'a, B> BinaryEncoder<B>
786where
787 B: ArrayAccessor<Item = &'a [u8]>,
788{
789 fn new(array: B) -> Self {
790 Self(array)
791 }
792}
793
794impl<'a, B> Encoder for BinaryEncoder<B>
795where
796 B: ArrayAccessor<Item = &'a [u8]>,
797{
798 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
799 out.push(b'"');
800 for byte in self.0.value(idx) {
801 write!(out, "{byte:02x}").unwrap();
803 }
804 out.push(b'"');
805 }
806}