1#![doc(
157 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::{ArrowDictionaryKeyType, ByteViewType};
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
/// Converts a set of [`ArrayRef`] columns into the comparable row format.
///
/// One [`Codec`] is maintained per field, holding any nested converter
/// state required for dictionary / struct / list / union columns.
#[derive(Debug)]
pub struct RowConverter {
    /// The data type and sort options for each converted column
    fields: Arc<[SortField]>,
    /// Per-field encoder/decoder state, parallel to `fields`
    codecs: Vec<Codec>,
}
487
/// Per-field conversion state.
///
/// Simple (non-nested) types need no state; nested and dictionary types
/// carry a nested [`RowConverter`] for their child values, and in some
/// cases the pre-computed encoding of a null value.
#[derive(Debug)]
enum Codec {
    /// No additional state needed to encode or decode this type
    Stateless,
    /// Converter for the dictionary values, plus the encoded null row
    Dictionary(RowConverter, OwnedRow),
    /// Converter for the struct's children, plus the encoded null row
    Struct(RowConverter, OwnedRow),
    /// Converter for the list's child values
    List(RowConverter),
    /// Converter for the values of a run-end encoded array
    RunEndEncoded(RowConverter),
    /// One converter and one encoded null row per union child field
    Union(Vec<RowConverter>, Vec<OwnedRow>),
}
506
impl Codec {
    /// Builds the codec for `sort_field`, constructing any nested
    /// converters and pre-encoding null rows where required.
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionary values are encoded with the parent's options
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-encode a single null value so null keys can be encoded
                // without a corresponding dictionary entry
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Child values are always encoded ascending; the parent's
                // `descending` option is applied when the outer array is
                // encoded. Only the null placement needs flipping here.
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // Everything non-nested is encoded without extra state
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // As with run-end encoding: the encoded child contents are
                // inverted later if `descending` is set, so encode ascending
                // and adjust only the null ordering here.
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Fixed-size lists keep the parent's options unchanged
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-encode one all-null row for encoding null structs
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            DataType::Union(fields, _mode) => {
                // Children are encoded ascending (see List above); only the
                // null ordering is adjusted here
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let mut converters = Vec::with_capacity(fields.len());
                let mut null_rows = Vec::with_capacity(fields.len());

                // NOTE(review): converters/null_rows are stored in field
                // iteration order and later indexed directly by type id —
                // assumes type ids are the dense range 0..fields.len().
                // TODO confirm for unions with non-contiguous type ids.
                for (_type_id, field) in fields.iter() {
                    let sort_field =
                        SortField::new_with_options(field.data_type().clone(), options);
                    let converter = RowConverter::new(vec![sort_field])?;

                    let null_array = new_null_array(field.data_type(), 1);
                    let nulls = converter.convert_columns(&[null_array])?;
                    let owned = OwnedRow {
                        data: nulls.buffer.into(),
                        config: nulls.config,
                    };

                    converters.push(converter);
                    null_rows.push(owned);
                }

                Ok(Self::Union(converters, null_rows))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepares the per-array [`Encoder`] for `array`, converting any child
    /// values (dictionary values, struct children, list values, ...) into
    /// rows up front.
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => {
                        // Only convert the child values actually referenced by
                        // the offsets — handles sliced list arrays without
                        // encoding unreachable values
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
            Codec::Union(converters, _) => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected Union array");

                let type_ids = union_array.type_ids().clone();
                let offsets = union_array.offsets().cloned();

                // Convert every child up front; encoding then just copies
                // the pre-computed row bytes per element.
                // NOTE(review): `child(type_id as i8)` assumes dense type ids
                // 0..n, mirroring the assumption in `Codec::new`.
                let mut child_rows = Vec::with_capacity(converters.len());
                for (type_id, converter) in converters.iter().enumerate() {
                    let child_array = union_array.child(type_id as i8);
                    let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
                    child_rows.push(rows);
                }

                Ok(Encoder::Union {
                    child_rows,
                    type_ids,
                    offsets,
                })
            }
        }
    }

    /// Heap memory retained by this codec, in bytes (excluding `size_of::<Self>`).
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
            Codec::Union(converters, null_rows) => {
                converters.iter().map(|c| c.size()).sum::<usize>()
                    + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
            }
        }
    }
}
707
/// Per-array encoder state produced by [`Codec::encoder`].
///
/// Holds the child values of a specific array already converted to rows,
/// borrowing the codec's pre-encoded null row where one exists.
#[derive(Debug)]
enum Encoder<'a> {
    /// No state required
    Stateless,
    /// The converted dictionary values and the encoded null row
    Dictionary(Rows, Row<'a>),
    /// The converted struct children and the encoded null row
    Struct(Rows, Row<'a>),
    /// The converted child values of the list
    List(Rows),
    /// The converted values of the run array
    RunEndEncoded(Rows),
    /// Converted rows per union child, with the array's type ids and
    /// (for dense unions) its value offsets
    Union {
        child_rows: Vec<Rows>,
        type_ids: ScalarBuffer<i8>,
        offsets: Option<ScalarBuffer<i32>>,
    },
}
731
/// Configuration for a column of the row format: its data type and
/// the sort options (ascending/descending, null placement) to encode with.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options controlling byte ordering of the encoding
    options: SortOptions,
    /// The column's data type
    data_type: DataType,
}
740
741impl SortField {
742 pub fn new(data_type: DataType) -> Self {
744 Self::new_with_options(data_type, Default::default())
745 }
746
747 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
749 Self { options, data_type }
750 }
751
752 pub fn size(&self) -> usize {
756 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
757 }
758}
759
impl RowConverter {
    /// Create a new [`RowConverter`] for the provided fields.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] if any field's data type
    /// is not supported by the row format (see [`Self::supports_fields`]).
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Returns true if `Self::new` would succeed for these fields.
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    /// Recursively checks whether `d` (and all child types) can be encoded.
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            // All non-nested types are supported
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            DataType::Union(fs, _mode) => fs
                .iter()
                .all(|(_, f)| Self::supports_datatype(f.data_type())),
            _ => false,
        }
    }

    /// Convert a set of columns into [`Rows`].
    ///
    /// # Errors
    ///
    /// Errors if the column count, lengths, or data types do not match the
    /// fields this converter was created with.
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Append the encoding of `columns` onto an existing [`Rows`].
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this converter.
    ///
    /// # Errors
    ///
    /// Errors on a column count mismatch, unequal column lengths, or a
    /// data type differing from the converter's fields.
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }
        // All columns must share the length of the first
        for colum in columns.iter().skip(1) {
            if colum.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    colum.len()
                )));
            }
        }

        // Prepare per-column encoders, validating data types as we go
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Compute the final length of every row, extend `offsets` with each
        // new row's *start* position, and size the buffer accordingly.
        // `encode_column` then advances each offset past the bytes it writes,
        // so after all columns offsets hold the end of each row.
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // Encode column with null sentinels before value data
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        // Sanity-check the offset bookkeeping in debug builds
        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Row`]s back into [`ArrayRef`] columns.
    ///
    /// # Panics
    ///
    /// Panics if any row was not produced by this converter.
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                // Only validate UTF-8 if any row came from an untrusted
                // source (e.g. RowParser / from_binary)
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY: rows were produced by this converter (asserted above),
        // so they are valid encodings for these fields
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        // In debug builds, verify every byte of every row was consumed
        if cfg!(debug_assertions) {
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }

    /// Create an empty [`Rows`] tied to this converter, with capacity for
    /// `row_capacity` rows and `data_capacity` bytes of row data.
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        // offsets always holds one more entry than there are rows
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Reconstruct a [`Rows`] from a [`BinaryArray`] previously produced by
    /// [`Rows::try_into_binary`].
    ///
    /// # Panics
    ///
    /// Panics if the array contains nulls.
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        let (offsets, values, _) = array.into_parts();
        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
        // Reuse the underlying allocation when uniquely owned
        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
        Rows {
            buffer,
            offsets,
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                // Bytes are externally supplied, so strings must be
                // re-validated as UTF-8 when decoded
                validate_utf8: true,
            },
        }
    }

    /// Decode raw row bytes into columns, one per field.
    ///
    /// Each slice in `rows` is advanced past the bytes consumed by each
    /// decoded column.
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data produced by this converter's encoding;
    /// callers pass `validate_utf8 = true` for untrusted bytes.
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
            .collect()
    }

    /// Create a [`RowParser`] that can parse raw bytes into [`Row`]s
    /// compatible with this converter.
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Memory retained by this converter in bytes, including `size_of::<Self>`.
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
1070
/// Parses raw bytes into [`Row`]s compatible with the [`RowConverter`]
/// that created it (see [`RowConverter::parser`]).
#[derive(Debug)]
pub struct RowParser {
    config: RowConfig,
}
1076
1077impl RowParser {
1078 fn new(fields: Arc<[SortField]>) -> Self {
1079 Self {
1080 config: RowConfig {
1081 fields,
1082 validate_utf8: true,
1083 },
1084 }
1085 }
1086
1087 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1092 Row {
1093 data: bytes,
1094 config: &self.config,
1095 }
1096 }
1097}
1098
/// Shared configuration carried by [`Rows`] and [`Row`].
#[derive(Debug, Clone)]
struct RowConfig {
    /// The fields the rows were encoded with
    fields: Arc<[SortField]>,
    /// Whether UTF-8 must be re-validated on decode (true when the row
    /// bytes came from an external source such as [`RowParser`])
    validate_utf8: bool,
}
1107
/// A collection of encoded rows stored contiguously.
#[derive(Debug)]
pub struct Rows {
    /// The encoded row bytes, rows stored back to back
    buffer: Vec<u8>,
    /// Byte offsets into `buffer`; always `num_rows + 1` entries, with
    /// row `i` occupying `buffer[offsets[i]..offsets[i + 1]]`
    offsets: Vec<usize>,
    /// The configuration for these rows
    config: RowConfig,
}
1120
impl Rows {
    /// Append a [`Row`] to this collection.
    ///
    /// # Panics
    ///
    /// Panics if `row` was produced by a different [`RowConverter`].
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Inherit the stricter validation requirement, if any
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Reserve capacity for `row_capacity` additional rows totalling
    /// `data_capacity` additional bytes.
    pub fn reserve(&mut self, row_capacity: usize, data_capacity: usize) {
        self.buffer.reserve(data_capacity);
        self.offsets.reserve(row_capacity);
    }

    /// Returns the `row`-th [`Row`], panicking if out of bounds.
    pub fn row(&self, row: usize) -> Row<'_> {
        // offsets has num_rows + 1 entries, so this bounds-checks `row`
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the `index`-th [`Row`] without bounds checking.
    ///
    /// # Safety
    ///
    /// Caller must guarantee `index < self.num_rows()`.
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Remove all rows, retaining allocated capacity.
    pub fn clear(&mut self) {
        // Keep the initial 0 offset so the offsets invariant holds
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Number of rows stored.
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Iterate over the rows in order.
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Memory retained by this instance in bytes, including unused capacity.
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.buffer.capacity()
            + self.offsets.capacity() * std::mem::size_of::<usize>()
    }

    /// Convert into a [`BinaryArray`] of the encoded rows, reusing the
    /// underlying buffers.
    ///
    /// # Errors
    ///
    /// Errors if the row data exceeds `i32::MAX` bytes and so cannot be
    /// indexed by a 32-bit offset `BinaryArray`.
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets are monotonically increasing (maintained by
        // push/append and asserted in debug builds), and BinaryArray
        // imposes no UTF-8 requirement on its values
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
1234
impl<'a> IntoIterator for &'a Rows {
    type Item = Row<'a>;
    type IntoIter = RowsIter<'a>;

    /// Iterate over all rows from first to last.
    fn into_iter(self) -> Self::IntoIter {
        RowsIter {
            rows: self,
            start: 0,
            end: self.num_rows(),
        }
    }
}
1247
/// Double-ended iterator over the [`Row`]s of a [`Rows`].
///
/// Invariant: `start <= end <= rows.num_rows()`; the unconsumed rows are
/// exactly those with indices in `start..end`.
#[derive(Debug)]
pub struct RowsIter<'a> {
    rows: &'a Rows,
    start: usize,
    end: usize,
}
1255
1256impl<'a> Iterator for RowsIter<'a> {
1257 type Item = Row<'a>;
1258
1259 fn next(&mut self) -> Option<Self::Item> {
1260 if self.end == self.start {
1261 return None;
1262 }
1263
1264 let row = unsafe { self.rows.row_unchecked(self.start) };
1266 self.start += 1;
1267 Some(row)
1268 }
1269
1270 fn size_hint(&self) -> (usize, Option<usize>) {
1271 let len = self.len();
1272 (len, Some(len))
1273 }
1274}
1275
impl ExactSizeIterator for RowsIter<'_> {
    /// Number of rows not yet consumed from either end.
    fn len(&self) -> usize {
        self.end - self.start
    }
}
1281
1282impl DoubleEndedIterator for RowsIter<'_> {
1283 fn next_back(&mut self) -> Option<Self::Item> {
1284 if self.end == self.start {
1285 return None;
1286 }
1287 let row = unsafe { self.rows.row_unchecked(self.end) };
1289 self.end -= 1;
1290 Some(row)
1291 }
1292}
1293
/// A borrowed view of a single encoded row.
///
/// Comparing two `Row`s compares their encoded bytes, which (for rows from
/// the same converter) reflects the configured sort order.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row
    data: &'a [u8],
    /// The configuration of the [`Rows`] this row belongs to
    config: &'a RowConfig,
}
1307
1308impl<'a> Row<'a> {
1309 pub fn owned(&self) -> OwnedRow {
1311 OwnedRow {
1312 data: self.data.into(),
1313 config: self.config.clone(),
1314 }
1315 }
1316
1317 pub fn data(&self) -> &'a [u8] {
1319 self.data
1320 }
1321}
1322
// Comparison, hashing and byte access for `Row` all delegate to the
// encoded bytes; ordering is the lexicographic byte order of the encoding.
impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(other.data)
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.data.cmp(other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data.hash(state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}
1361
/// An owned copy of a single encoded row (see [`Row::owned`]).
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this row
    data: Box<[u8]>,
    /// The (cloned) configuration the row was encoded with
    config: RowConfig,
}
1370
impl OwnedRow {
    /// Borrow this owned row as a [`Row`].
    ///
    /// Useful for comparing against borrowed rows without copying.
    pub fn row(&self) -> Row<'_> {
        Row {
            data: &self.data,
            config: &self.config,
        }
    }
}
1382
// Comparison and hashing for `OwnedRow` delegate to the borrowed `Row`
// view, keeping behaviour identical to the borrowed form.
impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}
1421
1422#[inline]
1424fn null_sentinel(options: SortOptions) -> u8 {
1425 match options.nulls_first {
1426 true => 0,
1427 false => 0xFF,
1428 }
1429}
1430
/// Accumulates the encoded length of each row across columns.
///
/// Starts in the `Fixed` representation (one shared length) and switches
/// to `Variable` (one length per row) as soon as a variable-length column
/// is encountered, avoiding a per-row vector for fixed-width schemas.
enum LengthTracker {
    /// Every row currently has the same length
    Fixed { length: usize, num_rows: usize },
    /// Rows differ in length: a shared fixed component plus per-row extras
    Variable {
        fixed_length: usize,
        lengths: Vec<usize>,
    },
}
1441
impl LengthTracker {
    /// Start tracking `num_rows` rows, all initially of length 0.
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Add `new_length` bytes to every row (a fixed-width column).
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Add a per-row length (a variable-width column), switching to the
    /// `Variable` representation if necessary.
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Force the `Variable` representation and return the per-row lengths
    /// for direct mutation (used by list/run length computations).
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Append each row's *start* offset (beginning at `initial_offset`) to
    /// `offsets`, returning the total end offset.
    ///
    /// Encoding later advances each entry past the bytes it writes, so the
    /// start offsets become end offsets once all columns are encoded.
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}
1533
/// Computes the encoded length contribution of every column for every row,
/// returning a [`LengthTracker`] with the total length of each row.
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => push_byte_view_array_lengths(&mut tracker, array.as_binary_view()),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => push_byte_view_array_lengths(&mut tracker, array.as_string_view()),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // 1 extra byte for the null sentinel
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of its pre-converted value
                // row (or of the encoded null row for null keys)
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                // 1 byte for the validity sentinel plus the child row
                let array = as_struct_array(array);
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
            Encoder::Union {
                child_rows,
                type_ids,
                offsets,
            } => {
                let union_array = array
                    .as_any()
                    .downcast_ref::<UnionArray>()
                    .expect("expected UnionArray");

                // 1 byte for the type id plus the pre-converted child row.
                // Sparse unions (no offsets) index children by row position.
                let lengths = (0..union_array.len()).map(|i| {
                    let type_id = type_ids[i];
                    let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_i);

                    1 + child_row.as_ref().len()
                });

                tracker.push_variable(lengths);
            }
        }
    }

    tracker
}
1658
1659fn push_byte_view_array_lengths<T: ByteViewType>(
1661 tracker: &mut LengthTracker,
1662 array: &GenericByteViewArray<T>,
1663) {
1664 if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
1665 tracker.push_variable(
1666 array
1667 .lengths()
1668 .zip(nulls.iter())
1669 .map(|(length, is_valid)| {
1670 if is_valid {
1671 Some(length as usize)
1672 } else {
1673 None
1674 }
1675 })
1676 .map(variable::padded_length),
1677 )
1678 } else {
1679 tracker.push_variable(
1680 array
1681 .lengths()
1682 .map(|len| variable::padded_length(Some(len as usize))),
1683 )
1684 }
1685}
1686
/// Encodes one column into `data`, starting each row at the position given
/// by `offsets` and advancing those offsets past the bytes written.
///
/// `offsets[0]` is the start of the first row; entries from index 1 onwards
/// are mutated in place (see `LengthTracker::extend_offsets`).
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Fast path when there are no nulls to consider
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
                }
                DataType::Utf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i32>(),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode_generic_byte_array(
                    data, offsets,
                    column.as_string::<i64>(),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // Write a 1-byte sentinel (0x01 valid, null_sentinel otherwise)
            // followed by the pre-converted child row bytes
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
        Encoder::Union {
            child_rows,
            type_ids,
            offsets: offsets_buf,
        } => {
            // Write a 1-byte type id (bit-inverted when descending) followed
            // by the pre-converted child row bytes. Sparse unions (no
            // offsets) index children by row position.
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(i, offset)| {
                    let type_id = type_ids[i];

                    let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
                    let child_row = child_rows[type_id as usize].row(child_row_idx);
                    let child_bytes = child_row.as_ref();

                    let type_id_byte = if opts.descending {
                        !(type_id as u8)
                    } else {
                        type_id as u8
                    };
                    data[*offset] = type_id_byte;

                    let child_start = *offset + 1;
                    let child_end = child_start + child_bytes.len();
                    data[child_start..child_end].copy_from_slice(child_bytes);

                    *offset = child_end;
                });
        }
    }
}
1825
1826pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1828 data: &mut [u8],
1829 offsets: &mut [usize],
1830 column: &DictionaryArray<K>,
1831 values: &Rows,
1832 null: &Row<'_>,
1833) {
1834 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1835 let row = match k {
1836 Some(k) => values.row(k.as_usize()).data,
1837 None => null.data,
1838 };
1839 let end_offset = *offset + row.len();
1840 data[*offset..end_offset].copy_from_slice(row);
1841 *offset = end_offset;
1842 }
1843}
1844
/// Helper used with `downcast_primitive!` in `decode_column`: decodes a
/// primitive array of type `$t` from `$rows` and wraps it in an `Arc`.
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1850
/// Decodes a single column from the row-format bytes in `rows`, returning the
/// decoded [`ArrayRef`]
///
/// Each `rows[i]` slice is advanced past the bytes consumed for this column,
/// leaving it positioned at the start of the next column's data
///
/// # Safety
///
/// `rows` must contain valid row-format data for the provided `field` and
/// `codec`. If `validate_utf8` is `false`, string data is assumed to be valid
/// UTF-8 and is not checked
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        // Types whose encoding carries no extra state: dispatch on data type
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionaries are encoded as their values, so decoding yields the
        // dictionary's value type rather than a dictionary array
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // The first byte of each row is the struct-level validity marker;
            // strip it before decoding the children
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Decoded children can have different data types than declared
            // (e.g. dictionary children decode to their value types, per the
            // Codec::Dictionary arm above), so rebuild the struct's fields
            // from the types actually produced
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        // Lists delegate to the list module, parameterized by offset width
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        // Run-end encoded arrays delegate to the run module, parameterized by
        // the run-end index type
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
        Codec::Union(converters, null_rows) => {
            let len = rows.len();

            let DataType::Union(union_fields, mode) = &field.data_type else {
                unreachable!()
            };

            // Pass 1: read each row's leading type-id byte (inverted when the
            // sort order is descending) and group the remaining child bytes by
            // type id, remembering the originating row index
            let mut type_ids = Vec::with_capacity(len);
            let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];

            for (idx, row) in rows.iter_mut().enumerate() {
                let type_id_byte = {
                    let id = row[0];
                    if options.descending { !id } else { id }
                };

                let type_id = type_id_byte as i8;
                type_ids.push(type_id);

                let field_idx = type_id as usize;

                let child_row = &row[1..];
                rows_by_field[field_idx].push((idx, child_row));
            }

            // Pass 2: decode each child with its converter. Dense unions decode
            // only the rows present for each field; sparse unions pad the gaps
            // with the pre-encoded null row for that field
            let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
            let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));

            for (field_idx, converter) in converters.iter().enumerate() {
                let field_rows = &rows_by_field[field_idx];

                match &mode {
                    UnionMode::Dense => {
                        if field_rows.is_empty() {
                            let (_, field) = union_fields.iter().nth(field_idx).unwrap();
                            child_arrays.push(arrow_array::new_empty_array(field.data_type()));
                            continue;
                        }

                        let mut child_data = field_rows
                            .iter()
                            .map(|(_, bytes)| *bytes)
                            .collect::<Vec<_>>();

                        let child_array =
                            unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;

                        // convert_raw advanced each child slice; use the
                        // remaining length to advance the parent row past the
                        // type-id byte plus the consumed child bytes
                        for ((row_idx, original_bytes), remaining_bytes) in
                            field_rows.iter().zip(child_data)
                        {
                            let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                    UnionMode::Sparse => {
                        // Build a full-length input, substituting the encoded
                        // null row wherever this field is not selected
                        let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
                        let mut field_row_iter = field_rows.iter().peekable();
                        let null_row_bytes: &[u8] = &null_rows[field_idx].data;

                        for idx in 0..len {
                            if let Some((next_idx, bytes)) = field_row_iter.peek() {
                                if *next_idx == idx {
                                    sparse_data.push(*bytes);

                                    field_row_iter.next();
                                    continue;
                                }
                            }
                            sparse_data.push(null_row_bytes);
                        }

                        let child_array =
                            unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;

                        // Advance only the parent rows that actually selected
                        // this field (null-row padding consumed nothing of them)
                        for (row_idx, child_row) in field_rows.iter() {
                            let remaining_len = sparse_data[*row_idx].len();
                            let consumed_length = 1 + child_row.len() - remaining_len;
                            rows[*row_idx] = &rows[*row_idx][consumed_length..];
                        }

                        child_arrays.push(child_array.into_iter().next().unwrap());
                    }
                }
            }

            // Dense unions need per-row offsets into the per-field child
            // arrays: the i-th occurrence of a type id maps to child index i
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    let field_idx = *type_id as usize;
                    offsets_vec.push(count[field_idx]);

                    count[field_idx] += 1;
                }
            }

            let type_ids_buffer = ScalarBuffer::from(type_ids);
            let offsets_buffer = offsets.map(ScalarBuffer::from);

            let union_array = UnionArray::try_new(
                union_fields.clone(),
                type_ids_buffer,
                offsets_buffer,
                child_arrays,
            )?;

            Arc::new(union_array)
        }
    };
    Ok(array)
}
2065
2066#[cfg(test)]
2067mod tests {
2068 use rand::distr::uniform::SampleUniform;
2069 use rand::distr::{Distribution, StandardUniform};
2070 use rand::{Rng, rng};
2071
2072 use arrow_array::builder::*;
2073 use arrow_array::types::*;
2074 use arrow_array::*;
2075 use arrow_buffer::{Buffer, OffsetBuffer};
2076 use arrow_buffer::{NullBuffer, i256};
2077 use arrow_cast::display::{ArrayFormatter, FormatOptions};
2078 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2079
2080 use super::*;
2081
    #[test]
    fn test_fixed_width() {
        // Two fixed-width columns (Int16, Float32) covering nulls, negative
        // values and -0.0
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Every row is exactly 8 bytes wide; pin the exact encoding
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
        );

        // Byte-wise comparison of rows matches lexicographic column order
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        // Round-trip back to the original columns
        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
2144
2145 #[test]
2146 fn test_decimal32() {
2147 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2148 DECIMAL32_MAX_PRECISION,
2149 7,
2150 ))])
2151 .unwrap();
2152 let col = Arc::new(
2153 Decimal32Array::from_iter([
2154 None,
2155 Some(i32::MIN),
2156 Some(-13),
2157 Some(46_i32),
2158 Some(5456_i32),
2159 Some(i32::MAX),
2160 ])
2161 .with_precision_and_scale(9, 7)
2162 .unwrap(),
2163 ) as ArrayRef;
2164
2165 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2166 for i in 0..rows.num_rows() - 1 {
2167 assert!(rows.row(i) < rows.row(i + 1));
2168 }
2169
2170 let back = converter.convert_rows(&rows).unwrap();
2171 assert_eq!(back.len(), 1);
2172 assert_eq!(col.as_ref(), back[0].as_ref())
2173 }
2174
2175 #[test]
2176 fn test_decimal64() {
2177 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2178 DECIMAL64_MAX_PRECISION,
2179 7,
2180 ))])
2181 .unwrap();
2182 let col = Arc::new(
2183 Decimal64Array::from_iter([
2184 None,
2185 Some(i64::MIN),
2186 Some(-13),
2187 Some(46_i64),
2188 Some(5456_i64),
2189 Some(i64::MAX),
2190 ])
2191 .with_precision_and_scale(18, 7)
2192 .unwrap(),
2193 ) as ArrayRef;
2194
2195 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2196 for i in 0..rows.num_rows() - 1 {
2197 assert!(rows.row(i) < rows.row(i + 1));
2198 }
2199
2200 let back = converter.convert_rows(&rows).unwrap();
2201 assert_eq!(back.len(), 1);
2202 assert_eq!(col.as_ref(), back[0].as_ref())
2203 }
2204
2205 #[test]
2206 fn test_decimal128() {
2207 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2208 DECIMAL128_MAX_PRECISION,
2209 7,
2210 ))])
2211 .unwrap();
2212 let col = Arc::new(
2213 Decimal128Array::from_iter([
2214 None,
2215 Some(i128::MIN),
2216 Some(-13),
2217 Some(46_i128),
2218 Some(5456_i128),
2219 Some(i128::MAX),
2220 ])
2221 .with_precision_and_scale(38, 7)
2222 .unwrap(),
2223 ) as ArrayRef;
2224
2225 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2226 for i in 0..rows.num_rows() - 1 {
2227 assert!(rows.row(i) < rows.row(i + 1));
2228 }
2229
2230 let back = converter.convert_rows(&rows).unwrap();
2231 assert_eq!(back.len(), 1);
2232 assert_eq!(col.as_ref(), back[0].as_ref())
2233 }
2234
2235 #[test]
2236 fn test_decimal256() {
2237 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2238 DECIMAL256_MAX_PRECISION,
2239 7,
2240 ))])
2241 .unwrap();
2242 let col = Arc::new(
2243 Decimal256Array::from_iter([
2244 None,
2245 Some(i256::MIN),
2246 Some(i256::from_parts(0, -1)),
2247 Some(i256::from_parts(u128::MAX, -1)),
2248 Some(i256::from_parts(u128::MAX, 0)),
2249 Some(i256::from_parts(0, 46_i128)),
2250 Some(i256::from_parts(5, 46_i128)),
2251 Some(i256::MAX),
2252 ])
2253 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2254 .unwrap(),
2255 ) as ArrayRef;
2256
2257 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2258 for i in 0..rows.num_rows() - 1 {
2259 assert!(rows.row(i) < rows.row(i + 1));
2260 }
2261
2262 let back = converter.convert_rows(&rows).unwrap();
2263 assert_eq!(back.len(), 1);
2264 assert_eq!(col.as_ref(), back[0].as_ref())
2265 }
2266
2267 #[test]
2268 fn test_bool() {
2269 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2270
2271 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2272
2273 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2274 assert!(rows.row(2) > rows.row(1));
2275 assert!(rows.row(2) > rows.row(0));
2276 assert!(rows.row(1) > rows.row(0));
2277
2278 let cols = converter.convert_rows(&rows).unwrap();
2279 assert_eq!(&cols[0], &col);
2280
2281 let converter = RowConverter::new(vec![SortField::new_with_options(
2282 DataType::Boolean,
2283 SortOptions::default().desc().with_nulls_first(false),
2284 )])
2285 .unwrap();
2286
2287 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2288 assert!(rows.row(2) < rows.row(1));
2289 assert!(rows.row(2) < rows.row(0));
2290 assert!(rows.row(1) < rows.row(0));
2291 let cols = converter.convert_rows(&rows).unwrap();
2292 assert_eq!(&cols[0], &col);
2293 }
2294
    #[test]
    fn test_timezone() {
        // Timestamp timezones must survive a row-format round-trip
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Dictionary-encoded timestamps: decoding yields the value type `v`
        // (not the dictionary type `d`), with the timezone preserved
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        let dict = a.finish();
        // Attach a timezone to the dictionary's values
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &v);
    }
2329
2330 #[test]
2331 fn test_null_encoding() {
2332 let col = Arc::new(NullArray::new(10));
2333 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2334 let rows = converter.convert_columns(&[col]).unwrap();
2335 assert_eq!(rows.num_rows(), 10);
2336 assert_eq!(rows.row(1).data.len(), 0);
2337 }
2338
    #[test]
    fn test_variable_width() {
        // Variable-length strings: ordering and round-trip
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Prefix ordering, and null sorts before the empty string
        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Binary values chosen to straddle the encoder's block boundaries
        // (MINI_BLOCK_SIZE / BLOCK_SIZE, exactly and +1), with three distinct
        // byte values so the expected order is fully determined
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Input is strictly ascending, so every pair of rows must be too
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Same data, descending with nulls last: all comparisons invert
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
2422
2423 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2425 match b.data_type() {
2426 DataType::Dictionary(_, v) => {
2427 assert_eq!(a.data_type(), v.as_ref());
2428 let b = arrow_cast::cast(b, v).unwrap();
2429 assert_eq!(a, b.as_ref())
2430 }
2431 _ => assert_eq!(a, b),
2432 }
2433 }
2434
    #[test]
    fn test_string_dictionary() {
        // Dictionary-encoded strings with repeats and a null
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        // Ordering follows the logical string values, not the key values
        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        // Equal logical values encode to identical rows
        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        // Rows from a different array with a different dictionary are
        // comparable with rows_a
        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        // Descending, nulls last: orderings invert
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        // Descending, nulls first: null rows sort low again
        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }
2508
2509 #[test]
2510 fn test_struct() {
2511 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2513 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2514 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2515 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2516 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2517
2518 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2519 let converter = RowConverter::new(sort_fields).unwrap();
2520 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2521
2522 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2523 assert!(a < b);
2524 }
2525
2526 let back = converter.convert_rows(&r1).unwrap();
2527 assert_eq!(back.len(), 1);
2528 assert_eq!(&back[0], &s1);
2529
2530 let data = s1
2532 .to_data()
2533 .into_builder()
2534 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2535 .null_count(2)
2536 .build()
2537 .unwrap();
2538
2539 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2540 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
2541 assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1)); let back = converter.convert_rows(&r2).unwrap();
2547 assert_eq!(back.len(), 1);
2548 assert_eq!(&back[0], &s2);
2549
2550 back[0].to_data().validate_full().unwrap();
2551 }
2552
    #[test]
    fn test_dictionary_in_struct() {
        // A struct containing a dictionary child: after a round-trip the
        // child is decoded to its value type (Utf8), so the struct type changes
        let builder = StringDictionaryBuilder::<Int32Type>::new();
        let mut struct_builder = StructBuilder::new(
            vec![Field::new_dictionary(
                "foo",
                DataType::Int32,
                DataType::Utf8,
                true,
            )],
            vec![Box::new(builder)],
        );

        let dict_builder = struct_builder
            .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
            .unwrap();

        dict_builder.append_value("a");
        dict_builder.append_null();
        dict_builder.append_value("a");
        dict_builder.append_value("b");

        for _ in 0..4 {
            struct_builder.append(true);
        }

        let s = Arc::new(struct_builder.finish()) as ArrayRef;
        let sort_fields = vec![SortField::new(s.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();

        let back = converter.convert_rows(&r).unwrap();
        let [s2] = back.try_into().unwrap();

        // Decoded type differs: dictionary child became plain Utf8
        assert_ne!(&s.data_type(), &s2.data_type());
        s2.to_data().validate_full().unwrap();

        // Compare the dictionary child logically against the decoded strings
        let s1_struct = s.as_struct();
        let s1_0 = s1_struct.column(0);
        let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
        let keys = s1_idx_0.keys();
        let values = s1_idx_0.values().as_string::<i32>();
        let s2_struct = s2.as_struct();
        let s2_0 = s2_struct.column(0);
        let s2_idx_0 = s2_0.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(s2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), s2_idx_0.value(i));
            }
        }
    }
2615
2616 #[test]
2617 fn test_dictionary_in_struct_empty() {
2618 let ty = DataType::Struct(
2619 vec![Field::new_dictionary(
2620 "foo",
2621 DataType::Int32,
2622 DataType::Int32,
2623 false,
2624 )]
2625 .into(),
2626 );
2627 let s = arrow_array::new_empty_array(&ty);
2628
2629 let sort_fields = vec![SortField::new(s.data_type().clone())];
2630 let converter = RowConverter::new(sort_fields).unwrap();
2631 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2632
2633 let back = converter.convert_rows(&r).unwrap();
2634 let [s2] = back.try_into().unwrap();
2635
2636 assert_ne!(&s.data_type(), &s2.data_type());
2639 s2.to_data().validate_full().unwrap();
2640 assert_eq!(s.len(), 0);
2641 assert_eq!(s2.len(), 0);
2642 }
2643
    #[test]
    fn test_list_of_string_dictionary() {
        // A list whose values are dictionary-encoded strings: after the
        // round-trip the values decode to plain Utf8, changing the list type
        let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
        // List 0: ["a", "b", "zero", null, "c", "b", "d"]
        builder.values().append("a").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append_null();
        builder.values().append("c").unwrap();
        builder.values().append("b").unwrap();
        builder.values().append("d").unwrap();
        builder.append(true);
        // List 1: null
        builder.append(false);
        // List 2: ["e", "zero", "a"]
        builder.values().append("e").unwrap();
        builder.values().append("zero").unwrap();
        builder.values().append("a").unwrap();
        builder.append(true);

        let a = Arc::new(builder.finish()) as ArrayRef;
        let data_type = a.data_type().clone();

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        let [a2] = back.try_into().unwrap();

        // Types differ: dictionary values became plain strings
        assert_ne!(&a.data_type(), &a2.data_type());

        a2.to_data().validate_full().unwrap();

        let a2_list = a2.as_list::<i32>();
        let a1_list = a.as_list::<i32>();

        // List 0: compare the dictionary values logically with the decoded
        // string values
        let a1_0 = a1_list.value(0);
        let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
        let keys = a1_idx_0.keys();
        let values = a1_idx_0.values().as_string::<i32>();
        let a2_0 = a2_list.value(0);
        let a2_idx_0 = a2_0.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_0.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_0.value(i));
            }
        }

        // List 1: nullness preserved
        assert!(a1_list.is_null(1));
        assert!(a2_list.is_null(1));

        // List 2: same logical comparison as list 0
        let a1_2 = a1_list.value(2);
        let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
        let keys = a1_idx_2.keys();
        let values = a1_idx_2.values().as_string::<i32>();
        let a2_2 = a2_list.value(2);
        let a2_idx_2 = a2_2.as_string::<i32>();

        for i in 0..keys.len() {
            if keys.is_null(i) {
                assert!(a2_idx_2.is_null(i));
            } else {
                let dict_index = keys.value(i) as usize;
                assert_eq!(values.value(dict_index), a2_idx_2.value(i));
            }
        }
    }
2723
2724 #[test]
2725 fn test_primitive_dictionary() {
2726 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2727 builder.append(2).unwrap();
2728 builder.append(3).unwrap();
2729 builder.append(0).unwrap();
2730 builder.append_null();
2731 builder.append(5).unwrap();
2732 builder.append(3).unwrap();
2733 builder.append(-1).unwrap();
2734
2735 let a = builder.finish();
2736 let data_type = a.data_type().clone();
2737 let columns = [Arc::new(a) as ArrayRef];
2738
2739 let field = SortField::new(data_type.clone());
2740 let converter = RowConverter::new(vec![field]).unwrap();
2741 let rows = converter.convert_columns(&columns).unwrap();
2742 assert!(rows.row(0) < rows.row(1));
2743 assert!(rows.row(2) < rows.row(0));
2744 assert!(rows.row(3) < rows.row(2));
2745 assert!(rows.row(6) < rows.row(2));
2746 assert!(rows.row(3) < rows.row(6));
2747
2748 let back = converter.convert_rows(&rows).unwrap();
2749 assert_eq!(back.len(), 1);
2750 back[0].to_data().validate_full().unwrap();
2751 }
2752
    #[test]
    fn test_dictionary_nulls() {
        // Dictionary with nulls both in the values array and in the keys:
        // a key referencing a null value must encode the same as a null key
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        // Rows 0 and 1 both reference value 1
        assert_eq!(rows.row(0), rows.row(1));
        // Rows 3 and 4 reference null values; row 5 has a null key — all equal
        assert_eq!(rows.row(3), rows.row(4));
        assert_eq!(rows.row(4), rows.row(5));
        // Nulls sort before non-null values by default
        assert!(rows.row(3) < rows.row(0));
    }
2777
2778 #[test]
2779 fn test_from_binary_shared_buffer() {
2780 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2781 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2782 let rows = converter.convert_columns(&[array]).unwrap();
2783 let binary_rows = rows.try_into_binary().expect("known-small rows");
2784 let _binary_rows_shared_buffer = binary_rows.clone();
2785
2786 let parsed = converter.from_binary(binary_rows);
2787
2788 converter.convert_rows(parsed.iter()).unwrap();
2789 }
2790
2791 #[test]
2792 #[should_panic(expected = "Encountered non UTF-8 data")]
2793 fn test_invalid_utf8() {
2794 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2795 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2796 let rows = converter.convert_columns(&[array]).unwrap();
2797 let binary_row = rows.row(0);
2798
2799 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2800 let parser = converter.parser();
2801 let utf8_row = parser.parse(binary_row.as_ref());
2802
2803 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2804 }
2805
2806 #[test]
2807 #[should_panic(expected = "Encountered non UTF-8 data")]
2808 fn test_invalid_utf8_array() {
2809 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2810 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2811 let rows = converter.convert_columns(&[array]).unwrap();
2812 let binary_rows = rows.try_into_binary().expect("known-small rows");
2813
2814 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2815 let parsed = converter.from_binary(binary_rows);
2816
2817 converter.convert_rows(parsed.iter()).unwrap();
2818 }
2819
2820 #[test]
2821 #[should_panic(expected = "index out of bounds")]
2822 fn test_invalid_empty() {
2823 let binary_row: &[u8] = &[];
2824
2825 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2826 let parser = converter.parser();
2827 let utf8_row = parser.parse(binary_row.as_ref());
2828
2829 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2830 }
2831
2832 #[test]
2833 #[should_panic(expected = "index out of bounds")]
2834 fn test_invalid_empty_array() {
2835 let row: &[u8] = &[];
2836 let binary_rows = BinaryArray::from(vec![row]);
2837
2838 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2839 let parsed = converter.from_binary(binary_rows);
2840
2841 converter.convert_rows(parsed.iter()).unwrap();
2842 }
2843
2844 #[test]
2845 #[should_panic(expected = "index out of bounds")]
2846 fn test_invalid_truncated() {
2847 let binary_row: &[u8] = &[0x02];
2848
2849 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2850 let parser = converter.parser();
2851 let utf8_row = parser.parse(binary_row.as_ref());
2852
2853 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2854 }
2855
2856 #[test]
2857 #[should_panic(expected = "index out of bounds")]
2858 fn test_invalid_truncated_array() {
2859 let row: &[u8] = &[0x02];
2860 let binary_rows = BinaryArray::from(vec![row]);
2861
2862 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2863 let parsed = converter.from_binary(binary_rows);
2864
2865 converter.convert_rows(parsed.iter()).unwrap();
2866 }
2867
2868 #[test]
2869 #[should_panic(expected = "rows were not produced by this RowConverter")]
2870 fn test_different_converter() {
2871 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2872 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2873 let rows = converter.convert_columns(&[values]).unwrap();
2874
2875 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2876 let _ = converter.convert_rows(&rows);
2877 }
2878
    // Exercises row-format ordering and round-tripping for a List<Int32>
    // column under all four combinations of sort direction and null placement.
    fn test_single_list<O: OffsetSizeTrait>() {
        // Rows: 0=[32,52,32] 1=[32,52,12] 2=[32,52] 3=null 4=[32,null] 5=[] 6=null
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default options: ascending, nulls first.
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        // Both null rows must produce identical encodings.
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last.
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // A sliced list (non-zero offset into the child array) must encode
        // and round-trip correctly as well.
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
2992
    // Exercises ordering and round-tripping for a List<List<Int32>> column
    // under several sort option combinations, including a sliced input.
    fn test_nested_list<O: OffsetSizeTrait>() {
        // Rows: 0=[[1,2],[1,null]] 1=[[1,null],[1,null]] 2=[[1,null],null]
        //       3=null 4=[[1,2]]
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Ascending, nulls first.
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        // Round-trip must reconstruct the original array exactly.
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // A sliced nested list must also encode and round-trip correctly.
        let sliced_list = list.slice(1, 3);
        let rows = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3096
    #[test]
    fn test_list() {
        // Run the list test helpers with i32 offsets (List / nested List).
        test_single_list::<i32>();
        test_nested_list::<i32>();
    }
3102
    #[test]
    fn test_large_list() {
        // Run the list test helpers with i64 offsets (LargeList variants).
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }
3108
    #[test]
    // Exercises ordering and round-tripping for a FixedSizeList<Int32, 3>
    // column under all four sort-option combinations, plus a sliced input.
    fn test_fixed_size_list() {
        // Rows: 0=[32,52,32] 1=[32,52,12] 2=[32,52,null] 3=null
        //       4=[32,null,null] 5=[null,null,null] 6=null
        let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(13);
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        builder.values().append_null();
        builder.values().append_null();
        builder.values().append_null();
        builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.values().append_value(77);
        builder.append(false);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Default options: ascending, nulls first.
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        // Both null rows must produce identical encodings.
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Ascending, nulls last.
        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last.
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first.
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // A sliced fixed-size list must encode and round-trip correctly too.
        let sliced_list = list.slice(1, 5);
        let rows_on_sliced_list = converter
            .convert_columns(&[Arc::clone(&sliced_list)])
            .unwrap();

        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &sliced_list);
    }
3231
3232 #[test]
3233 fn test_two_fixed_size_lists() {
3234 let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3235 first.values().append_value(100);
3237 first.append(true);
3238 first.values().append_value(101);
3240 first.append(true);
3241 first.values().append_value(102);
3243 first.append(true);
3244 first.values().append_null();
3246 first.append(true);
3247 first.values().append_null(); first.append(false);
3250 let first = Arc::new(first.finish()) as ArrayRef;
3251 let first_type = first.data_type().clone();
3252
3253 let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3254 second.values().append_value(200);
3256 second.append(true);
3257 second.values().append_value(201);
3259 second.append(true);
3260 second.values().append_value(202);
3262 second.append(true);
3263 second.values().append_null();
3265 second.append(true);
3266 second.values().append_null(); second.append(false);
3269 let second = Arc::new(second.finish()) as ArrayRef;
3270 let second_type = second.data_type().clone();
3271
3272 let converter = RowConverter::new(vec![
3273 SortField::new(first_type.clone()),
3274 SortField::new(second_type.clone()),
3275 ])
3276 .unwrap();
3277
3278 let rows = converter
3279 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3280 .unwrap();
3281
3282 let back = converter.convert_rows(&rows).unwrap();
3283 assert_eq!(back.len(), 2);
3284 back[0].to_data().validate_full().unwrap();
3285 assert_eq!(&back[0], &first);
3286 back[1].to_data().validate_full().unwrap();
3287 assert_eq!(&back[1], &second);
3288 }
3289
3290 #[test]
3291 fn test_fixed_size_list_with_variable_width_content() {
3292 let mut first = FixedSizeListBuilder::new(
3293 StructBuilder::from_fields(
3294 vec![
3295 Field::new(
3296 "timestamp",
3297 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3298 false,
3299 ),
3300 Field::new("offset_minutes", DataType::Int16, false),
3301 Field::new("time_zone", DataType::Utf8, false),
3302 ],
3303 1,
3304 ),
3305 1,
3306 );
3307 first
3309 .values()
3310 .field_builder::<TimestampMicrosecondBuilder>(0)
3311 .unwrap()
3312 .append_null();
3313 first
3314 .values()
3315 .field_builder::<Int16Builder>(1)
3316 .unwrap()
3317 .append_null();
3318 first
3319 .values()
3320 .field_builder::<StringBuilder>(2)
3321 .unwrap()
3322 .append_null();
3323 first.values().append(false);
3324 first.append(false);
3325 first
3327 .values()
3328 .field_builder::<TimestampMicrosecondBuilder>(0)
3329 .unwrap()
3330 .append_null();
3331 first
3332 .values()
3333 .field_builder::<Int16Builder>(1)
3334 .unwrap()
3335 .append_null();
3336 first
3337 .values()
3338 .field_builder::<StringBuilder>(2)
3339 .unwrap()
3340 .append_null();
3341 first.values().append(false);
3342 first.append(true);
3343 first
3345 .values()
3346 .field_builder::<TimestampMicrosecondBuilder>(0)
3347 .unwrap()
3348 .append_value(0);
3349 first
3350 .values()
3351 .field_builder::<Int16Builder>(1)
3352 .unwrap()
3353 .append_value(0);
3354 first
3355 .values()
3356 .field_builder::<StringBuilder>(2)
3357 .unwrap()
3358 .append_value("UTC");
3359 first.values().append(true);
3360 first.append(true);
3361 first
3363 .values()
3364 .field_builder::<TimestampMicrosecondBuilder>(0)
3365 .unwrap()
3366 .append_value(1126351800123456);
3367 first
3368 .values()
3369 .field_builder::<Int16Builder>(1)
3370 .unwrap()
3371 .append_value(120);
3372 first
3373 .values()
3374 .field_builder::<StringBuilder>(2)
3375 .unwrap()
3376 .append_value("Europe/Warsaw");
3377 first.values().append(true);
3378 first.append(true);
3379 let first = Arc::new(first.finish()) as ArrayRef;
3380 let first_type = first.data_type().clone();
3381
3382 let mut second = StringBuilder::new();
3383 second.append_value("somewhere near");
3384 second.append_null();
3385 second.append_value("Greenwich");
3386 second.append_value("Warsaw");
3387 let second = Arc::new(second.finish()) as ArrayRef;
3388 let second_type = second.data_type().clone();
3389
3390 let converter = RowConverter::new(vec![
3391 SortField::new(first_type.clone()),
3392 SortField::new(second_type.clone()),
3393 ])
3394 .unwrap();
3395
3396 let rows = converter
3397 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3398 .unwrap();
3399
3400 let back = converter.convert_rows(&rows).unwrap();
3401 assert_eq!(back.len(), 2);
3402 back[0].to_data().validate_full().unwrap();
3403 assert_eq!(&back[0], &first);
3404 back[1].to_data().validate_full().unwrap();
3405 assert_eq!(&back[1], &second);
3406 }
3407
3408 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3409 where
3410 K: ArrowPrimitiveType,
3411 StandardUniform: Distribution<K::Native>,
3412 {
3413 let mut rng = rng();
3414 (0..len)
3415 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3416 .collect()
3417 }
3418
3419 fn generate_strings<O: OffsetSizeTrait>(
3420 len: usize,
3421 valid_percent: f64,
3422 ) -> GenericStringArray<O> {
3423 let mut rng = rng();
3424 (0..len)
3425 .map(|_| {
3426 rng.random_bool(valid_percent).then(|| {
3427 let len = rng.random_range(0..100);
3428 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3429 String::from_utf8(bytes).unwrap()
3430 })
3431 })
3432 .collect()
3433 }
3434
3435 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3436 let mut rng = rng();
3437 (0..len)
3438 .map(|_| {
3439 rng.random_bool(valid_percent).then(|| {
3440 let len = rng.random_range(0..100);
3441 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3442 String::from_utf8(bytes).unwrap()
3443 })
3444 })
3445 .collect()
3446 }
3447
3448 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3449 let mut rng = rng();
3450 (0..len)
3451 .map(|_| {
3452 rng.random_bool(valid_percent).then(|| {
3453 let len = rng.random_range(0..100);
3454 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3455 bytes
3456 })
3457 })
3458 .collect()
3459 }
3460
3461 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3462 let edge_cases = vec![
3463 Some("bar".to_string()),
3464 Some("bar\0".to_string()),
3465 Some("LongerThan12Bytes".to_string()),
3466 Some("LongerThan12Bytez".to_string()),
3467 Some("LongerThan12Bytes\0".to_string()),
3468 Some("LongerThan12Byt".to_string()),
3469 Some("backend one".to_string()),
3470 Some("backend two".to_string()),
3471 Some("a".repeat(257)),
3472 Some("a".repeat(300)),
3473 ];
3474
3475 let mut values = Vec::with_capacity(len);
3477 for i in 0..len {
3478 values.push(
3479 edge_cases
3480 .get(i % edge_cases.len())
3481 .cloned()
3482 .unwrap_or(None),
3483 );
3484 }
3485
3486 StringViewArray::from(values)
3487 }
3488
3489 fn generate_dictionary<K>(
3490 values: ArrayRef,
3491 len: usize,
3492 valid_percent: f64,
3493 ) -> DictionaryArray<K>
3494 where
3495 K: ArrowDictionaryKeyType,
3496 K::Native: SampleUniform,
3497 {
3498 let mut rng = rng();
3499 let min_key = K::Native::from_usize(0).unwrap();
3500 let max_key = K::Native::from_usize(values.len()).unwrap();
3501 let keys: PrimitiveArray<K> = (0..len)
3502 .map(|_| {
3503 rng.random_bool(valid_percent)
3504 .then(|| rng.random_range(min_key..max_key))
3505 })
3506 .collect();
3507
3508 let data_type =
3509 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3510
3511 let data = keys
3512 .into_data()
3513 .into_builder()
3514 .data_type(data_type)
3515 .add_child_data(values.to_data())
3516 .build()
3517 .unwrap();
3518
3519 DictionaryArray::from(data)
3520 }
3521
3522 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3523 let mut rng = rng();
3524 let width = rng.random_range(0..20);
3525 let mut builder = FixedSizeBinaryBuilder::new(width);
3526
3527 let mut b = vec![0; width as usize];
3528 for _ in 0..len {
3529 match rng.random_bool(valid_percent) {
3530 true => {
3531 b.iter_mut().for_each(|x| *x = rng.random());
3532 builder.append_value(&b).unwrap();
3533 }
3534 false => builder.append_null(),
3535 }
3536 }
3537
3538 builder.finish()
3539 }
3540
3541 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3542 let mut rng = rng();
3543 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3544 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3545 let b = generate_strings::<i32>(len, valid_percent);
3546 let fields = Fields::from(vec![
3547 Field::new("a", DataType::Int32, true),
3548 Field::new("b", DataType::Utf8, true),
3549 ]);
3550 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3551 StructArray::new(fields, values, Some(nulls))
3552 }
3553
3554 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3555 where
3556 F: FnOnce(usize) -> ArrayRef,
3557 {
3558 let mut rng = rng();
3559 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3560 let values_len = offsets.last().unwrap().to_usize().unwrap();
3561 let values = values(values_len);
3562 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3563 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3564 ListArray::new(field, offsets, values, Some(nulls))
3565 }
3566
    // Picks a random column type for the fuzz test; each arm exercises a
    // different encoder path (primitives, strings, dictionaries, fixed-size
    // binary, structs, lists, byte views, and a sliced list).
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..18) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            // Dictionary with string values (all values valid; keys may be null).
            7 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            // Dictionary with primitive values.
            8 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            16 => Arc::new(generate_fixed_stringview_column(len)),
            // A list sliced at a non-zero offset, to exercise offset handling.
            17 => Arc::new(
                generate_list(len + 1000, 0.8, |values_len| {
                    Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
                })
                .slice(500, len),
            ),
            _ => unreachable!(),
        }
    }
3615
3616 fn print_row(cols: &[SortColumn], row: usize) -> String {
3617 let t: Vec<_> = cols
3618 .iter()
3619 .map(|x| match x.values.is_valid(row) {
3620 true => {
3621 let opts = FormatOptions::default().with_null("NULL");
3622 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3623 formatter.value(row).to_string()
3624 }
3625 false => "NULL".to_string(),
3626 })
3627 .collect();
3628 t.join(",")
3629 }
3630
3631 fn print_col_types(cols: &[SortColumn]) -> String {
3632 let t: Vec<_> = cols
3633 .iter()
3634 .map(|x| x.values.data_type().to_string())
3635 .collect();
3636 t.join(",")
3637 }
3638
    #[test]
    #[cfg_attr(miri, ignore)]
    // Randomized end-to-end test: for random column types and sort options,
    // row-encoding comparisons must agree with LexicographicalComparator, and
    // rows must round-trip back to the original arrays via every decode path.
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction and null placement per column.
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Every row pair must compare identically to the lexicographical
            // comparator over the source arrays.
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Round-trip path 1: decode the Rows directly.
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round-trip path 2: via the binary form and the row parser.
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Round-trip path 3: via from_binary.
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
3722
    #[test]
    // Verifies that a Rows buffer reused via `clear()` behaves exactly like a
    // freshly allocated one.
    fn test_clear() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let mut rows = converter.empty_rows(3, 128);

        let first = Int32Array::from(vec![None, Some(2), Some(4)]);
        let second = Int32Array::from(vec![Some(2), None, Some(4)]);
        let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];

        // After clear() + append, only the newly appended array must be
        // recoverable from the reused buffer.
        for array in arrays.iter() {
            rows.clear();
            converter
                .append(&mut rows, std::slice::from_ref(array))
                .unwrap();
            let back = converter.convert_rows(&rows).unwrap();
            assert_eq!(&back[0], array);
        }

        // `rows` now holds the encoding of the second array; it must match
        // rows produced into a brand-new buffer row-for-row.
        let mut rows_expected = converter.empty_rows(3, 128);
        converter.append(&mut rows_expected, &arrays[1..]).unwrap();

        for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
            assert_eq!(
                actual, expected,
                "For row {i}: expected {expected:?}, actual: {actual:?}",
            );
        }
    }
3751
3752 #[test]
3753 fn test_append_codec_dictionary_binary() {
3754 use DataType::*;
3755 let converter = RowConverter::new(vec![SortField::new(Dictionary(
3757 Box::new(Int32),
3758 Box::new(Binary),
3759 ))])
3760 .unwrap();
3761 let mut rows = converter.empty_rows(4, 128);
3762
3763 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3764 let values = BinaryArray::from(vec![
3765 Some("a".as_bytes()),
3766 Some(b"b"),
3767 Some(b"c"),
3768 Some(b"d"),
3769 ]);
3770 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3771
3772 rows.clear();
3773 let array = Arc::new(dict_array) as ArrayRef;
3774 converter
3775 .append(&mut rows, std::slice::from_ref(&array))
3776 .unwrap();
3777 let back = converter.convert_rows(&rows).unwrap();
3778
3779 dictionary_eq(&back[0], &array);
3780 }
3781
3782 #[test]
3783 fn test_list_prefix() {
3784 let mut a = ListBuilder::new(Int8Builder::new());
3785 a.append_value([None]);
3786 a.append_value([None, None]);
3787 let a = a.finish();
3788
3789 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3790 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3791 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3792 }
3793
3794 #[test]
3795 fn map_should_be_marked_as_unsupported() {
3796 let map_data_type = Field::new_map(
3797 "map",
3798 "entries",
3799 Field::new("key", DataType::Utf8, false),
3800 Field::new("value", DataType::Utf8, true),
3801 false,
3802 true,
3803 )
3804 .data_type()
3805 .clone();
3806
3807 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3808
3809 assert!(!is_supported, "Map should not be supported");
3810 }
3811
3812 #[test]
3813 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3814 let map_data_type = Field::new_map(
3815 "map",
3816 "entries",
3817 Field::new("key", DataType::Utf8, false),
3818 Field::new("value", DataType::Utf8, true),
3819 false,
3820 true,
3821 )
3822 .data_type()
3823 .clone();
3824
3825 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3826
3827 match converter {
3828 Err(ArrowError::NotYetImplemented(message)) => {
3829 assert!(
3830 message.contains("Row format support not yet implemented for"),
3831 "Expected NotYetImplemented error for map data type, got: {message}",
3832 );
3833 }
3834 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3835 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3836 }
3837 }
3838
    #[test]
    // Compares the decoded StringViewArray's values-buffer size between the
    // trusted decode path (Rows produced by this converter) and the checked
    // path (rows re-parsed from raw bytes). The test name indicates the
    // trusted path skips UTF-8 validation, letting short strings stay inline.
    fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        // Returns (values-buffer length via trusted decode, via checked parse).
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            // Re-parse from binary: rows are no longer trusted.
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // All strings <= 12 bytes: trusted decode keeps them fully inline
        // (empty buffer); checked decode stores 5 + 5 + 4 = 14 bytes.
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"),
            None,
            Some("short"),
            Some("tiny"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        assert_eq!(unchecked_values_len, 0);
        assert_eq!(checked_values_len, 14);

        // All strings > 12 bytes: both paths need the values buffer equally.
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Mixed lengths: only the 13-byte value spills to the buffer on the
        // trusted path; the checked path stores the short strings too.
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),
            Some("thisisexact13"),
            None,
            Some("short"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        assert_eq!(unchecked_values_len, 13);
        assert!(checked_values_len > unchecked_values_len);
    }
3898
    #[test]
    // Round-trips a sparse union through the row format, checking length and
    // per-slot type ids survive.
    fn test_sparse_union() {
        // Sparse union: both children span the full length; type_ids select
        // which child supplies each slot.
        let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
        let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);

        let type_ids = vec![0, 1, 0, 1, 0].into();

        let union_fields = [
            (0, Arc::new(Field::new("int", DataType::Int32, false))),
            (1, Arc::new(Field::new("str", DataType::Utf8, false))),
        ]
        .into_iter()
        .collect();

        // No offsets buffer => sparse union layout.
        let union_array = UnionArray::try_new(
            union_fields,
            type_ids,
            None,
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
        )
        .unwrap();

        let union_type = union_array.data_type().clone();
        let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();

        let rows = converter
            .convert_columns(&[Arc::new(union_array.clone())])
            .unwrap();

        let back = converter.convert_rows(&rows).unwrap();
        let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();

        assert_eq!(union_array.len(), back_union.len());
        for i in 0..union_array.len() {
            assert_eq!(union_array.type_id(i), back_union.type_id(i));
        }
    }
3939
3940 #[test]
3941 fn test_sparse_union_with_nulls() {
3942 let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
3944 let str_array = StringArray::from(vec![None::<&str>; 5]);
3945
3946 let type_ids = vec![0, 1, 0, 1, 0].into();
3948
3949 let union_fields = [
3950 (0, Arc::new(Field::new("int", DataType::Int32, true))),
3951 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
3952 ]
3953 .into_iter()
3954 .collect();
3955
3956 let union_array = UnionArray::try_new(
3957 union_fields,
3958 type_ids,
3959 None,
3960 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
3961 )
3962 .unwrap();
3963
3964 let union_type = union_array.data_type().clone();
3965 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
3966
3967 let rows = converter
3968 .convert_columns(&[Arc::new(union_array.clone())])
3969 .unwrap();
3970
3971 let back = converter.convert_rows(&rows).unwrap();
3973 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
3974
3975 assert_eq!(union_array.len(), back_union.len());
3976 for i in 0..union_array.len() {
3977 let expected_null = union_array.is_null(i);
3978 let actual_null = back_union.is_null(i);
3979 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
3980 if !expected_null {
3981 assert_eq!(union_array.type_id(i), back_union.type_id(i));
3982 }
3983 }
3984 }
3985
3986 #[test]
3987 fn test_dense_union() {
3988 let int_array = Int32Array::from(vec![1, 3, 5]);
3990 let str_array = StringArray::from(vec!["a", "b"]);
3991
3992 let type_ids = vec![0, 1, 0, 1, 0].into();
3993
3994 let offsets = vec![0, 0, 1, 1, 2].into();
3996
3997 let union_fields = [
3998 (0, Arc::new(Field::new("int", DataType::Int32, false))),
3999 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4000 ]
4001 .into_iter()
4002 .collect();
4003
4004 let union_array = UnionArray::try_new(
4005 union_fields,
4006 type_ids,
4007 Some(offsets), vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4009 )
4010 .unwrap();
4011
4012 let union_type = union_array.data_type().clone();
4013 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4014
4015 let rows = converter
4016 .convert_columns(&[Arc::new(union_array.clone())])
4017 .unwrap();
4018
4019 let back = converter.convert_rows(&rows).unwrap();
4021 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4022
4023 assert_eq!(union_array.len(), back_union.len());
4024 for i in 0..union_array.len() {
4025 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4026 }
4027 }
4028
4029 #[test]
4030 fn test_dense_union_with_nulls() {
4031 let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
4033 let str_array = StringArray::from(vec![Some("a"), None]);
4034
4035 let type_ids = vec![0, 1, 0, 1, 0].into();
4037 let offsets = vec![0, 0, 1, 1, 2].into();
4038
4039 let union_fields = [
4040 (0, Arc::new(Field::new("int", DataType::Int32, true))),
4041 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
4042 ]
4043 .into_iter()
4044 .collect();
4045
4046 let union_array = UnionArray::try_new(
4047 union_fields,
4048 type_ids,
4049 Some(offsets),
4050 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4051 )
4052 .unwrap();
4053
4054 let union_type = union_array.data_type().clone();
4055 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4056
4057 let rows = converter
4058 .convert_columns(&[Arc::new(union_array.clone())])
4059 .unwrap();
4060
4061 let back = converter.convert_rows(&rows).unwrap();
4063 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4064
4065 assert_eq!(union_array.len(), back_union.len());
4066 for i in 0..union_array.len() {
4067 let expected_null = union_array.is_null(i);
4068 let actual_null = back_union.is_null(i);
4069 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
4070 if !expected_null {
4071 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4072 }
4073 }
4074 }
4075
4076 #[test]
4077 fn test_union_ordering() {
4078 let int_array = Int32Array::from(vec![100, 5, 20]);
4079 let str_array = StringArray::from(vec!["z", "a"]);
4080
4081 let type_ids = vec![0, 1, 0, 1, 0].into();
4083 let offsets = vec![0, 0, 1, 1, 2].into();
4084
4085 let union_fields = [
4086 (0, Arc::new(Field::new("int", DataType::Int32, false))),
4087 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4088 ]
4089 .into_iter()
4090 .collect();
4091
4092 let union_array = UnionArray::try_new(
4093 union_fields,
4094 type_ids,
4095 Some(offsets),
4096 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4097 )
4098 .unwrap();
4099
4100 let union_type = union_array.data_type().clone();
4101 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4102
4103 let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();
4104
4105 assert!(rows.row(2) < rows.row(1));
4117
4118 assert!(rows.row(0) < rows.row(3));
4120
4121 assert!(rows.row(2) < rows.row(4));
4124 assert!(rows.row(4) < rows.row(0));
4126
4127 assert!(rows.row(3) < rows.row(1));
4130 }
4131
4132 #[test]
4133 fn test_row_converter_roundtrip_with_many_union_columns() {
4134 let fields1 = UnionFields::try_new(
4136 vec![0, 1],
4137 vec![
4138 Field::new("int", DataType::Int32, true),
4139 Field::new("string", DataType::Utf8, true),
4140 ],
4141 )
4142 .unwrap();
4143
4144 let int_array1 = Int32Array::from(vec![Some(67), None]);
4145 let string_array1 = StringArray::from(vec![None::<&str>, Some("hello")]);
4146 let type_ids1 = vec![0i8, 1].into();
4147
4148 let union_array1 = UnionArray::try_new(
4149 fields1.clone(),
4150 type_ids1,
4151 None,
4152 vec![
4153 Arc::new(int_array1) as ArrayRef,
4154 Arc::new(string_array1) as ArrayRef,
4155 ],
4156 )
4157 .unwrap();
4158
4159 let fields2 = UnionFields::try_new(
4161 vec![0, 1],
4162 vec![
4163 Field::new("int", DataType::Int32, true),
4164 Field::new("string", DataType::Utf8, true),
4165 ],
4166 )
4167 .unwrap();
4168
4169 let int_array2 = Int32Array::from(vec![Some(100), None]);
4170 let string_array2 = StringArray::from(vec![None::<&str>, Some("world")]);
4171 let type_ids2 = vec![0i8, 1].into();
4172
4173 let union_array2 = UnionArray::try_new(
4174 fields2.clone(),
4175 type_ids2,
4176 None,
4177 vec![
4178 Arc::new(int_array2) as ArrayRef,
4179 Arc::new(string_array2) as ArrayRef,
4180 ],
4181 )
4182 .unwrap();
4183
4184 let field1 = Field::new("col1", DataType::Union(fields1, UnionMode::Sparse), true);
4186 let field2 = Field::new("col2", DataType::Union(fields2, UnionMode::Sparse), true);
4187
4188 let sort_field1 = SortField::new(field1.data_type().clone());
4189 let sort_field2 = SortField::new(field2.data_type().clone());
4190
4191 let converter = RowConverter::new(vec![sort_field1, sort_field2]).unwrap();
4192
4193 let rows = converter
4194 .convert_columns(&[
4195 Arc::new(union_array1.clone()) as ArrayRef,
4196 Arc::new(union_array2.clone()) as ArrayRef,
4197 ])
4198 .unwrap();
4199
4200 let out = converter.convert_rows(&rows).unwrap();
4202
4203 let [col1, col2] = out.as_slice() else {
4204 panic!("expected 2 columns")
4205 };
4206
4207 let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4208 let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
4209
4210 for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
4211 assert_eq!(expected.len(), got.len());
4212 assert_eq!(expected.type_ids(), got.type_ids());
4213
4214 for i in 0..expected.len() {
4215 assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
4216 }
4217 }
4218 }
4219
4220 #[test]
4221 fn test_row_converter_roundtrip_with_one_union_column() {
4222 let fields = UnionFields::try_new(
4223 vec![0, 1],
4224 vec![
4225 Field::new("int", DataType::Int32, true),
4226 Field::new("string", DataType::Utf8, true),
4227 ],
4228 )
4229 .unwrap();
4230
4231 let int_array = Int32Array::from(vec![Some(67), None]);
4232 let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
4233 let type_ids = vec![0i8, 1].into();
4234
4235 let union_array = UnionArray::try_new(
4236 fields.clone(),
4237 type_ids,
4238 None,
4239 vec![
4240 Arc::new(int_array) as ArrayRef,
4241 Arc::new(string_array) as ArrayRef,
4242 ],
4243 )
4244 .unwrap();
4245
4246 let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
4247 let sort_field = SortField::new(field.data_type().clone());
4248 let converter = RowConverter::new(vec![sort_field]).unwrap();
4249
4250 let rows = converter
4251 .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
4252 .unwrap();
4253
4254 let out = converter.convert_rows(&rows).unwrap();
4256
4257 let [col1] = out.as_slice() else {
4258 panic!("expected 1 column")
4259 };
4260
4261 let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
4262 assert_eq!(col.len(), union_array.len());
4263 assert_eq!(col.type_ids(), union_array.type_ids());
4264
4265 for i in 0..col.len() {
4266 assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
4267 }
4268 }
4269
4270 #[test]
4271 fn rows_size_should_count_for_capacity() {
4272 let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
4273
4274 let empty_rows_size_with_preallocate_rows_and_data = {
4275 let rows = row_converter.empty_rows(1000, 1000);
4276
4277 rows.size()
4278 };
4279 let empty_rows_size_with_preallocate_rows = {
4280 let rows = row_converter.empty_rows(1000, 0);
4281
4282 rows.size()
4283 };
4284 let empty_rows_size_with_preallocate_data = {
4285 let rows = row_converter.empty_rows(0, 1000);
4286
4287 rows.size()
4288 };
4289 let empty_rows_size_without_preallocate = {
4290 let rows = row_converter.empty_rows(0, 0);
4291
4292 rows.size()
4293 };
4294
4295 assert!(
4296 empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
4297 "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
4298 );
4299 assert!(
4300 empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
4301 "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
4302 );
4303 assert!(
4304 empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
4305 "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
4306 );
4307 assert!(
4308 empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
4309 "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
4310 );
4311 }
4312
4313 #[test]
4314 fn reserve_should_increase_capacity_to_the_requested_size() {
4315 let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
4316 let mut empty_rows = row_converter.empty_rows(0, 0);
4317 empty_rows.reserve(50, 50);
4318 let before_size = empty_rows.size();
4319 empty_rows.reserve(50, 50);
4320 assert_eq!(
4321 empty_rows.size(),
4322 before_size,
4323 "Size should not change when reserving already reserved space"
4324 );
4325 empty_rows.reserve(10, 20);
4326 assert_eq!(
4327 empty_rows.size(),
4328 before_size,
4329 "Size should not change when already have space for the expected reserved data"
4330 );
4331
4332 empty_rows.reserve(100, 20);
4333 assert!(
4334 empty_rows.size() > before_size,
4335 "Size should increase when reserving more space than previously reserved"
4336 );
4337
4338 let before_size = empty_rows.size();
4339
4340 empty_rows.reserve(20, 100);
4341 assert!(
4342 empty_rows.size() > before_size,
4343 "Size should increase when reserving more space than previously reserved"
4344 );
4345 }
4346}