1use std::io::Write;
18use std::sync::Arc;
19
20use crate::StructMode;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_array::*;
24use arrow_buffer::{ArrowNativeType, NullBuffer, OffsetBuffer, ScalarBuffer};
25use arrow_cast::display::{ArrayFormatter, FormatOptions};
26use arrow_schema::{ArrowError, DataType, FieldRef};
27use half::f16;
28use lexical_core::FormattedSize;
29use serde::Serializer;
30
31#[derive(Debug, Clone, Default)]
33pub struct EncoderOptions {
34 explicit_nulls: bool,
36 struct_mode: StructMode,
38 encoder_factory: Option<Arc<dyn EncoderFactory>>,
40}
41
42impl EncoderOptions {
43 pub fn with_explicit_nulls(mut self, explicit_nulls: bool) -> Self {
45 self.explicit_nulls = explicit_nulls;
46 self
47 }
48
49 pub fn with_struct_mode(mut self, struct_mode: StructMode) -> Self {
51 self.struct_mode = struct_mode;
52 self
53 }
54
55 pub fn with_encoder_factory(mut self, encoder_factory: Arc<dyn EncoderFactory>) -> Self {
57 self.encoder_factory = Some(encoder_factory);
58 self
59 }
60
61 pub fn explicit_nulls(&self) -> bool {
63 self.explicit_nulls
64 }
65
66 pub fn struct_mode(&self) -> StructMode {
68 self.struct_mode
69 }
70
71 pub fn encoder_factory(&self) -> Option<&Arc<dyn EncoderFactory>> {
73 self.encoder_factory.as_ref()
74 }
75}
76
77pub trait EncoderFactory: std::fmt::Debug + Send + Sync {
174 fn make_default_encoder<'a>(
182 &self,
183 _field: &'a FieldRef,
184 _array: &'a dyn Array,
185 _options: &'a EncoderOptions,
186 ) -> Result<Option<NullableEncoder<'a>>, ArrowError> {
187 Ok(None)
188 }
189}
190
191pub struct NullableEncoder<'a> {
194 encoder: Box<dyn Encoder + 'a>,
195 nulls: Option<NullBuffer>,
196}
197
198impl<'a> NullableEncoder<'a> {
199 pub fn new(encoder: Box<dyn Encoder + 'a>, nulls: Option<NullBuffer>) -> Self {
201 Self { encoder, nulls }
202 }
203
204 pub fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
206 self.encoder.encode(idx, out)
207 }
208
209 pub fn is_null(&self, idx: usize) -> bool {
211 self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
212 }
213
214 pub fn has_nulls(&self) -> bool {
216 match self.nulls {
217 Some(ref nulls) => nulls.null_count() > 0,
218 None => false,
219 }
220 }
221}
222
223impl Encoder for NullableEncoder<'_> {
224 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
225 self.encoder.encode(idx, out)
226 }
227}
228
/// Writes the JSON representation of individual array slots.
pub trait Encoder {
    /// Appends the encoding of the value at `idx` to `out`.
    ///
    /// Implementations in this file do not check for nulls themselves; the
    /// callers test nullness first and write `null` on their own.
    fn encode(&mut self, idx: usize, out: &mut Vec<u8>);
}
238
239pub fn make_encoder<'a>(
243 field: &'a FieldRef,
244 array: &'a dyn Array,
245 options: &'a EncoderOptions,
246) -> Result<NullableEncoder<'a>, ArrowError> {
247 macro_rules! primitive_helper {
248 ($t:ty) => {{
249 let array = array.as_primitive::<$t>();
250 let nulls = array.nulls().cloned();
251 NullableEncoder::new(Box::new(PrimitiveEncoder::new(array)), nulls)
252 }};
253 }
254
255 if let Some(factory) = options.encoder_factory() {
256 if let Some(encoder) = factory.make_default_encoder(field, array, options)? {
257 return Ok(encoder);
258 }
259 }
260
261 let nulls = array.nulls().cloned();
262 let encoder = downcast_integer! {
263 array.data_type() => (primitive_helper),
264 DataType::Float16 => primitive_helper!(Float16Type),
265 DataType::Float32 => primitive_helper!(Float32Type),
266 DataType::Float64 => primitive_helper!(Float64Type),
267 DataType::Boolean => {
268 let array = array.as_boolean();
269 NullableEncoder::new(Box::new(BooleanEncoder(array)), array.nulls().cloned())
270 }
271 DataType::Null => NullableEncoder::new(Box::new(NullEncoder), array.logical_nulls()),
272 DataType::Utf8 => {
273 let array = array.as_string::<i32>();
274 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
275 }
276 DataType::LargeUtf8 => {
277 let array = array.as_string::<i64>();
278 NullableEncoder::new(Box::new(StringEncoder(array)), array.nulls().cloned())
279 }
280 DataType::Utf8View => {
281 let array = array.as_string_view();
282 NullableEncoder::new(Box::new(StringViewEncoder(array)), array.nulls().cloned())
283 }
284 DataType::List(_) => {
285 let array = array.as_list::<i32>();
286 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
287 }
288 DataType::LargeList(_) => {
289 let array = array.as_list::<i64>();
290 NullableEncoder::new(Box::new(ListEncoder::try_new(field, array, options)?), array.nulls().cloned())
291 }
292 DataType::FixedSizeList(_, _) => {
293 let array = array.as_fixed_size_list();
294 NullableEncoder::new(Box::new(FixedSizeListEncoder::try_new(field, array, options)?), array.nulls().cloned())
295 }
296
297 DataType::Dictionary(_, _) => downcast_dictionary_array! {
298 array => {
299 NullableEncoder::new(Box::new(DictionaryEncoder::try_new(field, array, options)?), array.nulls().cloned())
300 },
301 _ => unreachable!()
302 }
303
304 DataType::Map(_, _) => {
305 let array = array.as_map();
306 NullableEncoder::new(Box::new(MapEncoder::try_new(field, array, options)?), array.nulls().cloned())
307 }
308
309 DataType::FixedSizeBinary(_) => {
310 let array = array.as_fixed_size_binary();
311 NullableEncoder::new(Box::new(BinaryEncoder::new(array)) as _, array.nulls().cloned())
312 }
313
314 DataType::Binary => {
315 let array: &BinaryArray = array.as_binary();
316 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
317 }
318
319 DataType::LargeBinary => {
320 let array: &LargeBinaryArray = array.as_binary();
321 NullableEncoder::new(Box::new(BinaryEncoder::new(array)), array.nulls().cloned())
322 }
323
324 DataType::Struct(fields) => {
325 let array = array.as_struct();
326 let encoders = fields.iter().zip(array.columns()).map(|(field, array)| {
327 let encoder = make_encoder(field, array, options)?;
328 Ok(FieldEncoder{
329 field: field.clone(),
330 encoder,
331 })
332 }).collect::<Result<Vec<_>, ArrowError>>()?;
333
334 let encoder = StructArrayEncoder{
335 encoders,
336 explicit_nulls: options.explicit_nulls(),
337 struct_mode: options.struct_mode(),
338 };
339 let nulls = array.nulls().cloned();
340 NullableEncoder::new(Box::new(encoder) as Box<dyn Encoder + 'a>, nulls)
341 }
342 DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => {
343 let options = FormatOptions::new().with_display_error(true);
344 let formatter = JsonArrayFormatter::new(ArrayFormatter::try_new(array, &options)?);
345 NullableEncoder::new(Box::new(RawArrayFormatter(formatter)) as Box<dyn Encoder + 'a>, nulls)
346 }
347 d => match d.is_temporal() {
348 true => {
349 let options = FormatOptions::new().with_display_error(true);
354 let formatter = ArrayFormatter::try_new(array, &options)?;
355 let formatter = JsonArrayFormatter::new(formatter);
356 NullableEncoder::new(Box::new(formatter) as Box<dyn Encoder + 'a>, nulls)
357 }
358 false => return Err(ArrowError::JsonError(format!(
359 "Unsupported data type for JSON encoding: {:?}",
360 d
361 )))
362 }
363 };
364
365 Ok(encoder)
366}
367
368fn encode_string(s: &str, out: &mut Vec<u8>) {
369 let mut serializer = serde_json::Serializer::new(out);
370 serializer.serialize_str(s).unwrap();
371}
372
373struct FieldEncoder<'a> {
374 field: FieldRef,
375 encoder: NullableEncoder<'a>,
376}
377
378impl FieldEncoder<'_> {
379 fn is_null(&self, idx: usize) -> bool {
380 self.encoder.is_null(idx)
381 }
382}
383
384struct StructArrayEncoder<'a> {
385 encoders: Vec<FieldEncoder<'a>>,
386 explicit_nulls: bool,
387 struct_mode: StructMode,
388}
389
390impl Encoder for StructArrayEncoder<'_> {
391 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
392 match self.struct_mode {
393 StructMode::ObjectOnly => out.push(b'{'),
394 StructMode::ListOnly => out.push(b'['),
395 }
396 let mut is_first = true;
397 let drop_nulls = (self.struct_mode == StructMode::ObjectOnly) && !self.explicit_nulls;
399
400 for field_encoder in self.encoders.iter_mut() {
401 let is_null = field_encoder.is_null(idx);
402 if is_null && drop_nulls {
403 continue;
404 }
405
406 if !is_first {
407 out.push(b',');
408 }
409 is_first = false;
410
411 if self.struct_mode == StructMode::ObjectOnly {
412 encode_string(field_encoder.field.name(), out);
413 out.push(b':');
414 }
415
416 if is_null {
417 out.extend_from_slice(b"null");
418 } else {
419 field_encoder.encoder.encode(idx, out);
420 }
421 }
422 match self.struct_mode {
423 StructMode::ObjectOnly => out.push(b'}'),
424 StructMode::ListOnly => out.push(b']'),
425 }
426 }
427}
428
429trait PrimitiveEncode: ArrowNativeType {
430 type Buffer;
431
432 fn init_buffer() -> Self::Buffer;
434
435 fn encode(self, buf: &mut Self::Buffer) -> &[u8];
439}
440
441macro_rules! integer_encode {
442 ($($t:ty),*) => {
443 $(
444 impl PrimitiveEncode for $t {
445 type Buffer = [u8; Self::FORMATTED_SIZE];
446
447 fn init_buffer() -> Self::Buffer {
448 [0; Self::FORMATTED_SIZE]
449 }
450
451 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
452 lexical_core::write(self, buf)
453 }
454 }
455 )*
456 };
457}
458integer_encode!(i8, i16, i32, i64, u8, u16, u32, u64);
459
460macro_rules! float_encode {
461 ($($t:ty),*) => {
462 $(
463 impl PrimitiveEncode for $t {
464 type Buffer = [u8; Self::FORMATTED_SIZE];
465
466 fn init_buffer() -> Self::Buffer {
467 [0; Self::FORMATTED_SIZE]
468 }
469
470 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
471 if self.is_infinite() || self.is_nan() {
472 b"null"
473 } else {
474 lexical_core::write(self, buf)
475 }
476 }
477 }
478 )*
479 };
480}
481float_encode!(f32, f64);
482
483impl PrimitiveEncode for f16 {
484 type Buffer = <f32 as PrimitiveEncode>::Buffer;
485
486 fn init_buffer() -> Self::Buffer {
487 f32::init_buffer()
488 }
489
490 fn encode(self, buf: &mut Self::Buffer) -> &[u8] {
491 self.to_f32().encode(buf)
492 }
493}
494
495struct PrimitiveEncoder<N: PrimitiveEncode> {
496 values: ScalarBuffer<N>,
497 buffer: N::Buffer,
498}
499
500impl<N: PrimitiveEncode> PrimitiveEncoder<N> {
501 fn new<P: ArrowPrimitiveType<Native = N>>(array: &PrimitiveArray<P>) -> Self {
502 Self {
503 values: array.values().clone(),
504 buffer: N::init_buffer(),
505 }
506 }
507}
508
509impl<N: PrimitiveEncode> Encoder for PrimitiveEncoder<N> {
510 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
511 out.extend_from_slice(self.values[idx].encode(&mut self.buffer));
512 }
513}
514
515struct BooleanEncoder<'a>(&'a BooleanArray);
516
517impl Encoder for BooleanEncoder<'_> {
518 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
519 match self.0.value(idx) {
520 true => out.extend_from_slice(b"true"),
521 false => out.extend_from_slice(b"false"),
522 }
523 }
524}
525
526struct StringEncoder<'a, O: OffsetSizeTrait>(&'a GenericStringArray<O>);
527
528impl<O: OffsetSizeTrait> Encoder for StringEncoder<'_, O> {
529 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
530 encode_string(self.0.value(idx), out);
531 }
532}
533
534struct StringViewEncoder<'a>(&'a StringViewArray);
535
536impl Encoder for StringViewEncoder<'_> {
537 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
538 encode_string(self.0.value(idx), out);
539 }
540}
541
542struct ListEncoder<'a, O: OffsetSizeTrait> {
543 offsets: OffsetBuffer<O>,
544 encoder: NullableEncoder<'a>,
545}
546
547impl<'a, O: OffsetSizeTrait> ListEncoder<'a, O> {
548 fn try_new(
549 field: &'a FieldRef,
550 array: &'a GenericListArray<O>,
551 options: &'a EncoderOptions,
552 ) -> Result<Self, ArrowError> {
553 let encoder = make_encoder(field, array.values().as_ref(), options)?;
554 Ok(Self {
555 offsets: array.offsets().clone(),
556 encoder,
557 })
558 }
559}
560
561impl<O: OffsetSizeTrait> Encoder for ListEncoder<'_, O> {
562 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
563 let end = self.offsets[idx + 1].as_usize();
564 let start = self.offsets[idx].as_usize();
565 out.push(b'[');
566
567 if self.encoder.has_nulls() {
568 for idx in start..end {
569 if idx != start {
570 out.push(b',')
571 }
572 if self.encoder.is_null(idx) {
573 out.extend_from_slice(b"null");
574 } else {
575 self.encoder.encode(idx, out);
576 }
577 }
578 } else {
579 for idx in start..end {
580 if idx != start {
581 out.push(b',')
582 }
583 self.encoder.encode(idx, out);
584 }
585 }
586 out.push(b']');
587 }
588}
589
590struct FixedSizeListEncoder<'a> {
591 value_length: usize,
592 encoder: NullableEncoder<'a>,
593}
594
595impl<'a> FixedSizeListEncoder<'a> {
596 fn try_new(
597 field: &'a FieldRef,
598 array: &'a FixedSizeListArray,
599 options: &'a EncoderOptions,
600 ) -> Result<Self, ArrowError> {
601 let encoder = make_encoder(field, array.values().as_ref(), options)?;
602 Ok(Self {
603 encoder,
604 value_length: array.value_length().as_usize(),
605 })
606 }
607}
608
609impl Encoder for FixedSizeListEncoder<'_> {
610 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
611 let start = idx * self.value_length;
612 let end = start + self.value_length;
613 out.push(b'[');
614 if self.encoder.has_nulls() {
615 for idx in start..end {
616 if idx != start {
617 out.push(b',')
618 }
619 if self.encoder.is_null(idx) {
620 out.extend_from_slice(b"null");
621 } else {
622 self.encoder.encode(idx, out);
623 }
624 }
625 } else {
626 for idx in start..end {
627 if idx != start {
628 out.push(b',')
629 }
630 self.encoder.encode(idx, out);
631 }
632 }
633 out.push(b']');
634 }
635}
636
637struct DictionaryEncoder<'a, K: ArrowDictionaryKeyType> {
638 keys: ScalarBuffer<K::Native>,
639 encoder: NullableEncoder<'a>,
640}
641
642impl<'a, K: ArrowDictionaryKeyType> DictionaryEncoder<'a, K> {
643 fn try_new(
644 field: &'a FieldRef,
645 array: &'a DictionaryArray<K>,
646 options: &'a EncoderOptions,
647 ) -> Result<Self, ArrowError> {
648 let encoder = make_encoder(field, array.values().as_ref(), options)?;
649
650 Ok(Self {
651 keys: array.keys().values().clone(),
652 encoder,
653 })
654 }
655}
656
657impl<K: ArrowDictionaryKeyType> Encoder for DictionaryEncoder<'_, K> {
658 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
659 self.encoder.encode(self.keys[idx].as_usize(), out)
660 }
661}
662
663struct JsonArrayFormatter<'a> {
665 formatter: ArrayFormatter<'a>,
666}
667
668impl<'a> JsonArrayFormatter<'a> {
669 fn new(formatter: ArrayFormatter<'a>) -> Self {
670 Self { formatter }
671 }
672}
673
674impl Encoder for JsonArrayFormatter<'_> {
675 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
676 out.push(b'"');
677 let _ = write!(out, "{}", self.formatter.value(idx));
680 out.push(b'"')
681 }
682}
683
684struct RawArrayFormatter<'a>(JsonArrayFormatter<'a>);
686
687impl Encoder for RawArrayFormatter<'_> {
688 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
689 let _ = write!(out, "{}", self.0.formatter.value(idx));
690 }
691}
692
693struct NullEncoder;
694
695impl Encoder for NullEncoder {
696 fn encode(&mut self, _idx: usize, _out: &mut Vec<u8>) {
697 unreachable!()
698 }
699}
700
701struct MapEncoder<'a> {
702 offsets: OffsetBuffer<i32>,
703 keys: NullableEncoder<'a>,
704 values: NullableEncoder<'a>,
705 explicit_nulls: bool,
706}
707
708impl<'a> MapEncoder<'a> {
709 fn try_new(
710 field: &'a FieldRef,
711 array: &'a MapArray,
712 options: &'a EncoderOptions,
713 ) -> Result<Self, ArrowError> {
714 let values = array.values();
715 let keys = array.keys();
716
717 if !matches!(keys.data_type(), DataType::Utf8 | DataType::LargeUtf8) {
718 return Err(ArrowError::JsonError(format!(
719 "Only UTF8 keys supported by JSON MapArray Writer: got {:?}",
720 keys.data_type()
721 )));
722 }
723
724 let keys = make_encoder(field, keys, options)?;
725 let values = make_encoder(field, values, options)?;
726
727 if keys.has_nulls() {
729 return Err(ArrowError::InvalidArgumentError(
730 "Encountered nulls in MapArray keys".to_string(),
731 ));
732 }
733
734 if array.entries().nulls().is_some_and(|x| x.null_count() != 0) {
735 return Err(ArrowError::InvalidArgumentError(
736 "Encountered nulls in MapArray entries".to_string(),
737 ));
738 }
739
740 Ok(Self {
741 offsets: array.offsets().clone(),
742 keys,
743 values,
744 explicit_nulls: options.explicit_nulls(),
745 })
746 }
747}
748
749impl Encoder for MapEncoder<'_> {
750 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
751 let end = self.offsets[idx + 1].as_usize();
752 let start = self.offsets[idx].as_usize();
753
754 let mut is_first = true;
755
756 out.push(b'{');
757
758 for idx in start..end {
759 let is_null = self.values.is_null(idx);
760 if is_null && !self.explicit_nulls {
761 continue;
762 }
763
764 if !is_first {
765 out.push(b',');
766 }
767 is_first = false;
768
769 self.keys.encode(idx, out);
770 out.push(b':');
771
772 if is_null {
773 out.extend_from_slice(b"null");
774 } else {
775 self.values.encode(idx, out);
776 }
777 }
778 out.push(b'}');
779 }
780}
781
782struct BinaryEncoder<B>(B);
785
786impl<'a, B> BinaryEncoder<B>
787where
788 B: ArrayAccessor<Item = &'a [u8]>,
789{
790 fn new(array: B) -> Self {
791 Self(array)
792 }
793}
794
795impl<'a, B> Encoder for BinaryEncoder<B>
796where
797 B: ArrayAccessor<Item = &'a [u8]>,
798{
799 fn encode(&mut self, idx: usize, out: &mut Vec<u8>) {
800 out.push(b'"');
801 for byte in self.0.value(idx) {
802 write!(out, "{byte:02x}").unwrap();
804 }
805 out.push(b'"');
806 }
807}