1#![doc(
157 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
184#[derive(Debug)]
441pub struct RowConverter {
442 fields: Arc<[SortField]>,
443 codecs: Vec<Codec>,
445}
446
447#[derive(Debug)]
448enum Codec {
449 Stateless,
451 Dictionary(RowConverter, OwnedRow),
454 Struct(RowConverter, OwnedRow),
457 List(RowConverter),
459 RunEndEncoded(RowConverter),
461 Union(Vec<RowConverter>, Vec<OwnedRow>),
464}
465
466impl Codec {
467 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
468 match &sort_field.data_type {
469 DataType::Dictionary(_, values) => {
470 let sort_field =
471 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
472
473 let converter = RowConverter::new(vec![sort_field])?;
474 let null_array = new_null_array(values.as_ref(), 1);
475 let nulls = converter.convert_columns(&[null_array])?;
476
477 let owned = OwnedRow {
478 data: nulls.buffer.into(),
479 config: nulls.config,
480 };
481 Ok(Self::Dictionary(converter, owned))
482 }
483 DataType::RunEndEncoded(_, values) => {
484 let options = SortOptions {
486 descending: false,
487 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
488 };
489
490 let field = SortField::new_with_options(values.data_type().clone(), options);
491 let converter = RowConverter::new(vec![field])?;
492 Ok(Self::RunEndEncoded(converter))
493 }
494 d if !d.is_nested() => Ok(Self::Stateless),
495 DataType::List(f) | DataType::LargeList(f) => {
496 let options = SortOptions {
500 descending: false,
501 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
502 };
503
504 let field = SortField::new_with_options(f.data_type().clone(), options);
505 let converter = RowConverter::new(vec![field])?;
506 Ok(Self::List(converter))
507 }
508 DataType::FixedSizeList(f, _) => {
509 let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
510 let converter = RowConverter::new(vec![field])?;
511 Ok(Self::List(converter))
512 }
513 DataType::Struct(f) => {
514 let sort_fields = f
515 .iter()
516 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
517 .collect();
518
519 let converter = RowConverter::new(sort_fields)?;
520 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
521
522 let nulls = converter.convert_columns(&nulls)?;
523 let owned = OwnedRow {
524 data: nulls.buffer.into(),
525 config: nulls.config,
526 };
527
528 Ok(Self::Struct(converter, owned))
529 }
530 DataType::Union(fields, _mode) => {
531 let options = SortOptions {
534 descending: false,
535 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
536 };
537
538 let mut converters = Vec::with_capacity(fields.len());
539 let mut null_rows = Vec::with_capacity(fields.len());
540
541 for (_type_id, field) in fields.iter() {
542 let sort_field =
543 SortField::new_with_options(field.data_type().clone(), options);
544 let converter = RowConverter::new(vec![sort_field])?;
545
546 let null_array = new_null_array(field.data_type(), 1);
547 let nulls = converter.convert_columns(&[null_array])?;
548 let owned = OwnedRow {
549 data: nulls.buffer.into(),
550 config: nulls.config,
551 };
552
553 converters.push(converter);
554 null_rows.push(owned);
555 }
556
557 Ok(Self::Union(converters, null_rows))
558 }
559 _ => Err(ArrowError::NotYetImplemented(format!(
560 "not yet implemented: {:?}",
561 sort_field.data_type
562 ))),
563 }
564 }
565
566 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
567 match self {
568 Codec::Stateless => Ok(Encoder::Stateless),
569 Codec::Dictionary(converter, nulls) => {
570 let values = array.as_any_dictionary().values().clone();
571 let rows = converter.convert_columns(&[values])?;
572 Ok(Encoder::Dictionary(rows, nulls.row()))
573 }
574 Codec::Struct(converter, null) => {
575 let v = as_struct_array(array);
576 let rows = converter.convert_columns(v.columns())?;
577 Ok(Encoder::Struct(rows, null.row()))
578 }
579 Codec::List(converter) => {
580 let values = match array.data_type() {
581 DataType::List(_) => {
582 let list_array = as_list_array(array);
583 let first_offset = list_array.offsets()[0] as usize;
584 let last_offset =
585 list_array.offsets()[list_array.offsets().len() - 1] as usize;
586
587 list_array
590 .values()
591 .slice(first_offset, last_offset - first_offset)
592 }
593 DataType::LargeList(_) => {
594 let list_array = as_large_list_array(array);
595
596 let first_offset = list_array.offsets()[0] as usize;
597 let last_offset =
598 list_array.offsets()[list_array.offsets().len() - 1] as usize;
599
600 list_array
603 .values()
604 .slice(first_offset, last_offset - first_offset)
605 }
606 DataType::FixedSizeList(_, _) => {
607 as_fixed_size_list_array(array).values().clone()
608 }
609 _ => unreachable!(),
610 };
611 let rows = converter.convert_columns(&[values])?;
612 Ok(Encoder::List(rows))
613 }
614 Codec::RunEndEncoded(converter) => {
615 let values = match array.data_type() {
616 DataType::RunEndEncoded(r, _) => match r.data_type() {
617 DataType::Int16 => array.as_run::<Int16Type>().values(),
618 DataType::Int32 => array.as_run::<Int32Type>().values(),
619 DataType::Int64 => array.as_run::<Int64Type>().values(),
620 _ => unreachable!("Unsupported run end index type: {r:?}"),
621 },
622 _ => unreachable!(),
623 };
624 let rows = converter.convert_columns(std::slice::from_ref(values))?;
625 Ok(Encoder::RunEndEncoded(rows))
626 }
627 Codec::Union(converters, _) => {
628 let union_array = array
629 .as_any()
630 .downcast_ref::<UnionArray>()
631 .expect("expected Union array");
632
633 let type_ids = union_array.type_ids().clone();
634 let offsets = union_array.offsets().cloned();
635
636 let mut child_rows = Vec::with_capacity(converters.len());
637 for (type_id, converter) in converters.iter().enumerate() {
638 let child_array = union_array.child(type_id as i8);
639 let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
640 child_rows.push(rows);
641 }
642
643 Ok(Encoder::Union {
644 child_rows,
645 type_ids,
646 offsets,
647 })
648 }
649 }
650 }
651
652 fn size(&self) -> usize {
653 match self {
654 Codec::Stateless => 0,
655 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
656 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
657 Codec::List(converter) => converter.size(),
658 Codec::RunEndEncoded(converter) => converter.size(),
659 Codec::Union(converters, null_rows) => {
660 converters.iter().map(|c| c.size()).sum::<usize>()
661 + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
662 }
663 }
664 }
665}
666
667#[derive(Debug)]
668enum Encoder<'a> {
669 Stateless,
671 Dictionary(Rows, Row<'a>),
673 Struct(Rows, Row<'a>),
679 List(Rows),
681 RunEndEncoded(Rows),
683 Union {
685 child_rows: Vec<Rows>,
686 type_ids: ScalarBuffer<i8>,
687 offsets: Option<ScalarBuffer<i32>>,
688 },
689}
690
691#[derive(Debug, Clone, PartialEq, Eq)]
693pub struct SortField {
694 options: SortOptions,
696 data_type: DataType,
698}
699
700impl SortField {
701 pub fn new(data_type: DataType) -> Self {
703 Self::new_with_options(data_type, Default::default())
704 }
705
706 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
708 Self { options, data_type }
709 }
710
711 pub fn size(&self) -> usize {
715 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
716 }
717}
718
719impl RowConverter {
720 pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
722 if !Self::supports_fields(&fields) {
723 return Err(ArrowError::NotYetImplemented(format!(
724 "Row format support not yet implemented for: {fields:?}"
725 )));
726 }
727
728 let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
729 Ok(Self {
730 fields: fields.into(),
731 codecs,
732 })
733 }
734
735 pub fn supports_fields(fields: &[SortField]) -> bool {
737 fields.iter().all(|x| Self::supports_datatype(&x.data_type))
738 }
739
740 fn supports_datatype(d: &DataType) -> bool {
741 match d {
742 _ if !d.is_nested() => true,
743 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
744 Self::supports_datatype(f.data_type())
745 }
746 DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
747 DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
748 DataType::Union(fs, _mode) => fs
749 .iter()
750 .all(|(_, f)| Self::supports_datatype(f.data_type())),
751 _ => false,
752 }
753 }
754
755 pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
765 let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
766 let mut rows = self.empty_rows(num_rows, 0);
767 self.append(&mut rows, columns)?;
768 Ok(rows)
769 }
770
771 pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
802 assert!(
803 Arc::ptr_eq(&rows.config.fields, &self.fields),
804 "rows were not produced by this RowConverter"
805 );
806
807 if columns.len() != self.fields.len() {
808 return Err(ArrowError::InvalidArgumentError(format!(
809 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
810 self.fields.len(),
811 columns.len()
812 )));
813 }
814 for colum in columns.iter().skip(1) {
815 if colum.len() != columns[0].len() {
816 return Err(ArrowError::InvalidArgumentError(format!(
817 "RowConverter columns must all have the same length, expected {} got {}",
818 columns[0].len(),
819 colum.len()
820 )));
821 }
822 }
823
824 let encoders = columns
825 .iter()
826 .zip(&self.codecs)
827 .zip(self.fields.iter())
828 .map(|((column, codec), field)| {
829 if !column.data_type().equals_datatype(&field.data_type) {
830 return Err(ArrowError::InvalidArgumentError(format!(
831 "RowConverter column schema mismatch, expected {} got {}",
832 field.data_type,
833 column.data_type()
834 )));
835 }
836 codec.encoder(column.as_ref())
837 })
838 .collect::<Result<Vec<_>, _>>()?;
839
840 let write_offset = rows.num_rows();
841 let lengths = row_lengths(columns, &encoders);
842 let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
843 rows.buffer.resize(total, 0);
844
845 for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
846 encode_column(
848 &mut rows.buffer,
849 &mut rows.offsets[write_offset..],
850 column.as_ref(),
851 field.options,
852 &encoder,
853 )
854 }
855
856 if cfg!(debug_assertions) {
857 assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
858 rows.offsets
859 .windows(2)
860 .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
861 }
862
863 Ok(())
864 }
865
866 pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
874 where
875 I: IntoIterator<Item = Row<'a>>,
876 {
877 let mut validate_utf8 = false;
878 let mut rows: Vec<_> = rows
879 .into_iter()
880 .map(|row| {
881 assert!(
882 Arc::ptr_eq(&row.config.fields, &self.fields),
883 "rows were not produced by this RowConverter"
884 );
885 validate_utf8 |= row.config.validate_utf8;
886 row.data
887 })
888 .collect();
889
890 let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
894
895 if cfg!(test) {
896 for (i, row) in rows.iter().enumerate() {
897 if !row.is_empty() {
898 return Err(ArrowError::InvalidArgumentError(format!(
899 "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
900 codecs = &self.codecs
901 )));
902 }
903 }
904 }
905
906 Ok(result)
907 }
908
909 pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
938 let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
939 offsets.push(0);
940
941 Rows {
942 offsets,
943 buffer: Vec::with_capacity(data_capacity),
944 config: RowConfig {
945 fields: self.fields.clone(),
946 validate_utf8: false,
947 },
948 }
949 }
950
951 pub fn from_binary(&self, array: BinaryArray) -> Rows {
978 assert_eq!(
979 array.null_count(),
980 0,
981 "can't construct Rows instance from array with nulls"
982 );
983 let (offsets, values, _) = array.into_parts();
984 let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
985 let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
987 Rows {
988 buffer,
989 offsets,
990 config: RowConfig {
991 fields: Arc::clone(&self.fields),
992 validate_utf8: true,
993 },
994 }
995 }
996
997 unsafe fn convert_raw(
1003 &self,
1004 rows: &mut [&[u8]],
1005 validate_utf8: bool,
1006 ) -> Result<Vec<ArrayRef>, ArrowError> {
1007 self.fields
1008 .iter()
1009 .zip(&self.codecs)
1010 .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
1011 .collect()
1012 }
1013
1014 pub fn parser(&self) -> RowParser {
1016 RowParser::new(Arc::clone(&self.fields))
1017 }
1018
1019 pub fn size(&self) -> usize {
1023 std::mem::size_of::<Self>()
1024 + self.fields.iter().map(|x| x.size()).sum::<usize>()
1025 + self.codecs.capacity() * std::mem::size_of::<Codec>()
1026 + self.codecs.iter().map(Codec::size).sum::<usize>()
1027 }
1028}
1029
1030#[derive(Debug)]
1032pub struct RowParser {
1033 config: RowConfig,
1034}
1035
1036impl RowParser {
1037 fn new(fields: Arc<[SortField]>) -> Self {
1038 Self {
1039 config: RowConfig {
1040 fields,
1041 validate_utf8: true,
1042 },
1043 }
1044 }
1045
1046 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1051 Row {
1052 data: bytes,
1053 config: &self.config,
1054 }
1055 }
1056}
1057
1058#[derive(Debug, Clone)]
1060struct RowConfig {
1061 fields: Arc<[SortField]>,
1063 validate_utf8: bool,
1065}
1066
1067#[derive(Debug)]
1071pub struct Rows {
1072 buffer: Vec<u8>,
1074 offsets: Vec<usize>,
1076 config: RowConfig,
1078}
1079
1080impl Rows {
1081 pub fn push(&mut self, row: Row<'_>) {
1083 assert!(
1084 Arc::ptr_eq(&row.config.fields, &self.config.fields),
1085 "row was not produced by this RowConverter"
1086 );
1087 self.config.validate_utf8 |= row.config.validate_utf8;
1088 self.buffer.extend_from_slice(row.data);
1089 self.offsets.push(self.buffer.len())
1090 }
1091
1092 pub fn row(&self, row: usize) -> Row<'_> {
1094 assert!(row + 1 < self.offsets.len());
1095 unsafe { self.row_unchecked(row) }
1096 }
1097
1098 pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
1103 let end = unsafe { self.offsets.get_unchecked(index + 1) };
1104 let start = unsafe { self.offsets.get_unchecked(index) };
1105 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
1106 Row {
1107 data,
1108 config: &self.config,
1109 }
1110 }
1111
1112 pub fn clear(&mut self) {
1114 self.offsets.truncate(1);
1115 self.buffer.clear();
1116 }
1117
1118 pub fn num_rows(&self) -> usize {
1120 self.offsets.len() - 1
1121 }
1122
1123 pub fn iter(&self) -> RowsIter<'_> {
1125 self.into_iter()
1126 }
1127
1128 pub fn size(&self) -> usize {
1132 std::mem::size_of::<Self>()
1134 + self.buffer.len()
1135 + self.offsets.len() * std::mem::size_of::<usize>()
1136 }
1137
1138 pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
1168 if self.buffer.len() > i32::MAX as usize {
1169 return Err(ArrowError::InvalidArgumentError(format!(
1170 "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
1171 self.buffer.len()
1172 )));
1173 }
1174 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
1176 let array = unsafe {
1178 BinaryArray::new_unchecked(
1179 OffsetBuffer::new_unchecked(offsets_scalar),
1180 Buffer::from_vec(self.buffer),
1181 None,
1182 )
1183 };
1184 Ok(array)
1185 }
1186}
1187
1188impl<'a> IntoIterator for &'a Rows {
1189 type Item = Row<'a>;
1190 type IntoIter = RowsIter<'a>;
1191
1192 fn into_iter(self) -> Self::IntoIter {
1193 RowsIter {
1194 rows: self,
1195 start: 0,
1196 end: self.num_rows(),
1197 }
1198 }
1199}
1200
1201#[derive(Debug)]
1203pub struct RowsIter<'a> {
1204 rows: &'a Rows,
1205 start: usize,
1206 end: usize,
1207}
1208
1209impl<'a> Iterator for RowsIter<'a> {
1210 type Item = Row<'a>;
1211
1212 fn next(&mut self) -> Option<Self::Item> {
1213 if self.end == self.start {
1214 return None;
1215 }
1216
1217 let row = unsafe { self.rows.row_unchecked(self.start) };
1219 self.start += 1;
1220 Some(row)
1221 }
1222
1223 fn size_hint(&self) -> (usize, Option<usize>) {
1224 let len = self.len();
1225 (len, Some(len))
1226 }
1227}
1228
1229impl ExactSizeIterator for RowsIter<'_> {
1230 fn len(&self) -> usize {
1231 self.end - self.start
1232 }
1233}
1234
1235impl DoubleEndedIterator for RowsIter<'_> {
1236 fn next_back(&mut self) -> Option<Self::Item> {
1237 if self.end == self.start {
1238 return None;
1239 }
1240 let row = unsafe { self.rows.row_unchecked(self.end) };
1242 self.end -= 1;
1243 Some(row)
1244 }
1245}
1246
1247#[derive(Debug, Copy, Clone)]
1256pub struct Row<'a> {
1257 data: &'a [u8],
1258 config: &'a RowConfig,
1259}
1260
1261impl<'a> Row<'a> {
1262 pub fn owned(&self) -> OwnedRow {
1264 OwnedRow {
1265 data: self.data.into(),
1266 config: self.config.clone(),
1267 }
1268 }
1269
1270 pub fn data(&self) -> &'a [u8] {
1272 self.data
1273 }
1274}
1275
1276impl PartialEq for Row<'_> {
1279 #[inline]
1280 fn eq(&self, other: &Self) -> bool {
1281 self.data.eq(other.data)
1282 }
1283}
1284
1285impl Eq for Row<'_> {}
1286
1287impl PartialOrd for Row<'_> {
1288 #[inline]
1289 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1290 Some(self.cmp(other))
1291 }
1292}
1293
1294impl Ord for Row<'_> {
1295 #[inline]
1296 fn cmp(&self, other: &Self) -> Ordering {
1297 self.data.cmp(other.data)
1298 }
1299}
1300
1301impl Hash for Row<'_> {
1302 #[inline]
1303 fn hash<H: Hasher>(&self, state: &mut H) {
1304 self.data.hash(state)
1305 }
1306}
1307
1308impl AsRef<[u8]> for Row<'_> {
1309 #[inline]
1310 fn as_ref(&self) -> &[u8] {
1311 self.data
1312 }
1313}
1314
1315#[derive(Debug, Clone)]
1319pub struct OwnedRow {
1320 data: Box<[u8]>,
1321 config: RowConfig,
1322}
1323
1324impl OwnedRow {
1325 pub fn row(&self) -> Row<'_> {
1329 Row {
1330 data: &self.data,
1331 config: &self.config,
1332 }
1333 }
1334}
1335
1336impl PartialEq for OwnedRow {
1339 #[inline]
1340 fn eq(&self, other: &Self) -> bool {
1341 self.row().eq(&other.row())
1342 }
1343}
1344
1345impl Eq for OwnedRow {}
1346
1347impl PartialOrd for OwnedRow {
1348 #[inline]
1349 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1350 Some(self.cmp(other))
1351 }
1352}
1353
1354impl Ord for OwnedRow {
1355 #[inline]
1356 fn cmp(&self, other: &Self) -> Ordering {
1357 self.row().cmp(&other.row())
1358 }
1359}
1360
1361impl Hash for OwnedRow {
1362 #[inline]
1363 fn hash<H: Hasher>(&self, state: &mut H) {
1364 self.row().hash(state)
1365 }
1366}
1367
1368impl AsRef<[u8]> for OwnedRow {
1369 #[inline]
1370 fn as_ref(&self) -> &[u8] {
1371 &self.data
1372 }
1373}
1374
1375#[inline]
1377fn null_sentinel(options: SortOptions) -> u8 {
1378 match options.nulls_first {
1379 true => 0,
1380 false => 0xFF,
1381 }
1382}
1383
1384enum LengthTracker {
1386 Fixed { length: usize, num_rows: usize },
1388 Variable {
1390 fixed_length: usize,
1391 lengths: Vec<usize>,
1392 },
1393}
1394
1395impl LengthTracker {
1396 fn new(num_rows: usize) -> Self {
1397 Self::Fixed {
1398 length: 0,
1399 num_rows,
1400 }
1401 }
1402
1403 fn push_fixed(&mut self, new_length: usize) {
1405 match self {
1406 LengthTracker::Fixed { length, .. } => *length += new_length,
1407 LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1408 }
1409 }
1410
1411 fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1413 match self {
1414 LengthTracker::Fixed { length, .. } => {
1415 *self = LengthTracker::Variable {
1416 fixed_length: *length,
1417 lengths: new_lengths.collect(),
1418 }
1419 }
1420 LengthTracker::Variable { lengths, .. } => {
1421 assert_eq!(lengths.len(), new_lengths.len());
1422 lengths
1423 .iter_mut()
1424 .zip(new_lengths)
1425 .for_each(|(length, new_length)| *length += new_length);
1426 }
1427 }
1428 }
1429
1430 fn materialized(&mut self) -> &mut [usize] {
1432 if let LengthTracker::Fixed { length, num_rows } = *self {
1433 *self = LengthTracker::Variable {
1434 fixed_length: length,
1435 lengths: vec![0; num_rows],
1436 };
1437 }
1438
1439 match self {
1440 LengthTracker::Variable { lengths, .. } => lengths,
1441 LengthTracker::Fixed { .. } => unreachable!(),
1442 }
1443 }
1444
1445 fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1463 match self {
1464 LengthTracker::Fixed { length, num_rows } => {
1465 offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1466
1467 initial_offset + num_rows * length
1468 }
1469 LengthTracker::Variable {
1470 fixed_length,
1471 lengths,
1472 } => {
1473 let mut acc = initial_offset;
1474
1475 offsets.extend(lengths.iter().map(|length| {
1476 let current = acc;
1477 acc += length + fixed_length;
1478 current
1479 }));
1480
1481 acc
1482 }
1483 }
1484 }
1485}
1486
1487fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
1489 use fixed::FixedLengthEncoding;
1490
1491 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1492 let mut tracker = LengthTracker::new(num_rows);
1493
1494 for (array, encoder) in cols.iter().zip(encoders) {
1495 match encoder {
1496 Encoder::Stateless => {
1497 downcast_primitive_array! {
1498 array => tracker.push_fixed(fixed::encoded_len(array)),
1499 DataType::Null => {},
1500 DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
1501 DataType::Binary => tracker.push_variable(
1502 as_generic_binary_array::<i32>(array)
1503 .iter()
1504 .map(|slice| variable::encoded_len(slice))
1505 ),
1506 DataType::LargeBinary => tracker.push_variable(
1507 as_generic_binary_array::<i64>(array)
1508 .iter()
1509 .map(|slice| variable::encoded_len(slice))
1510 ),
1511 DataType::BinaryView => tracker.push_variable(
1512 array.as_binary_view()
1513 .iter()
1514 .map(|slice| variable::encoded_len(slice))
1515 ),
1516 DataType::Utf8 => tracker.push_variable(
1517 array.as_string::<i32>()
1518 .iter()
1519 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1520 ),
1521 DataType::LargeUtf8 => tracker.push_variable(
1522 array.as_string::<i64>()
1523 .iter()
1524 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1525 ),
1526 DataType::Utf8View => tracker.push_variable(
1527 array.as_string_view()
1528 .iter()
1529 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1530 ),
1531 DataType::FixedSizeBinary(len) => {
1532 let len = len.to_usize().unwrap();
1533 tracker.push_fixed(1 + len)
1534 }
1535 _ => unimplemented!("unsupported data type: {}", array.data_type()),
1536 }
1537 }
1538 Encoder::Dictionary(values, null) => {
1539 downcast_dictionary_array! {
1540 array => {
1541 tracker.push_variable(
1542 array.keys().iter().map(|v| match v {
1543 Some(k) => values.row(k.as_usize()).data.len(),
1544 None => null.data.len(),
1545 })
1546 )
1547 }
1548 _ => unreachable!(),
1549 }
1550 }
1551 Encoder::Struct(rows, null) => {
1552 let array = as_struct_array(array);
1553 tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1554 true => 1 + rows.row(idx).as_ref().len(),
1555 false => 1 + null.data.len(),
1556 }));
1557 }
1558 Encoder::List(rows) => match array.data_type() {
1559 DataType::List(_) => {
1560 list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
1561 }
1562 DataType::LargeList(_) => {
1563 list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
1564 }
1565 DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
1566 &mut tracker,
1567 rows,
1568 as_fixed_size_list_array(array),
1569 ),
1570 _ => unreachable!(),
1571 },
1572 Encoder::RunEndEncoded(rows) => match array.data_type() {
1573 DataType::RunEndEncoded(r, _) => match r.data_type() {
1574 DataType::Int16 => run::compute_lengths(
1575 tracker.materialized(),
1576 rows,
1577 array.as_run::<Int16Type>(),
1578 ),
1579 DataType::Int32 => run::compute_lengths(
1580 tracker.materialized(),
1581 rows,
1582 array.as_run::<Int32Type>(),
1583 ),
1584 DataType::Int64 => run::compute_lengths(
1585 tracker.materialized(),
1586 rows,
1587 array.as_run::<Int64Type>(),
1588 ),
1589 _ => unreachable!("Unsupported run end index type: {r:?}"),
1590 },
1591 _ => unreachable!(),
1592 },
1593 Encoder::Union {
1594 child_rows,
1595 type_ids,
1596 offsets,
1597 } => {
1598 let union_array = array
1599 .as_any()
1600 .downcast_ref::<UnionArray>()
1601 .expect("expected UnionArray");
1602
1603 let lengths = (0..union_array.len()).map(|i| {
1604 let type_id = type_ids[i];
1605 let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
1606 let child_row = child_rows[type_id as usize].row(child_row_i);
1607
1608 1 + child_row.as_ref().len()
1610 });
1611
1612 tracker.push_variable(lengths);
1613 }
1614 }
1615 }
1616
1617 tracker
1618}
1619
1620fn encode_column(
1622 data: &mut [u8],
1623 offsets: &mut [usize],
1624 column: &dyn Array,
1625 opts: SortOptions,
1626 encoder: &Encoder<'_>,
1627) {
1628 match encoder {
1629 Encoder::Stateless => {
1630 downcast_primitive_array! {
1631 column => {
1632 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1633 fixed::encode(data, offsets, column.values(), nulls, opts)
1634 } else {
1635 fixed::encode_not_null(data, offsets, column.values(), opts)
1636 }
1637 }
1638 DataType::Null => {}
1639 DataType::Boolean => {
1640 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1641 fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1642 } else {
1643 fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1644 }
1645 }
1646 DataType::Binary => {
1647 variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1648 }
1649 DataType::BinaryView => {
1650 variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1651 }
1652 DataType::LargeBinary => {
1653 variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1654 }
1655 DataType::Utf8 => variable::encode(
1656 data, offsets,
1657 column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1658 opts,
1659 ),
1660 DataType::LargeUtf8 => variable::encode(
1661 data, offsets,
1662 column.as_string::<i64>()
1663 .iter()
1664 .map(|x| x.map(|x| x.as_bytes())),
1665 opts,
1666 ),
1667 DataType::Utf8View => variable::encode(
1668 data, offsets,
1669 column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1670 opts,
1671 ),
1672 DataType::FixedSizeBinary(_) => {
1673 let array = column.as_any().downcast_ref().unwrap();
1674 fixed::encode_fixed_size_binary(data, offsets, array, opts)
1675 }
1676 _ => unimplemented!("unsupported data type: {}", column.data_type()),
1677 }
1678 }
1679 Encoder::Dictionary(values, nulls) => {
1680 downcast_dictionary_array! {
1681 column => encode_dictionary_values(data, offsets, column, values, nulls),
1682 _ => unreachable!()
1683 }
1684 }
1685 Encoder::Struct(rows, null) => {
1686 let array = as_struct_array(column);
1687 let null_sentinel = null_sentinel(opts);
1688 offsets
1689 .iter_mut()
1690 .skip(1)
1691 .enumerate()
1692 .for_each(|(idx, offset)| {
1693 let (row, sentinel) = match array.is_valid(idx) {
1694 true => (rows.row(idx), 0x01),
1695 false => (*null, null_sentinel),
1696 };
1697 let end_offset = *offset + 1 + row.as_ref().len();
1698 data[*offset] = sentinel;
1699 data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1700 *offset = end_offset;
1701 })
1702 }
1703 Encoder::List(rows) => match column.data_type() {
1704 DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1705 DataType::LargeList(_) => {
1706 list::encode(data, offsets, rows, opts, as_large_list_array(column))
1707 }
1708 DataType::FixedSizeList(_, _) => {
1709 encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
1710 }
1711 _ => unreachable!(),
1712 },
1713 Encoder::RunEndEncoded(rows) => match column.data_type() {
1714 DataType::RunEndEncoded(r, _) => match r.data_type() {
1715 DataType::Int16 => {
1716 run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
1717 }
1718 DataType::Int32 => {
1719 run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
1720 }
1721 DataType::Int64 => {
1722 run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
1723 }
1724 _ => unreachable!("Unsupported run end index type: {r:?}"),
1725 },
1726 _ => unreachable!(),
1727 },
1728 Encoder::Union {
1729 child_rows,
1730 type_ids,
1731 offsets: offsets_buf,
1732 } => {
1733 offsets
1734 .iter_mut()
1735 .skip(1)
1736 .enumerate()
1737 .for_each(|(i, offset)| {
1738 let type_id = type_ids[i];
1739
1740 let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
1741 let child_row = child_rows[type_id as usize].row(child_row_idx);
1742 let child_bytes = child_row.as_ref();
1743
1744 let type_id_byte = if opts.descending {
1745 !(type_id as u8)
1746 } else {
1747 type_id as u8
1748 };
1749 data[*offset] = type_id_byte;
1750
1751 let child_start = *offset + 1;
1752 let child_end = child_start + child_bytes.len();
1753 data[child_start..child_end].copy_from_slice(child_bytes);
1754
1755 *offset = child_end;
1756 });
1757 }
1758 }
1759}
1760
1761pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1763 data: &mut [u8],
1764 offsets: &mut [usize],
1765 column: &DictionaryArray<K>,
1766 values: &Rows,
1767 null: &Row<'_>,
1768) {
1769 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1770 let row = match k {
1771 Some(k) => values.row(k.as_usize()).data,
1772 None => null.data,
1773 };
1774 let end_offset = *offset + row.len();
1775 data[*offset..end_offset].copy_from_slice(row);
1776 *offset = end_offset;
1777 }
1778}
1779
1780macro_rules! decode_primitive_helper {
1781 ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
1782 Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
1783 };
1784}
1785
1786unsafe fn decode_column(
1792 field: &SortField,
1793 rows: &mut [&[u8]],
1794 codec: &Codec,
1795 validate_utf8: bool,
1796) -> Result<ArrayRef, ArrowError> {
1797 let options = field.options;
1798
1799 let array: ArrayRef = match codec {
1800 Codec::Stateless => {
1801 let data_type = field.data_type.clone();
1802 downcast_primitive! {
1803 data_type => (decode_primitive_helper, rows, data_type, options),
1804 DataType::Null => Arc::new(NullArray::new(rows.len())),
1805 DataType::Boolean => Arc::new(decode_bool(rows, options)),
1806 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1807 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1808 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1809 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1810 DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
1811 DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
1812 DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
1813 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
1814 }
1815 }
1816 Codec::Dictionary(converter, _) => {
1817 let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
1818 cols.into_iter().next().unwrap()
1819 }
1820 Codec::Struct(converter, _) => {
1821 let (null_count, nulls) = fixed::decode_nulls(rows);
1822 rows.iter_mut().for_each(|row| *row = &row[1..]);
1823 let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;
1824
1825 let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
1826 let corrected_fields: Vec<Field> = match &field.data_type {
1829 DataType::Struct(struct_fields) => struct_fields
1830 .iter()
1831 .zip(child_data.iter())
1832 .map(|(orig_field, child_array)| {
1833 orig_field
1834 .as_ref()
1835 .clone()
1836 .with_data_type(child_array.data_type().clone())
1837 })
1838 .collect(),
1839 _ => unreachable!("Only Struct types should be corrected here"),
1840 };
1841 let corrected_struct_type = DataType::Struct(corrected_fields.into());
1842 let builder = ArrayDataBuilder::new(corrected_struct_type)
1843 .len(rows.len())
1844 .null_count(null_count)
1845 .null_bit_buffer(Some(nulls))
1846 .child_data(child_data);
1847
1848 Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
1849 }
1850 Codec::List(converter) => match &field.data_type {
1851 DataType::List(_) => {
1852 Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
1853 }
1854 DataType::LargeList(_) => {
1855 Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
1856 }
1857 DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
1858 list::decode_fixed_size_list(
1859 converter,
1860 rows,
1861 field,
1862 validate_utf8,
1863 value_length.as_usize(),
1864 )
1865 }?),
1866 _ => unreachable!(),
1867 },
1868 Codec::RunEndEncoded(converter) => match &field.data_type {
1869 DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
1870 DataType::Int16 => Arc::new(unsafe {
1871 run::decode::<Int16Type>(converter, rows, field, validate_utf8)
1872 }?),
1873 DataType::Int32 => Arc::new(unsafe {
1874 run::decode::<Int32Type>(converter, rows, field, validate_utf8)
1875 }?),
1876 DataType::Int64 => Arc::new(unsafe {
1877 run::decode::<Int64Type>(converter, rows, field, validate_utf8)
1878 }?),
1879 _ => unreachable!(),
1880 },
1881 _ => unreachable!(),
1882 },
1883 Codec::Union(converters, null_rows) => {
1884 let len = rows.len();
1885
1886 let DataType::Union(union_fields, mode) = &field.data_type else {
1887 unreachable!()
1888 };
1889
1890 let mut type_ids = Vec::with_capacity(len);
1891 let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];
1892
1893 for (idx, row) in rows.iter_mut().enumerate() {
1894 let type_id_byte = {
1895 let id = row[0];
1896 if options.descending { !id } else { id }
1897 };
1898
1899 let type_id = type_id_byte as i8;
1900 type_ids.push(type_id);
1901
1902 let field_idx = type_id as usize;
1903
1904 let child_row = &row[1..];
1905 rows_by_field[field_idx].push((idx, child_row));
1906
1907 *row = &row[row.len()..];
1908 }
1909
1910 let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
1911
1912 let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));
1913
1914 for (field_idx, converter) in converters.iter().enumerate() {
1915 let field_rows = &rows_by_field[field_idx];
1916
1917 match &mode {
1918 UnionMode::Dense => {
1919 if field_rows.is_empty() {
1920 let (_, field) = union_fields.iter().nth(field_idx).unwrap();
1921 child_arrays.push(arrow_array::new_empty_array(field.data_type()));
1922 continue;
1923 }
1924
1925 let mut child_data = field_rows
1926 .iter()
1927 .map(|(_, bytes)| *bytes)
1928 .collect::<Vec<_>>();
1929
1930 let child_array =
1931 unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;
1932
1933 child_arrays.push(child_array.into_iter().next().unwrap());
1934 }
1935 UnionMode::Sparse => {
1936 let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
1937 let mut field_row_iter = field_rows.iter().peekable();
1938 let null_row_bytes: &[u8] = &null_rows[field_idx].data;
1939
1940 for idx in 0..len {
1941 if let Some((next_idx, bytes)) = field_row_iter.peek() {
1942 if *next_idx == idx {
1943 sparse_data.push(*bytes);
1944
1945 field_row_iter.next();
1946 continue;
1947 }
1948 }
1949 sparse_data.push(null_row_bytes);
1950 }
1951
1952 let child_array =
1953 unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;
1954 child_arrays.push(child_array.into_iter().next().unwrap());
1955 }
1956 }
1957 }
1958
1959 if let Some(ref mut offsets_vec) = offsets {
1961 let mut count = vec![0i32; converters.len()];
1962 for type_id in &type_ids {
1963 let field_idx = *type_id as usize;
1964 offsets_vec.push(count[field_idx]);
1965
1966 count[field_idx] += 1;
1967 }
1968 }
1969
1970 let type_ids_buffer = ScalarBuffer::from(type_ids);
1971 let offsets_buffer = offsets.map(ScalarBuffer::from);
1972
1973 let union_array = UnionArray::try_new(
1974 union_fields.clone(),
1975 type_ids_buffer,
1976 offsets_buffer,
1977 child_arrays,
1978 )?;
1979
1980 Arc::new(union_array)
1983 }
1984 };
1985 Ok(array)
1986}
1987
1988#[cfg(test)]
1989mod tests {
1990 use rand::distr::uniform::SampleUniform;
1991 use rand::distr::{Distribution, StandardUniform};
1992 use rand::{Rng, rng};
1993
1994 use arrow_array::builder::*;
1995 use arrow_array::types::*;
1996 use arrow_array::*;
1997 use arrow_buffer::{Buffer, OffsetBuffer};
1998 use arrow_buffer::{NullBuffer, i256};
1999 use arrow_cast::display::{ArrayFormatter, FormatOptions};
2000 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2001
2002 use super::*;
2003
2004 #[test]
2005 fn test_fixed_width() {
2006 let cols = [
2007 Arc::new(Int16Array::from_iter([
2008 Some(1),
2009 Some(2),
2010 None,
2011 Some(-5),
2012 Some(2),
2013 Some(2),
2014 Some(0),
2015 ])) as ArrayRef,
2016 Arc::new(Float32Array::from_iter([
2017 Some(1.3),
2018 Some(2.5),
2019 None,
2020 Some(4.),
2021 Some(0.1),
2022 Some(-4.),
2023 Some(-0.),
2024 ])) as ArrayRef,
2025 ];
2026
2027 let converter = RowConverter::new(vec![
2028 SortField::new(DataType::Int16),
2029 SortField::new(DataType::Float32),
2030 ])
2031 .unwrap();
2032 let rows = converter.convert_columns(&cols).unwrap();
2033
2034 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
2035 assert_eq!(
2036 rows.buffer,
2037 &[
2038 1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
2053 );
2054
2055 assert!(rows.row(3) < rows.row(6));
2056 assert!(rows.row(0) < rows.row(1));
2057 assert!(rows.row(3) < rows.row(0));
2058 assert!(rows.row(4) < rows.row(1));
2059 assert!(rows.row(5) < rows.row(4));
2060
2061 let back = converter.convert_rows(&rows).unwrap();
2062 for (expected, actual) in cols.iter().zip(&back) {
2063 assert_eq!(expected, actual);
2064 }
2065 }
2066
2067 #[test]
2068 fn test_decimal32() {
2069 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2070 DECIMAL32_MAX_PRECISION,
2071 7,
2072 ))])
2073 .unwrap();
2074 let col = Arc::new(
2075 Decimal32Array::from_iter([
2076 None,
2077 Some(i32::MIN),
2078 Some(-13),
2079 Some(46_i32),
2080 Some(5456_i32),
2081 Some(i32::MAX),
2082 ])
2083 .with_precision_and_scale(9, 7)
2084 .unwrap(),
2085 ) as ArrayRef;
2086
2087 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2088 for i in 0..rows.num_rows() - 1 {
2089 assert!(rows.row(i) < rows.row(i + 1));
2090 }
2091
2092 let back = converter.convert_rows(&rows).unwrap();
2093 assert_eq!(back.len(), 1);
2094 assert_eq!(col.as_ref(), back[0].as_ref())
2095 }
2096
2097 #[test]
2098 fn test_decimal64() {
2099 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2100 DECIMAL64_MAX_PRECISION,
2101 7,
2102 ))])
2103 .unwrap();
2104 let col = Arc::new(
2105 Decimal64Array::from_iter([
2106 None,
2107 Some(i64::MIN),
2108 Some(-13),
2109 Some(46_i64),
2110 Some(5456_i64),
2111 Some(i64::MAX),
2112 ])
2113 .with_precision_and_scale(18, 7)
2114 .unwrap(),
2115 ) as ArrayRef;
2116
2117 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2118 for i in 0..rows.num_rows() - 1 {
2119 assert!(rows.row(i) < rows.row(i + 1));
2120 }
2121
2122 let back = converter.convert_rows(&rows).unwrap();
2123 assert_eq!(back.len(), 1);
2124 assert_eq!(col.as_ref(), back[0].as_ref())
2125 }
2126
2127 #[test]
2128 fn test_decimal128() {
2129 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2130 DECIMAL128_MAX_PRECISION,
2131 7,
2132 ))])
2133 .unwrap();
2134 let col = Arc::new(
2135 Decimal128Array::from_iter([
2136 None,
2137 Some(i128::MIN),
2138 Some(-13),
2139 Some(46_i128),
2140 Some(5456_i128),
2141 Some(i128::MAX),
2142 ])
2143 .with_precision_and_scale(38, 7)
2144 .unwrap(),
2145 ) as ArrayRef;
2146
2147 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2148 for i in 0..rows.num_rows() - 1 {
2149 assert!(rows.row(i) < rows.row(i + 1));
2150 }
2151
2152 let back = converter.convert_rows(&rows).unwrap();
2153 assert_eq!(back.len(), 1);
2154 assert_eq!(col.as_ref(), back[0].as_ref())
2155 }
2156
2157 #[test]
2158 fn test_decimal256() {
2159 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2160 DECIMAL256_MAX_PRECISION,
2161 7,
2162 ))])
2163 .unwrap();
2164 let col = Arc::new(
2165 Decimal256Array::from_iter([
2166 None,
2167 Some(i256::MIN),
2168 Some(i256::from_parts(0, -1)),
2169 Some(i256::from_parts(u128::MAX, -1)),
2170 Some(i256::from_parts(u128::MAX, 0)),
2171 Some(i256::from_parts(0, 46_i128)),
2172 Some(i256::from_parts(5, 46_i128)),
2173 Some(i256::MAX),
2174 ])
2175 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2176 .unwrap(),
2177 ) as ArrayRef;
2178
2179 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2180 for i in 0..rows.num_rows() - 1 {
2181 assert!(rows.row(i) < rows.row(i + 1));
2182 }
2183
2184 let back = converter.convert_rows(&rows).unwrap();
2185 assert_eq!(back.len(), 1);
2186 assert_eq!(col.as_ref(), back[0].as_ref())
2187 }
2188
2189 #[test]
2190 fn test_bool() {
2191 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2192
2193 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2194
2195 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2196 assert!(rows.row(2) > rows.row(1));
2197 assert!(rows.row(2) > rows.row(0));
2198 assert!(rows.row(1) > rows.row(0));
2199
2200 let cols = converter.convert_rows(&rows).unwrap();
2201 assert_eq!(&cols[0], &col);
2202
2203 let converter = RowConverter::new(vec![SortField::new_with_options(
2204 DataType::Boolean,
2205 SortOptions::default().desc().with_nulls_first(false),
2206 )])
2207 .unwrap();
2208
2209 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2210 assert!(rows.row(2) < rows.row(1));
2211 assert!(rows.row(2) < rows.row(0));
2212 assert!(rows.row(1) < rows.row(0));
2213 let cols = converter.convert_rows(&rows).unwrap();
2214 assert_eq!(&cols[0], &col);
2215 }
2216
2217 #[test]
2218 fn test_timezone() {
2219 let a =
2220 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
2221 let d = a.data_type().clone();
2222
2223 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2224 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2225 let back = converter.convert_rows(&rows).unwrap();
2226 assert_eq!(back.len(), 1);
2227 assert_eq!(back[0].data_type(), &d);
2228
2229 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
2231 a.append(34).unwrap();
2232 a.append_null();
2233 a.append(345).unwrap();
2234
2235 let dict = a.finish();
2237 let values = TimestampNanosecondArray::from(dict.values().to_data());
2238 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
2239 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
2240 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
2241
2242 assert_eq!(dict_with_tz.data_type(), &d);
2243 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2244 let rows = converter
2245 .convert_columns(&[Arc::new(dict_with_tz) as _])
2246 .unwrap();
2247 let back = converter.convert_rows(&rows).unwrap();
2248 assert_eq!(back.len(), 1);
2249 assert_eq!(back[0].data_type(), &v);
2250 }
2251
2252 #[test]
2253 fn test_null_encoding() {
2254 let col = Arc::new(NullArray::new(10));
2255 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2256 let rows = converter.convert_columns(&[col]).unwrap();
2257 assert_eq!(rows.num_rows(), 10);
2258 assert_eq!(rows.row(1).data.len(), 0);
2259 }
2260
2261 #[test]
2262 fn test_variable_width() {
2263 let col = Arc::new(StringArray::from_iter([
2264 Some("hello"),
2265 Some("he"),
2266 None,
2267 Some("foo"),
2268 Some(""),
2269 ])) as ArrayRef;
2270
2271 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2272 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2273
2274 assert!(rows.row(1) < rows.row(0));
2275 assert!(rows.row(2) < rows.row(4));
2276 assert!(rows.row(3) < rows.row(0));
2277 assert!(rows.row(3) < rows.row(1));
2278
2279 let cols = converter.convert_rows(&rows).unwrap();
2280 assert_eq!(&cols[0], &col);
2281
2282 let col = Arc::new(BinaryArray::from_iter([
2283 None,
2284 Some(vec![0_u8; 0]),
2285 Some(vec![0_u8; 6]),
2286 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
2287 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
2288 Some(vec![0_u8; variable::BLOCK_SIZE]),
2289 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
2290 Some(vec![1_u8; 6]),
2291 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
2292 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
2293 Some(vec![1_u8; variable::BLOCK_SIZE]),
2294 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
2295 Some(vec![0xFF_u8; 6]),
2296 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
2297 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
2298 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
2299 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
2300 ])) as ArrayRef;
2301
2302 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2303 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2304
2305 for i in 0..rows.num_rows() {
2306 for j in i + 1..rows.num_rows() {
2307 assert!(
2308 rows.row(i) < rows.row(j),
2309 "{} < {} - {:?} < {:?}",
2310 i,
2311 j,
2312 rows.row(i),
2313 rows.row(j)
2314 );
2315 }
2316 }
2317
2318 let cols = converter.convert_rows(&rows).unwrap();
2319 assert_eq!(&cols[0], &col);
2320
2321 let converter = RowConverter::new(vec![SortField::new_with_options(
2322 DataType::Binary,
2323 SortOptions::default().desc().with_nulls_first(false),
2324 )])
2325 .unwrap();
2326 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2327
2328 for i in 0..rows.num_rows() {
2329 for j in i + 1..rows.num_rows() {
2330 assert!(
2331 rows.row(i) > rows.row(j),
2332 "{} > {} - {:?} > {:?}",
2333 i,
2334 j,
2335 rows.row(i),
2336 rows.row(j)
2337 );
2338 }
2339 }
2340
2341 let cols = converter.convert_rows(&rows).unwrap();
2342 assert_eq!(&cols[0], &col);
2343 }
2344
2345 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2347 match b.data_type() {
2348 DataType::Dictionary(_, v) => {
2349 assert_eq!(a.data_type(), v.as_ref());
2350 let b = arrow_cast::cast(b, v).unwrap();
2351 assert_eq!(a, b.as_ref())
2352 }
2353 _ => assert_eq!(a, b),
2354 }
2355 }
2356
2357 #[test]
2358 fn test_string_dictionary() {
2359 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2360 Some("foo"),
2361 Some("hello"),
2362 Some("he"),
2363 None,
2364 Some("hello"),
2365 Some(""),
2366 Some("hello"),
2367 Some("hello"),
2368 ])) as ArrayRef;
2369
2370 let field = SortField::new(a.data_type().clone());
2371 let converter = RowConverter::new(vec![field]).unwrap();
2372 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2373
2374 assert!(rows_a.row(3) < rows_a.row(5));
2375 assert!(rows_a.row(2) < rows_a.row(1));
2376 assert!(rows_a.row(0) < rows_a.row(1));
2377 assert!(rows_a.row(3) < rows_a.row(0));
2378
2379 assert_eq!(rows_a.row(1), rows_a.row(4));
2380 assert_eq!(rows_a.row(1), rows_a.row(6));
2381 assert_eq!(rows_a.row(1), rows_a.row(7));
2382
2383 let cols = converter.convert_rows(&rows_a).unwrap();
2384 dictionary_eq(&cols[0], &a);
2385
2386 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2387 Some("hello"),
2388 None,
2389 Some("cupcakes"),
2390 ])) as ArrayRef;
2391
2392 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
2393 assert_eq!(rows_a.row(1), rows_b.row(0));
2394 assert_eq!(rows_a.row(3), rows_b.row(1));
2395 assert!(rows_b.row(2) < rows_a.row(0));
2396
2397 let cols = converter.convert_rows(&rows_b).unwrap();
2398 dictionary_eq(&cols[0], &b);
2399
2400 let converter = RowConverter::new(vec![SortField::new_with_options(
2401 a.data_type().clone(),
2402 SortOptions::default().desc().with_nulls_first(false),
2403 )])
2404 .unwrap();
2405
2406 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2407 assert!(rows_c.row(3) > rows_c.row(5));
2408 assert!(rows_c.row(2) > rows_c.row(1));
2409 assert!(rows_c.row(0) > rows_c.row(1));
2410 assert!(rows_c.row(3) > rows_c.row(0));
2411
2412 let cols = converter.convert_rows(&rows_c).unwrap();
2413 dictionary_eq(&cols[0], &a);
2414
2415 let converter = RowConverter::new(vec![SortField::new_with_options(
2416 a.data_type().clone(),
2417 SortOptions::default().desc().with_nulls_first(true),
2418 )])
2419 .unwrap();
2420
2421 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2422 assert!(rows_c.row(3) < rows_c.row(5));
2423 assert!(rows_c.row(2) > rows_c.row(1));
2424 assert!(rows_c.row(0) > rows_c.row(1));
2425 assert!(rows_c.row(3) < rows_c.row(0));
2426
2427 let cols = converter.convert_rows(&rows_c).unwrap();
2428 dictionary_eq(&cols[0], &a);
2429 }
2430
2431 #[test]
2432 fn test_struct() {
2433 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2435 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2436 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2437 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2438 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2439
2440 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2441 let converter = RowConverter::new(sort_fields).unwrap();
2442 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2443
2444 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2445 assert!(a < b);
2446 }
2447
2448 let back = converter.convert_rows(&r1).unwrap();
2449 assert_eq!(back.len(), 1);
2450 assert_eq!(&back[0], &s1);
2451
2452 let data = s1
2454 .to_data()
2455 .into_builder()
2456 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2457 .null_count(2)
2458 .build()
2459 .unwrap();
2460
2461 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2462 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
2463 assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1)); let back = converter.convert_rows(&r2).unwrap();
2469 assert_eq!(back.len(), 1);
2470 assert_eq!(&back[0], &s2);
2471
2472 back[0].to_data().validate_full().unwrap();
2473 }
2474
2475 #[test]
2476 fn test_dictionary_in_struct() {
2477 let builder = StringDictionaryBuilder::<Int32Type>::new();
2478 let mut struct_builder = StructBuilder::new(
2479 vec![Field::new_dictionary(
2480 "foo",
2481 DataType::Int32,
2482 DataType::Utf8,
2483 true,
2484 )],
2485 vec![Box::new(builder)],
2486 );
2487
2488 let dict_builder = struct_builder
2489 .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
2490 .unwrap();
2491
2492 dict_builder.append_value("a");
2494 dict_builder.append_null();
2495 dict_builder.append_value("a");
2496 dict_builder.append_value("b");
2497
2498 for _ in 0..4 {
2499 struct_builder.append(true);
2500 }
2501
2502 let s = Arc::new(struct_builder.finish()) as ArrayRef;
2503 let sort_fields = vec![SortField::new(s.data_type().clone())];
2504 let converter = RowConverter::new(sort_fields).unwrap();
2505 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2506
2507 let back = converter.convert_rows(&r).unwrap();
2508 let [s2] = back.try_into().unwrap();
2509
2510 assert_ne!(&s.data_type(), &s2.data_type());
2513 s2.to_data().validate_full().unwrap();
2514
2515 let s1_struct = s.as_struct();
2519 let s1_0 = s1_struct.column(0);
2520 let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
2521 let keys = s1_idx_0.keys();
2522 let values = s1_idx_0.values().as_string::<i32>();
2523 let s2_struct = s2.as_struct();
2525 let s2_0 = s2_struct.column(0);
2526 let s2_idx_0 = s2_0.as_string::<i32>();
2527
2528 for i in 0..keys.len() {
2529 if keys.is_null(i) {
2530 assert!(s2_idx_0.is_null(i));
2531 } else {
2532 let dict_index = keys.value(i) as usize;
2533 assert_eq!(values.value(dict_index), s2_idx_0.value(i));
2534 }
2535 }
2536 }
2537
2538 #[test]
2539 fn test_dictionary_in_struct_empty() {
2540 let ty = DataType::Struct(
2541 vec![Field::new_dictionary(
2542 "foo",
2543 DataType::Int32,
2544 DataType::Int32,
2545 false,
2546 )]
2547 .into(),
2548 );
2549 let s = arrow_array::new_empty_array(&ty);
2550
2551 let sort_fields = vec![SortField::new(s.data_type().clone())];
2552 let converter = RowConverter::new(sort_fields).unwrap();
2553 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2554
2555 let back = converter.convert_rows(&r).unwrap();
2556 let [s2] = back.try_into().unwrap();
2557
2558 assert_ne!(&s.data_type(), &s2.data_type());
2561 s2.to_data().validate_full().unwrap();
2562 assert_eq!(s.len(), 0);
2563 assert_eq!(s2.len(), 0);
2564 }
2565
2566 #[test]
2567 fn test_list_of_string_dictionary() {
2568 let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2569 builder.values().append("a").unwrap();
2571 builder.values().append("b").unwrap();
2572 builder.values().append("zero").unwrap();
2573 builder.values().append_null();
2574 builder.values().append("c").unwrap();
2575 builder.values().append("b").unwrap();
2576 builder.values().append("d").unwrap();
2577 builder.append(true);
2578 builder.append(false);
2580 builder.values().append("e").unwrap();
2582 builder.values().append("zero").unwrap();
2583 builder.values().append("a").unwrap();
2584 builder.append(true);
2585
2586 let a = Arc::new(builder.finish()) as ArrayRef;
2587 let data_type = a.data_type().clone();
2588
2589 let field = SortField::new(data_type.clone());
2590 let converter = RowConverter::new(vec![field]).unwrap();
2591 let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2592
2593 let back = converter.convert_rows(&rows).unwrap();
2594 assert_eq!(back.len(), 1);
2595 let [a2] = back.try_into().unwrap();
2596
2597 assert_ne!(&a.data_type(), &a2.data_type());
2600
2601 a2.to_data().validate_full().unwrap();
2602
2603 let a2_list = a2.as_list::<i32>();
2604 let a1_list = a.as_list::<i32>();
2605
2606 let a1_0 = a1_list.value(0);
2609 let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2610 let keys = a1_idx_0.keys();
2611 let values = a1_idx_0.values().as_string::<i32>();
2612 let a2_0 = a2_list.value(0);
2613 let a2_idx_0 = a2_0.as_string::<i32>();
2614
2615 for i in 0..keys.len() {
2616 if keys.is_null(i) {
2617 assert!(a2_idx_0.is_null(i));
2618 } else {
2619 let dict_index = keys.value(i) as usize;
2620 assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2621 }
2622 }
2623
2624 assert!(a1_list.is_null(1));
2626 assert!(a2_list.is_null(1));
2627
2628 let a1_2 = a1_list.value(2);
2630 let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2631 let keys = a1_idx_2.keys();
2632 let values = a1_idx_2.values().as_string::<i32>();
2633 let a2_2 = a2_list.value(2);
2634 let a2_idx_2 = a2_2.as_string::<i32>();
2635
2636 for i in 0..keys.len() {
2637 if keys.is_null(i) {
2638 assert!(a2_idx_2.is_null(i));
2639 } else {
2640 let dict_index = keys.value(i) as usize;
2641 assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2642 }
2643 }
2644 }
2645
2646 #[test]
2647 fn test_primitive_dictionary() {
2648 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2649 builder.append(2).unwrap();
2650 builder.append(3).unwrap();
2651 builder.append(0).unwrap();
2652 builder.append_null();
2653 builder.append(5).unwrap();
2654 builder.append(3).unwrap();
2655 builder.append(-1).unwrap();
2656
2657 let a = builder.finish();
2658 let data_type = a.data_type().clone();
2659 let columns = [Arc::new(a) as ArrayRef];
2660
2661 let field = SortField::new(data_type.clone());
2662 let converter = RowConverter::new(vec![field]).unwrap();
2663 let rows = converter.convert_columns(&columns).unwrap();
2664 assert!(rows.row(0) < rows.row(1));
2665 assert!(rows.row(2) < rows.row(0));
2666 assert!(rows.row(3) < rows.row(2));
2667 assert!(rows.row(6) < rows.row(2));
2668 assert!(rows.row(3) < rows.row(6));
2669
2670 let back = converter.convert_rows(&rows).unwrap();
2671 assert_eq!(back.len(), 1);
2672 back[0].to_data().validate_full().unwrap();
2673 }
2674
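    // A null can be represented either by a null key or by a key pointing at a null
    // value; both encodings must produce identical rows.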
2675 #[test]
2676 fn test_dictionary_nulls() {
2677 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2678 let keys =
2679 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2680
2681 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2682 let data = keys
2683 .into_builder()
2684 .data_type(data_type.clone())
2685 .child_data(vec![values])
2686 .build()
2687 .unwrap();
2688
2689 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2690 let field = SortField::new(data_type.clone());
2691 let converter = RowConverter::new(vec![field]).unwrap();
2692 let rows = converter.convert_columns(&columns).unwrap();
2693
2694 assert_eq!(rows.row(0), rows.row(1));
2695 assert_eq!(rows.row(3), rows.row(4));
2696 assert_eq!(rows.row(4), rows.row(5));
2697 assert!(rows.row(3) < rows.row(0));
2698 }
2699
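    // from_binary must also work when the backing buffer is shared with another array.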
2700 #[test]
2701 fn test_from_binary_shared_buffer() {
2702 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2703 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2704 let rows = converter.convert_columns(&[array]).unwrap();
2705 let binary_rows = rows.try_into_binary().expect("known-small rows");
2706 let _binary_rows_shared_buffer = binary_rows.clone();
2707
2708 let parsed = converter.from_binary(binary_rows);
2709
2710 converter.convert_rows(parsed.iter()).unwrap();
2711 }
2712
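    // The following tests feed a converter rows produced for a different schema, or
    // truncated/empty byte slices; decoding such input is expected to panic rather
    // than silently produce incorrect data.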
2713 #[test]
2714 #[should_panic(expected = "Encountered non UTF-8 data")]
2715 fn test_invalid_utf8() {
2716 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2717 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2718 let rows = converter.convert_columns(&[array]).unwrap();
2719 let binary_row = rows.row(0);
2720
2721 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2722 let parser = converter.parser();
2723 let utf8_row = parser.parse(binary_row.as_ref());
2724
2725 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2726 }
2727
2728 #[test]
2729 #[should_panic(expected = "Encountered non UTF-8 data")]
2730 fn test_invalid_utf8_array() {
2731 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2732 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2733 let rows = converter.convert_columns(&[array]).unwrap();
2734 let binary_rows = rows.try_into_binary().expect("known-small rows");
2735
2736 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2737 let parsed = converter.from_binary(binary_rows);
2738
2739 converter.convert_rows(parsed.iter()).unwrap();
2740 }
2741
2742 #[test]
2743 #[should_panic(expected = "index out of bounds")]
2744 fn test_invalid_empty() {
2745 let binary_row: &[u8] = &[];
2746
2747 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2748 let parser = converter.parser();
2749 let utf8_row = parser.parse(binary_row.as_ref());
2750
2751 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2752 }
2753
2754 #[test]
2755 #[should_panic(expected = "index out of bounds")]
2756 fn test_invalid_empty_array() {
2757 let row: &[u8] = &[];
2758 let binary_rows = BinaryArray::from(vec![row]);
2759
2760 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2761 let parsed = converter.from_binary(binary_rows);
2762
2763 converter.convert_rows(parsed.iter()).unwrap();
2764 }
2765
2766 #[test]
2767 #[should_panic(expected = "index out of bounds")]
2768 fn test_invalid_truncated() {
2769 let binary_row: &[u8] = &[0x02];
2770
2771 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2772 let parser = converter.parser();
2773 let utf8_row = parser.parse(binary_row.as_ref());
2774
2775 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2776 }
2777
2778 #[test]
2779 #[should_panic(expected = "index out of bounds")]
2780 fn test_invalid_truncated_array() {
2781 let row: &[u8] = &[0x02];
2782 let binary_rows = BinaryArray::from(vec![row]);
2783
2784 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2785 let parsed = converter.from_binary(binary_rows);
2786
2787 converter.convert_rows(parsed.iter()).unwrap();
2788 }
2789
2790 #[test]
2791 #[should_panic(expected = "rows were not produced by this RowConverter")]
2792 fn test_different_converter() {
2793 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2794 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2795 let rows = converter.convert_columns(&[values]).unwrap();
2796
2797 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2798 let _ = converter.convert_rows(&rows);
2799 }
2800
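    // Exercises a single level of list nesting under all four combinations of
    // descending / nulls_first, and with sliced input.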
2801 fn test_single_list<O: OffsetSizeTrait>() {
2802 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2803 builder.values().append_value(32);
2804 builder.values().append_value(52);
2805 builder.values().append_value(32);
2806 builder.append(true);
2807 builder.values().append_value(32);
2808 builder.values().append_value(52);
2809 builder.values().append_value(12);
2810 builder.append(true);
2811 builder.values().append_value(32);
2812 builder.values().append_value(52);
2813 builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
2817 builder.values().append_value(32);
2818 builder.values().append_null();
2819 builder.append(true);
2820 builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.append(false);
2824
2825 let list = Arc::new(builder.finish()) as ArrayRef;
2826 let d = list.data_type().clone();
2827
2828 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2829
2830 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2840 assert_eq!(back.len(), 1);
2841 back[0].to_data().validate_full().unwrap();
2842 assert_eq!(&back[0], &list);
2843
2844 let options = SortOptions::default().asc().with_nulls_first(false);
2845 let field = SortField::new_with_options(d.clone(), options);
2846 let converter = RowConverter::new(vec![field]).unwrap();
2847 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2848
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2858 assert_eq!(back.len(), 1);
2859 back[0].to_data().validate_full().unwrap();
2860 assert_eq!(&back[0], &list);
2861
2862 let options = SortOptions::default().desc().with_nulls_first(false);
2863 let field = SortField::new_with_options(d.clone(), options);
2864 let converter = RowConverter::new(vec![field]).unwrap();
2865 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2866
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2876 assert_eq!(back.len(), 1);
2877 back[0].to_data().validate_full().unwrap();
2878 assert_eq!(&back[0], &list);
2879
2880 let options = SortOptions::default().desc().with_nulls_first(true);
2881 let field = SortField::new_with_options(d, options);
2882 let converter = RowConverter::new(vec![field]).unwrap();
2883 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2884
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2894 assert_eq!(back.len(), 1);
2895 back[0].to_data().validate_full().unwrap();
2896 assert_eq!(&back[0], &list);
2897
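        // Rows computed from a sliced list must still round-trip back to that slice.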
2898 let sliced_list = list.slice(1, 5);
2899 let rows_on_sliced_list = converter
2900 .convert_columns(&[Arc::clone(&sliced_list)])
2901 .unwrap();
2902
        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2910 assert_eq!(back.len(), 1);
2911 back[0].to_data().validate_full().unwrap();
2912 assert_eq!(&back[0], &sliced_list);
2913 }
2914
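    // Same as test_single_list, but with a list of lists so null masks exist
    // at two nesting levels.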
2915 fn test_nested_list<O: OffsetSizeTrait>() {
2916 let mut builder =
2917 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2918
2919 builder.values().values().append_value(1);
2920 builder.values().values().append_value(2);
2921 builder.values().append(true);
2922 builder.values().values().append_value(1);
2923 builder.values().values().append_null();
2924 builder.values().append(true);
2925 builder.append(true);
2926
2927 builder.values().values().append_value(1);
2928 builder.values().values().append_null();
2929 builder.values().append(true);
2930 builder.values().values().append_value(1);
2931 builder.values().values().append_null();
2932 builder.values().append(true);
2933 builder.append(true);
2934
2935 builder.values().values().append_value(1);
2936 builder.values().values().append_null();
2937 builder.values().append(true);
2938 builder.values().append(false);
2939 builder.append(true);
2940 builder.append(false);
2941
2942 builder.values().values().append_value(1);
2943 builder.values().values().append_value(2);
2944 builder.values().append(true);
2945 builder.append(true);
2946
2947 let list = Arc::new(builder.finish()) as ArrayRef;
2948 let d = list.data_type().clone();
2949
2950 let options = SortOptions::default().asc().with_nulls_first(true);
2958 let field = SortField::new_with_options(d.clone(), options);
2959 let converter = RowConverter::new(vec![field]).unwrap();
2960 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2961
2962 assert!(rows.row(0) > rows.row(1));
2963 assert!(rows.row(1) > rows.row(2));
2964 assert!(rows.row(2) > rows.row(3));
2965 assert!(rows.row(4) < rows.row(0));
2966 assert!(rows.row(4) > rows.row(1));
2967
2968 let back = converter.convert_rows(&rows).unwrap();
2969 assert_eq!(back.len(), 1);
2970 back[0].to_data().validate_full().unwrap();
2971 assert_eq!(&back[0], &list);
2972
2973 let options = SortOptions::default().desc().with_nulls_first(true);
2974 let field = SortField::new_with_options(d.clone(), options);
2975 let converter = RowConverter::new(vec![field]).unwrap();
2976 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2977
2978 assert!(rows.row(0) > rows.row(1));
2979 assert!(rows.row(1) > rows.row(2));
2980 assert!(rows.row(2) > rows.row(3));
2981 assert!(rows.row(4) > rows.row(0));
2982 assert!(rows.row(4) > rows.row(1));
2983
2984 let back = converter.convert_rows(&rows).unwrap();
2985 assert_eq!(back.len(), 1);
2986 back[0].to_data().validate_full().unwrap();
2987 assert_eq!(&back[0], &list);
2988
2989 let options = SortOptions::default().desc().with_nulls_first(false);
2990 let field = SortField::new_with_options(d, options);
2991 let converter = RowConverter::new(vec![field]).unwrap();
2992 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2993
2994 assert!(rows.row(0) < rows.row(1));
2995 assert!(rows.row(1) < rows.row(2));
2996 assert!(rows.row(2) < rows.row(3));
2997 assert!(rows.row(4) > rows.row(0));
2998 assert!(rows.row(4) < rows.row(1));
2999
3000 let back = converter.convert_rows(&rows).unwrap();
3001 assert_eq!(back.len(), 1);
3002 back[0].to_data().validate_full().unwrap();
3003 assert_eq!(&back[0], &list);
3004
3005 let sliced_list = list.slice(1, 3);
3006 let rows = converter
3007 .convert_columns(&[Arc::clone(&sliced_list)])
3008 .unwrap();
3009
3010 assert!(rows.row(0) < rows.row(1));
3011 assert!(rows.row(1) < rows.row(2));
3012
3013 let back = converter.convert_rows(&rows).unwrap();
3014 assert_eq!(back.len(), 1);
3015 back[0].to_data().validate_full().unwrap();
3016 assert_eq!(&back[0], &sliced_list);
3017 }
3018
3019 #[test]
3020 fn test_list() {
3021 test_single_list::<i32>();
3022 test_nested_list::<i32>();
3023 }
3024
3025 #[test]
3026 fn test_large_list() {
3027 test_single_list::<i64>();
3028 test_nested_list::<i64>();
3029 }
3030
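    // FixedSizeList ordering under the four sort-option combinations, mirroring
    // test_single_list.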
3031 #[test]
3032 fn test_fixed_size_list() {
3033 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
3034 builder.values().append_value(32);
3035 builder.values().append_value(52);
3036 builder.values().append_value(32);
3037 builder.append(true);
3038 builder.values().append_value(32);
3039 builder.values().append_value(52);
3040 builder.values().append_value(12);
3041 builder.append(true);
3042 builder.values().append_value(32);
3043 builder.values().append_value(52);
3044 builder.values().append_null();
3045 builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(13);
        builder.append(false);
3050 builder.values().append_value(32);
3051 builder.values().append_null();
3052 builder.values().append_null();
3053 builder.append(true);
3054 builder.values().append_null();
3055 builder.values().append_null();
3056 builder.values().append_null();
3057 builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.values().append_value(77);
        builder.append(false);
3062
3063 let list = Arc::new(builder.finish()) as ArrayRef;
3064 let d = list.data_type().clone();
3065
3066 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3068
3069 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
3079 assert_eq!(back.len(), 1);
3080 back[0].to_data().validate_full().unwrap();
3081 assert_eq!(&back[0], &list);
3082
3083 let options = SortOptions::default().asc().with_nulls_first(false);
3085 let field = SortField::new_with_options(d.clone(), options);
3086 let converter = RowConverter::new(vec![field]).unwrap();
3087 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
3097 assert_eq!(back.len(), 1);
3098 back[0].to_data().validate_full().unwrap();
3099 assert_eq!(&back[0], &list);
3100
3101 let options = SortOptions::default().desc().with_nulls_first(false);
3103 let field = SortField::new_with_options(d.clone(), options);
3104 let converter = RowConverter::new(vec![field]).unwrap();
3105 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
3115 assert_eq!(back.len(), 1);
3116 back[0].to_data().validate_full().unwrap();
3117 assert_eq!(&back[0], &list);
3118
3119 let options = SortOptions::default().desc().with_nulls_first(true);
3121 let field = SortField::new_with_options(d, options);
3122 let converter = RowConverter::new(vec![field]).unwrap();
3123 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3124
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
3134 assert_eq!(back.len(), 1);
3135 back[0].to_data().validate_full().unwrap();
3136 assert_eq!(&back[0], &list);
3137
3138 let sliced_list = list.slice(1, 5);
3139 let rows_on_sliced_list = converter
3140 .convert_columns(&[Arc::clone(&sliced_list)])
3141 .unwrap();
3142
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
3149 assert_eq!(back.len(), 1);
3150 back[0].to_data().validate_full().unwrap();
3151 assert_eq!(&back[0], &sliced_list);
3152 }
3153
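    // Two fixed-size-list columns converted together must each round-trip unchanged.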
3154 #[test]
3155 fn test_two_fixed_size_lists() {
3156 let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3157 first.values().append_value(100);
3159 first.append(true);
3160 first.values().append_value(101);
3162 first.append(true);
3163 first.values().append_value(102);
3165 first.append(true);
3166 first.values().append_null();
3168 first.append(true);
        first.values().append_null();
        first.append(false);
3172 let first = Arc::new(first.finish()) as ArrayRef;
3173 let first_type = first.data_type().clone();
3174
3175 let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3176 second.values().append_value(200);
3178 second.append(true);
3179 second.values().append_value(201);
3181 second.append(true);
3182 second.values().append_value(202);
3184 second.append(true);
3185 second.values().append_null();
3187 second.append(true);
        second.values().append_null();
        second.append(false);
3191 let second = Arc::new(second.finish()) as ArrayRef;
3192 let second_type = second.data_type().clone();
3193
3194 let converter = RowConverter::new(vec![
3195 SortField::new(first_type.clone()),
3196 SortField::new(second_type.clone()),
3197 ])
3198 .unwrap();
3199
3200 let rows = converter
3201 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3202 .unwrap();
3203
3204 let back = converter.convert_rows(&rows).unwrap();
3205 assert_eq!(back.len(), 2);
3206 back[0].to_data().validate_full().unwrap();
3207 assert_eq!(&back[0], &first);
3208 back[1].to_data().validate_full().unwrap();
3209 assert_eq!(&back[1], &second);
3210 }
3211
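    // A fixed-size list whose items contain variable-width (string) data,
    // converted alongside a plain string column.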
3212 #[test]
3213 fn test_fixed_size_list_with_variable_width_content() {
3214 let mut first = FixedSizeListBuilder::new(
3215 StructBuilder::from_fields(
3216 vec![
3217 Field::new(
3218 "timestamp",
3219 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3220 false,
3221 ),
3222 Field::new("offset_minutes", DataType::Int16, false),
3223 Field::new("time_zone", DataType::Utf8, false),
3224 ],
3225 1,
3226 ),
3227 1,
3228 );
3229 first
3231 .values()
3232 .field_builder::<TimestampMicrosecondBuilder>(0)
3233 .unwrap()
3234 .append_null();
3235 first
3236 .values()
3237 .field_builder::<Int16Builder>(1)
3238 .unwrap()
3239 .append_null();
3240 first
3241 .values()
3242 .field_builder::<StringBuilder>(2)
3243 .unwrap()
3244 .append_null();
3245 first.values().append(false);
3246 first.append(false);
3247 first
3249 .values()
3250 .field_builder::<TimestampMicrosecondBuilder>(0)
3251 .unwrap()
3252 .append_null();
3253 first
3254 .values()
3255 .field_builder::<Int16Builder>(1)
3256 .unwrap()
3257 .append_null();
3258 first
3259 .values()
3260 .field_builder::<StringBuilder>(2)
3261 .unwrap()
3262 .append_null();
3263 first.values().append(false);
3264 first.append(true);
3265 first
3267 .values()
3268 .field_builder::<TimestampMicrosecondBuilder>(0)
3269 .unwrap()
3270 .append_value(0);
3271 first
3272 .values()
3273 .field_builder::<Int16Builder>(1)
3274 .unwrap()
3275 .append_value(0);
3276 first
3277 .values()
3278 .field_builder::<StringBuilder>(2)
3279 .unwrap()
3280 .append_value("UTC");
3281 first.values().append(true);
3282 first.append(true);
3283 first
3285 .values()
3286 .field_builder::<TimestampMicrosecondBuilder>(0)
3287 .unwrap()
3288 .append_value(1126351800123456);
3289 first
3290 .values()
3291 .field_builder::<Int16Builder>(1)
3292 .unwrap()
3293 .append_value(120);
3294 first
3295 .values()
3296 .field_builder::<StringBuilder>(2)
3297 .unwrap()
3298 .append_value("Europe/Warsaw");
3299 first.values().append(true);
3300 first.append(true);
3301 let first = Arc::new(first.finish()) as ArrayRef;
3302 let first_type = first.data_type().clone();
3303
3304 let mut second = StringBuilder::new();
3305 second.append_value("somewhere near");
3306 second.append_null();
3307 second.append_value("Greenwich");
3308 second.append_value("Warsaw");
3309 let second = Arc::new(second.finish()) as ArrayRef;
3310 let second_type = second.data_type().clone();
3311
3312 let converter = RowConverter::new(vec![
3313 SortField::new(first_type.clone()),
3314 SortField::new(second_type.clone()),
3315 ])
3316 .unwrap();
3317
3318 let rows = converter
3319 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3320 .unwrap();
3321
3322 let back = converter.convert_rows(&rows).unwrap();
3323 assert_eq!(back.len(), 2);
3324 back[0].to_data().validate_full().unwrap();
3325 assert_eq!(&back[0], &first);
3326 back[1].to_data().validate_full().unwrap();
3327 assert_eq!(&back[1], &second);
3328 }
3329
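    // Random column generators used by fuzz_test below.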
3330 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3331 where
3332 K: ArrowPrimitiveType,
3333 StandardUniform: Distribution<K::Native>,
3334 {
3335 let mut rng = rng();
3336 (0..len)
3337 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3338 .collect()
3339 }
3340
3341 fn generate_strings<O: OffsetSizeTrait>(
3342 len: usize,
3343 valid_percent: f64,
3344 ) -> GenericStringArray<O> {
3345 let mut rng = rng();
3346 (0..len)
3347 .map(|_| {
3348 rng.random_bool(valid_percent).then(|| {
3349 let len = rng.random_range(0..100);
3350 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3351 String::from_utf8(bytes).unwrap()
3352 })
3353 })
3354 .collect()
3355 }
3356
3357 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3358 let mut rng = rng();
3359 (0..len)
3360 .map(|_| {
3361 rng.random_bool(valid_percent).then(|| {
3362 let len = rng.random_range(0..100);
3363 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3364 String::from_utf8(bytes).unwrap()
3365 })
3366 })
3367 .collect()
3368 }
3369
3370 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3371 let mut rng = rng();
3372 (0..len)
3373 .map(|_| {
3374 rng.random_bool(valid_percent).then(|| {
3375 let len = rng.random_range(0..100);
3376 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3377 bytes
3378 })
3379 })
3380 .collect()
3381 }
3382
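    // Deterministic StringView values chosen to stress comparisons: embedded NULs,
    // long strings sharing a common prefix, and values longer than 256 bytes.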
3383 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3384 let edge_cases = vec![
3385 Some("bar".to_string()),
3386 Some("bar\0".to_string()),
3387 Some("LongerThan12Bytes".to_string()),
3388 Some("LongerThan12Bytez".to_string()),
3389 Some("LongerThan12Bytes\0".to_string()),
3390 Some("LongerThan12Byt".to_string()),
3391 Some("backend one".to_string()),
3392 Some("backend two".to_string()),
3393 Some("a".repeat(257)),
3394 Some("a".repeat(300)),
3395 ];
3396
3397 let mut values = Vec::with_capacity(len);
3399 for i in 0..len {
3400 values.push(
3401 edge_cases
3402 .get(i % edge_cases.len())
3403 .cloned()
3404 .unwrap_or(None),
3405 );
3406 }
3407
3408 StringViewArray::from(values)
3409 }
3410
3411 fn generate_dictionary<K>(
3412 values: ArrayRef,
3413 len: usize,
3414 valid_percent: f64,
3415 ) -> DictionaryArray<K>
3416 where
3417 K: ArrowDictionaryKeyType,
3418 K::Native: SampleUniform,
3419 {
3420 let mut rng = rng();
3421 let min_key = K::Native::from_usize(0).unwrap();
3422 let max_key = K::Native::from_usize(values.len()).unwrap();
3423 let keys: PrimitiveArray<K> = (0..len)
3424 .map(|_| {
3425 rng.random_bool(valid_percent)
3426 .then(|| rng.random_range(min_key..max_key))
3427 })
3428 .collect();
3429
3430 let data_type =
3431 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3432
3433 let data = keys
3434 .into_data()
3435 .into_builder()
3436 .data_type(data_type)
3437 .add_child_data(values.to_data())
3438 .build()
3439 .unwrap();
3440
3441 DictionaryArray::from(data)
3442 }
3443
3444 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3445 let mut rng = rng();
3446 let width = rng.random_range(0..20);
3447 let mut builder = FixedSizeBinaryBuilder::new(width);
3448
3449 let mut b = vec![0; width as usize];
3450 for _ in 0..len {
3451 match rng.random_bool(valid_percent) {
3452 true => {
3453 b.iter_mut().for_each(|x| *x = rng.random());
3454 builder.append_value(&b).unwrap();
3455 }
3456 false => builder.append_null(),
3457 }
3458 }
3459
3460 builder.finish()
3461 }
3462
3463 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3464 let mut rng = rng();
3465 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3466 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3467 let b = generate_strings::<i32>(len, valid_percent);
3468 let fields = Fields::from(vec![
3469 Field::new("a", DataType::Int32, true),
3470 Field::new("b", DataType::Utf8, true),
3471 ]);
3472 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3473 StructArray::new(fields, values, Some(nulls))
3474 }
3475
3476 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3477 where
3478 F: FnOnce(usize) -> ArrayRef,
3479 {
3480 let mut rng = rng();
3481 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3482 let values_len = offsets.last().unwrap().to_usize().unwrap();
3483 let values = values(values_len);
3484 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3485 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3486 ListArray::new(field, offsets, values, Some(nulls))
3487 }
3488
3489 fn generate_column(len: usize) -> ArrayRef {
3490 let mut rng = rng();
3491 match rng.random_range(0..18) {
3492 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
3493 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
3494 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
3495 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
3496 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
3497 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
3498 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
3499 7 => Arc::new(generate_dictionary::<Int64Type>(
3500 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
3502 len,
3503 0.8,
3504 )),
3505 8 => Arc::new(generate_dictionary::<Int64Type>(
3506 Arc::new(generate_primitive_array::<Int64Type>(
3508 rng.random_range(1..len),
3509 1.0,
3510 )),
3511 len,
3512 0.8,
3513 )),
3514 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
3515 10 => Arc::new(generate_struct(len, 0.8)),
3516 11 => Arc::new(generate_list(len, 0.8, |values_len| {
3517 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3518 })),
3519 12 => Arc::new(generate_list(len, 0.8, |values_len| {
3520 Arc::new(generate_strings::<i32>(values_len, 0.8))
3521 })),
3522 13 => Arc::new(generate_list(len, 0.8, |values_len| {
3523 Arc::new(generate_struct(values_len, 0.8))
3524 })),
3525 14 => Arc::new(generate_string_view(len, 0.8)),
3526 15 => Arc::new(generate_byte_view(len, 0.8)),
3527 16 => Arc::new(generate_fixed_stringview_column(len)),
3528 17 => Arc::new(
3529 generate_list(len + 1000, 0.8, |values_len| {
3530 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3531 })
3532 .slice(500, len),
3533 ),
3534 _ => unreachable!(),
3535 }
3536 }
3537
3538 fn print_row(cols: &[SortColumn], row: usize) -> String {
3539 let t: Vec<_> = cols
3540 .iter()
3541 .map(|x| match x.values.is_valid(row) {
3542 true => {
3543 let opts = FormatOptions::default().with_null("NULL");
3544 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3545 formatter.value(row).to_string()
3546 }
3547 false => "NULL".to_string(),
3548 })
3549 .collect();
3550 t.join(",")
3551 }
3552
3553 fn print_col_types(cols: &[SortColumn]) -> String {
3554 let t: Vec<_> = cols
3555 .iter()
3556 .map(|x| x.values.data_type().to_string())
3557 .collect();
3558 t.join(",")
3559 }
3560
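    // Randomized check: row comparisons must agree with LexicographicalComparator,
    // and columns must round-trip through Rows, the binary form, and from_binary.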
3561 #[test]
3562 #[cfg_attr(miri, ignore)]
3563 fn fuzz_test() {
3564 for _ in 0..100 {
3565 let mut rng = rng();
3566 let num_columns = rng.random_range(1..5);
3567 let len = rng.random_range(5..100);
3568 let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
3569
3570 let options: Vec<_> = (0..num_columns)
3571 .map(|_| SortOptions {
3572 descending: rng.random_bool(0.5),
3573 nulls_first: rng.random_bool(0.5),
3574 })
3575 .collect();
3576
3577 let sort_columns: Vec<_> = options
3578 .iter()
3579 .zip(&arrays)
3580 .map(|(o, c)| SortColumn {
3581 values: Arc::clone(c),
3582 options: Some(*o),
3583 })
3584 .collect();
3585
3586 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
3587
3588 let columns: Vec<SortField> = options
3589 .into_iter()
3590 .zip(&arrays)
3591 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
3592 .collect();
3593
3594 let converter = RowConverter::new(columns).unwrap();
3595 let rows = converter.convert_columns(&arrays).unwrap();
3596
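            // Every pairwise row comparison must match the lexicographical comparator.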
3597 for i in 0..len {
3598 for j in 0..len {
3599 let row_i = rows.row(i);
3600 let row_j = rows.row(j);
3601 let row_cmp = row_i.cmp(&row_j);
3602 let lex_cmp = comparator.compare(i, j);
3603 assert_eq!(
3604 row_cmp,
3605 lex_cmp,
3606 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
3607 print_row(&sort_columns, i),
3608 print_row(&sort_columns, j),
3609 row_i,
3610 row_j,
3611 print_col_types(&sort_columns)
3612 );
3613 }
3614 }
3615
3616 let back = converter.convert_rows(&rows).unwrap();
3619 for (actual, expected) in back.iter().zip(&arrays) {
3620 actual.to_data().validate_full().unwrap();
3621 dictionary_eq(actual, expected)
3622 }
3623
3624 let rows = rows.try_into_binary().expect("reasonable size");
3627 let parser = converter.parser();
3628 let back = converter
3629 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3630 .unwrap();
3631 for (actual, expected) in back.iter().zip(&arrays) {
3632 actual.to_data().validate_full().unwrap();
3633 dictionary_eq(actual, expected)
3634 }
3635
3636 let rows = converter.from_binary(rows);
3637 let back = converter.convert_rows(&rows).unwrap();
3638 for (actual, expected) in back.iter().zip(&arrays) {
3639 actual.to_data().validate_full().unwrap();
3640 dictionary_eq(actual, expected)
3641 }
3642 }
3643 }
3644
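    // clear() must reset the buffer for reuse; rows appended after clearing must
    // equal rows appended to a freshly created buffer.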
3645 #[test]
3646 fn test_clear() {
3647 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3648 let mut rows = converter.empty_rows(3, 128);
3649
3650 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3651 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3652 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3653
3654 for array in arrays.iter() {
3655 rows.clear();
3656 converter
3657 .append(&mut rows, std::slice::from_ref(array))
3658 .unwrap();
3659 let back = converter.convert_rows(&rows).unwrap();
3660 assert_eq!(&back[0], array);
3661 }
3662
3663 let mut rows_expected = converter.empty_rows(3, 128);
3664 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3665
3666 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3667 assert_eq!(
3668 actual, expected,
3669 "For row {i}: expected {expected:?}, actual: {actual:?}",
3670 );
3671 }
3672 }
3673
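    // append() through the dictionary codec with binary values.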
3674 #[test]
3675 fn test_append_codec_dictionary_binary() {
3676 use DataType::*;
3677 let converter = RowConverter::new(vec![SortField::new(Dictionary(
3679 Box::new(Int32),
3680 Box::new(Binary),
3681 ))])
3682 .unwrap();
3683 let mut rows = converter.empty_rows(4, 128);
3684
3685 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3686 let values = BinaryArray::from(vec![
3687 Some("a".as_bytes()),
3688 Some(b"b"),
3689 Some(b"c"),
3690 Some(b"d"),
3691 ]);
3692 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3693
3694 rows.clear();
3695 let array = Arc::new(dict_array) as ArrayRef;
3696 converter
3697 .append(&mut rows, std::slice::from_ref(&array))
3698 .unwrap();
3699 let back = converter.convert_rows(&rows).unwrap();
3700
3701 dictionary_eq(&back[0], &array);
3702 }
3703
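    // A list that is a strict prefix of another ([null] vs [null, null]) must sort first.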
3704 #[test]
3705 fn test_list_prefix() {
3706 let mut a = ListBuilder::new(Int8Builder::new());
3707 a.append_value([None]);
3708 a.append_value([None, None]);
3709 let a = a.finish();
3710
3711 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3712 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3713 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3714 }
3715
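    // Map columns are not yet supported by the row format.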
3716 #[test]
3717 fn map_should_be_marked_as_unsupported() {
3718 let map_data_type = Field::new_map(
3719 "map",
3720 "entries",
3721 Field::new("key", DataType::Utf8, false),
3722 Field::new("value", DataType::Utf8, true),
3723 false,
3724 true,
3725 )
3726 .data_type()
3727 .clone();
3728
3729 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3730
3731 assert!(!is_supported, "Map should not be supported");
3732 }
3733
3734 #[test]
3735 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3736 let map_data_type = Field::new_map(
3737 "map",
3738 "entries",
3739 Field::new("key", DataType::Utf8, false),
3740 Field::new("value", DataType::Utf8, true),
3741 false,
3742 true,
3743 )
3744 .data_type()
3745 .clone();
3746
3747 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3748
3749 match converter {
3750 Err(ArrowError::NotYetImplemented(message)) => {
3751 assert!(
3752 message.contains("Row format support not yet implemented for"),
3753 "Expected NotYetImplemented error for map data type, got: {message}",
3754 );
3755 }
3756 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3757 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3758 }
3759 }
3760
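    // Decoding trusted rows directly can skip UTF-8 validation, which may yield a
    // smaller Utf8View values buffer than decoding rows re-parsed from binary.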
3761 #[test]
3762 fn test_values_buffer_smaller_when_utf8_validation_disabled() {
3763 fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
3764 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
3766
3767 let rows = converter.convert_columns(&[col]).unwrap();
3769 let converted = converter.convert_rows(&rows).unwrap();
3770 let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3771
3772 let rows = rows.try_into_binary().expect("reasonable size");
3774 let parser = converter.parser();
3775 let converted = converter
3776 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3777 .unwrap();
3778 let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3779 (unchecked_values_len, checked_values_len)
3780 }
3781
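        // All strings are at most 12 bytes, so they can be inlined in the views: the
        // unchecked path leaves the data buffer empty, while the validating path
        // stores 5 + 5 + 4 = 14 bytes.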
3782 let col = Arc::new(StringViewArray::from_iter([
            Some("hello"),
            None,
            Some("short"),
            Some("tiny"),
        ])) as ArrayRef;
3789
3790 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3791 assert_eq!(unchecked_values_len, 0);
3793 assert_eq!(checked_values_len, 14);
3795
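        // All strings exceed 12 bytes, so both paths materialize the same bytes.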
3796 let col = Arc::new(StringViewArray::from_iter([
3798 Some("this is a very long string over 12 bytes"),
3799 Some("another long string to test the buffer"),
3800 ])) as ArrayRef;
3801
3802 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3803 assert!(unchecked_values_len > 0);
3805 assert_eq!(unchecked_values_len, checked_values_len);
3806
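        // Only "thisisexact13" (13 bytes) exceeds the inline limit, so the unchecked
        // buffer holds exactly those 13 bytes; the validating path stores more.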
3807 let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),
            Some("thisisexact13"),
            None,
            Some("short"),
        ])) as ArrayRef;
3814
3815 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3816 assert_eq!(unchecked_values_len, 13);
3818 assert!(checked_values_len > unchecked_values_len);
3819 }
3820
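    // Sparse union round-trip: type ids must be preserved.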
3821 #[test]
3822 fn test_sparse_union() {
3823 let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3825 let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
3826
3827 let type_ids = vec![0, 1, 0, 1, 0].into();
3829
3830 let union_fields = [
3831 (0, Arc::new(Field::new("int", DataType::Int32, false))),
3832 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
3833 ]
3834 .into_iter()
3835 .collect();
3836
3837 let union_array = UnionArray::try_new(
3838 union_fields,
3839 type_ids,
3840 None,
3841 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3842 )
3843 .unwrap();
3844
3845 let union_type = union_array.data_type().clone();
3846 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3847
3848 let rows = converter
3849 .convert_columns(&[Arc::new(union_array.clone())])
3850 .unwrap();
3851
3852 let back = converter.convert_rows(&rows).unwrap();
3854 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3855
3856 assert_eq!(union_array.len(), back_union.len());
3857 for i in 0..union_array.len() {
3858 assert_eq!(union_array.type_id(i), back_union.type_id(i));
3859 }
3860 }
3861
3862 #[test]
3863 fn test_sparse_union_with_nulls() {
3864 let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3866 let str_array = StringArray::from(vec![None::<&str>; 5]);
3867
3868 let type_ids = vec![0, 1, 0, 1, 0].into();
3870
3871 let union_fields = [
3872 (0, Arc::new(Field::new("int", DataType::Int32, true))),
3873 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3874 ]
3875 .into_iter()
3876 .collect();
3877
3878 let union_array = UnionArray::try_new(
3879 union_fields,
3880 type_ids,
3881 None,
3882 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3883 )
3884 .unwrap();
3885
3886 let union_type = union_array.data_type().clone();
3887 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3888
3889 let rows = converter
3890 .convert_columns(&[Arc::new(union_array.clone())])
3891 .unwrap();
3892
3893 let back = converter.convert_rows(&rows).unwrap();
3895 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3896
3897 assert_eq!(union_array.len(), back_union.len());
3898 for i in 0..union_array.len() {
3899 let expected_null = union_array.is_null(i);
3900 let actual_null = back_union.is_null(i);
3901 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3902 if !expected_null {
3903 assert_eq!(union_array.type_id(i), back_union.type_id(i));
3904 }
3905 }
3906 }
3907
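    // Dense union round-trip: type ids must be preserved.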
3908 #[test]
3909 fn test_dense_union() {
3910 let int_array = Int32Array::from(vec![1, 3, 5]);
3912 let str_array = StringArray::from(vec!["a", "b"]);
3913
3914 let type_ids = vec![0, 1, 0, 1, 0].into();
3915
3916 let offsets = vec![0, 0, 1, 1, 2].into();
3918
3919 let union_fields = [
3920 (0, Arc::new(Field::new("int", DataType::Int32, false))),
3921 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
3922 ]
3923 .into_iter()
3924 .collect();
3925
3926 let union_array = UnionArray::try_new(
3927 union_fields,
3928 type_ids,
            Some(offsets),
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3931 )
3932 .unwrap();
3933
3934 let union_type = union_array.data_type().clone();
3935 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3936
3937 let rows = converter
3938 .convert_columns(&[Arc::new(union_array.clone())])
3939 .unwrap();
3940
3941 let back = converter.convert_rows(&rows).unwrap();
3943 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3944
3945 assert_eq!(union_array.len(), back_union.len());
3946 for i in 0..union_array.len() {
3947 assert_eq!(union_array.type_id(i), back_union.type_id(i));
3948 }
3949 }
3950
3951 #[test]
3952 fn test_dense_union_with_nulls() {
3953 let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
3955 let str_array = StringArray::from(vec![Some("a"), None]);
3956
3957 let type_ids = vec![0, 1, 0, 1, 0].into();
3959 let offsets = vec![0, 0, 1, 1, 2].into();
3960
3961 let union_fields = [
3962 (0, Arc::new(Field::new("int", DataType::Int32, true))),
3963 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3964 ]
3965 .into_iter()
3966 .collect();
3967
3968 let union_array = UnionArray::try_new(
3969 union_fields,
3970 type_ids,
3971 Some(offsets),
3972 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3973 )
3974 .unwrap();
3975
3976 let union_type = union_array.data_type().clone();
3977 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3978
3979 let rows = converter
3980 .convert_columns(&[Arc::new(union_array.clone())])
3981 .unwrap();
3982
3983 let back = converter.convert_rows(&rows).unwrap();
3985 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3986
3987 assert_eq!(union_array.len(), back_union.len());
3988 for i in 0..union_array.len() {
3989 let expected_null = union_array.is_null(i);
3990 let actual_null = back_union.is_null(i);
3991 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3992 if !expected_null {
3993 assert_eq!(union_array.type_id(i), back_union.type_id(i));
3994 }
3995 }
3996 }
3997
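    // Ordering across union variants: the assertions below show integer values
    // (type id 0) sorting before string values (type id 1), with normal ordering
    // within each variant.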
3998 #[test]
3999 fn test_union_ordering() {
4000 let int_array = Int32Array::from(vec![100, 5, 20]);
4001 let str_array = StringArray::from(vec!["z", "a"]);
4002
4003 let type_ids = vec![0, 1, 0, 1, 0].into();
4005 let offsets = vec![0, 0, 1, 1, 2].into();
4006
4007 let union_fields = [
4008 (0, Arc::new(Field::new("int", DataType::Int32, false))),
4009 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4010 ]
4011 .into_iter()
4012 .collect();
4013
4014 let union_array = UnionArray::try_new(
4015 union_fields,
4016 type_ids,
4017 Some(offsets),
4018 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4019 )
4020 .unwrap();
4021
4022 let union_type = union_array.data_type().clone();
4023 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4024
4025 let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();
4026
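        // rows correspond to [int 100, str "z", int 5, str "a", int 20]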
4027 assert!(rows.row(2) < rows.row(1));
4039
4040 assert!(rows.row(0) < rows.row(3));
4042
4043 assert!(rows.row(2) < rows.row(4));
4046 assert!(rows.row(4) < rows.row(0));
4048
4049 assert!(rows.row(3) < rows.row(1));
4052 }
4053}