#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
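//! A comparable row-oriented representation of a collection of [`Array`]s
//! ("columns"), allowing rows to be compared with a single byte comparison
//! rather than a type-dispatched comparison per column.
//!
//! A minimal usage sketch, converting columns to rows and back:
//!
//! ```
//! # use std::sync::Arc;
//! # use arrow_row::{RowConverter, SortField};
//! # use arrow_array::{ArrayRef, Int32Array, StringArray};
//! # use arrow_schema::DataType;
//! let converter = RowConverter::new(vec![
//!     SortField::new(DataType::Int32),
//!     SortField::new(DataType::Utf8),
//! ]).unwrap();
//!
//! let cols = [
//!     Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef,
//!     Arc::new(StringArray::from(vec!["a", "c", "b"])) as ArrayRef,
//! ];
//! let rows = converter.convert_columns(&cols).unwrap();
//!
//! // Row comparisons agree with a lexicographic sort of the columns
//! assert!(rows.row(1) < rows.row(2));
//! assert!(rows.row(2) < rows.row(0));
//!
//! // The original columns can be recovered from the rows
//! let back = converter.convert_rows(&rows).unwrap();
//! assert_eq!(&back[0], &cols[0]);
//! assert_eq!(&back[1], &cols[1]);
//! ```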
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::iter::Map;
use std::slice::Windows;
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::types::{ArrowDictionaryKeyType, ByteArrayType, ByteViewType};
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
use crate::variable::{decode_binary, decode_string};
use arrow_array::types::{Int16Type, Int32Type, Int64Type};

mod fixed;
mod list;
mod run;
mod variable;
185
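/// Converts [`ArrayRef`] columns into a comparable row-oriented representation
/// ([`Rows`]), and back again via [`RowConverter::convert_rows`]
///
/// Rows produced by converters with different fields or sort options are not
/// comparable with one another.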
186#[derive(Debug)]
532pub struct RowConverter {
533 fields: Arc<[SortField]>,
534 codecs: Vec<Codec>,
536}
537
#[derive(Debug)]
enum Codec {
    /// No additional state is necessary to encode or decode this data type
    Stateless,
    /// A row converter for the dictionary values, and the encoding of a null row
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields, and the encoding of a null row
    Struct(RowConverter, OwnedRow),
    /// A row converter for the list values
    List(RowConverter),
    /// A row converter for the run values
    RunEndEncoded(RowConverter),
    /// A row converter for each union child, the type id of each child, and
    /// the encoding of a null row for each child
    Union(Vec<RowConverter>, Vec<i8>, Vec<OwnedRow>),
}
556
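/// Computes the minimum offset and maximum end (offset + size) referenced by the
/// views of a [`GenericListViewArray`], so that only the referenced portion of
/// the child values needs to be encoded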
557fn compute_list_view_bounds<O: OffsetSizeTrait>(array: &GenericListViewArray<O>) -> (usize, usize) {
560 if array.is_empty() {
561 return (0, 0);
562 }
563
564 let offsets = array.value_offsets();
565 let sizes = array.value_sizes();
566 let values_len = array.values().len();
567
568 let mut min_offset = usize::MAX;
569 let mut max_end = 0usize;
570
571 for i in 0..array.len() {
572 let offset = offsets[i].as_usize();
573 let size = sizes[i].as_usize();
574 let end = offset + size;
575
576 if size > 0 {
577 min_offset = min_offset.min(offset);
578 max_end = max_end.max(end);
579 }
580
581 if min_offset == 0 && max_end == values_len {
585 break;
586 }
587 }
588
589 if min_offset == usize::MAX {
590 (0, 0)
592 } else {
593 (min_offset, max_end)
594 }
595}
596
597impl Codec {
598 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
599 match &sort_field.data_type {
600 DataType::Dictionary(_, values) => {
601 let sort_field =
602 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
603
604 let converter = RowConverter::new(vec![sort_field])?;
605 let null_array = new_null_array(values.as_ref(), 1);
606 let nulls = converter.convert_columns(&[null_array])?;
607
608 let owned = OwnedRow {
609 data: nulls.buffer.into(),
610 config: nulls.config,
611 };
612 Ok(Self::Dictionary(converter, owned))
613 }
614 DataType::RunEndEncoded(_, values) => {
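                // The encoded child values will be byte-inverted by the parent
                // encoder if `descending` is set, so encode them ascending and
                // negate `nulls_first` when descending so that nulls still sort
                // to the requested end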
615 let options = SortOptions {
617 descending: false,
618 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
619 };
620
621 let field = SortField::new_with_options(values.data_type().clone(), options);
622 let converter = RowConverter::new(vec![field])?;
623 Ok(Self::RunEndEncoded(converter))
624 }
625 d if !d.is_nested() => Ok(Self::Stateless),
626 DataType::List(f)
627 | DataType::LargeList(f)
628 | DataType::ListView(f)
629 | DataType::LargeListView(f) => {
630 let options = SortOptions {
634 descending: false,
635 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
636 };
637
638 let field = SortField::new_with_options(f.data_type().clone(), options);
639 let converter = RowConverter::new(vec![field])?;
640 Ok(Self::List(converter))
641 }
642 DataType::FixedSizeList(f, _) => {
643 let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
644 let converter = RowConverter::new(vec![field])?;
645 Ok(Self::List(converter))
646 }
647 DataType::Struct(f) => {
648 let sort_fields = f
649 .iter()
650 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
651 .collect();
652
653 let converter = RowConverter::new(sort_fields)?;
654 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
655
656 let nulls = converter.convert_columns(&nulls)?;
657 let owned = OwnedRow {
658 data: nulls.buffer.into(),
659 config: nulls.config,
660 };
661
662 Ok(Self::Struct(converter, owned))
663 }
664 DataType::Union(fields, _mode) => {
665 let options = SortOptions {
668 descending: false,
669 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
670 };
671
672 let mut converters = Vec::with_capacity(fields.len());
673 let mut type_ids = Vec::with_capacity(fields.len());
674 let mut null_rows = Vec::with_capacity(fields.len());
675
676 for (type_id, field) in fields.iter() {
677 let sort_field =
678 SortField::new_with_options(field.data_type().clone(), options);
679 let converter = RowConverter::new(vec![sort_field])?;
680
681 let null_array = new_null_array(field.data_type(), 1);
682 let nulls = converter.convert_columns(&[null_array])?;
683 let owned = OwnedRow {
684 data: nulls.buffer.into(),
685 config: nulls.config,
686 };
687
688 converters.push(converter);
689 type_ids.push(type_id);
690 null_rows.push(owned);
691 }
692
693 Ok(Self::Union(converters, type_ids, null_rows))
694 }
695 _ => Err(ArrowError::NotYetImplemented(format!(
696 "not yet implemented: {:?}",
697 sort_field.data_type
698 ))),
699 }
700 }
701
702 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
703 match self {
704 Codec::Stateless => Ok(Encoder::Stateless),
705 Codec::Dictionary(converter, nulls) => {
706 let values = array.as_any_dictionary().values().clone();
707 let rows = converter.convert_columns(&[values])?;
708 Ok(Encoder::Dictionary(rows, nulls.row()))
709 }
710 Codec::Struct(converter, null) => {
711 let v = as_struct_array(array);
712 let rows = converter.convert_columns(v.columns())?;
713 Ok(Encoder::Struct(rows, null.row()))
714 }
715 Codec::List(converter) => {
716 let values = match array.data_type() {
717 DataType::List(_) => {
718 let list_array = as_list_array(array);
719 let first_offset = list_array.offsets()[0] as usize;
720 let last_offset =
721 list_array.offsets()[list_array.offsets().len() - 1] as usize;
722
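                    // Only the values referenced by the offsets need to be encoded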
723 list_array
726 .values()
727 .slice(first_offset, last_offset - first_offset)
728 }
729 DataType::LargeList(_) => {
730 let list_array = as_large_list_array(array);
731
732 let first_offset = list_array.offsets()[0] as usize;
733 let last_offset =
734 list_array.offsets()[list_array.offsets().len() - 1] as usize;
735
736 list_array
739 .values()
740 .slice(first_offset, last_offset - first_offset)
741 }
742 DataType::ListView(_) => {
743 let list_view_array = array.as_list_view::<i32>();
744 let (min_offset, max_end) = compute_list_view_bounds(list_view_array);
745 list_view_array
746 .values()
747 .slice(min_offset, max_end - min_offset)
748 }
749 DataType::LargeListView(_) => {
750 let list_view_array = array.as_list_view::<i64>();
751 let (min_offset, max_end) = compute_list_view_bounds(list_view_array);
752 list_view_array
753 .values()
754 .slice(min_offset, max_end - min_offset)
755 }
756 DataType::FixedSizeList(_, _) => {
757 as_fixed_size_list_array(array).values().clone()
758 }
759 _ => unreachable!(),
760 };
761 let rows = converter.convert_columns(&[values])?;
762 Ok(Encoder::List(rows))
763 }
764 Codec::RunEndEncoded(converter) => {
765 let values = match array.data_type() {
766 DataType::RunEndEncoded(r, _) => match r.data_type() {
767 DataType::Int16 => array.as_run::<Int16Type>().values_slice(),
768 DataType::Int32 => array.as_run::<Int32Type>().values_slice(),
769 DataType::Int64 => array.as_run::<Int64Type>().values_slice(),
770 _ => unreachable!("Unsupported run end index type: {r:?}"),
771 },
772 _ => unreachable!(),
773 };
774 let rows = converter.convert_columns(std::slice::from_ref(&values))?;
775 Ok(Encoder::RunEndEncoded(rows))
776 }
777 Codec::Union(converters, field_to_type_ids, _) => {
778 let union_array = array
779 .as_any()
780 .downcast_ref::<UnionArray>()
781 .expect("expected Union array");
782
783 let type_ids = union_array.type_ids().clone();
784 let offsets = union_array.offsets().cloned();
785
786 let mut child_rows = Vec::with_capacity(converters.len());
787 for (field_idx, converter) in converters.iter().enumerate() {
788 let type_id = field_to_type_ids[field_idx];
789 let child_array = union_array.child(type_id);
790 let rows = converter.convert_columns(std::slice::from_ref(child_array))?;
791 child_rows.push(rows);
792 }
793
794 Ok(Encoder::Union {
795 child_rows,
796 field_to_type_ids: field_to_type_ids.clone(),
797 type_ids,
798 offsets,
799 })
800 }
801 }
802 }
803
804 fn size(&self) -> usize {
805 match self {
806 Codec::Stateless => 0,
807 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
808 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
809 Codec::List(converter) => converter.size(),
810 Codec::RunEndEncoded(converter) => converter.size(),
811 Codec::Union(converters, _, null_rows) => {
812 converters.iter().map(|c| c.size()).sum::<usize>()
813 + null_rows.iter().map(|n| n.data.len()).sum::<usize>()
814 }
815 }
816 }
817}
818
/// The per-array encoding state produced by [`Codec::encoder`]
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional state is necessary to encode this array
    Stateless,
    /// The row encoding of the dictionary values, and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays, and the encoding of a null row
    Struct(Rows, Row<'a>),
    /// The row encoding of the list values
    List(Rows),
    /// The row encoding of the run values
    RunEndEncoded(Rows),
    /// The row encoding of each union child, along with the type ids and
    /// (for dense unions) the offsets into the children
    Union {
        child_rows: Vec<Rows>,
        field_to_type_ids: Vec<i8>,
        type_ids: ScalarBuffer<i8>,
        offsets: Option<ScalarBuffer<i32>>,
    },
}
843
/// Configuration for a column in a [`RowConverter`]: the column's [`DataType`]
/// and the [`SortOptions`] to use when encoding it
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options
    options: SortOptions,
    /// Data type
    data_type: DataType,
}

impl SortField {
    /// Create a new column with the given data type and default [`SortOptions`]
    pub fn new(data_type: DataType) -> Self {
        Self::new_with_options(data_type, Default::default())
    }

    /// Create a new column with the given data type and [`SortOptions`]
    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
        Self { options, data_type }
    }

    /// Return the size of this instance in bytes
    ///
    /// Includes the size of `Self` and any heap allocations reachable from the
    /// [`DataType`]
    pub fn size(&self) -> usize {
        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
    }
}
871
872impl RowConverter {
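    /// Create a new [`RowConverter`] with the provided schema
    ///
    /// Returns an error if the schema contains data types that are not yet
    /// supported by the row format (see [`RowConverter::supports_fields`]).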
873 pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
875 if !Self::supports_fields(&fields) {
876 return Err(ArrowError::NotYetImplemented(format!(
877 "Row format support not yet implemented for: {fields:?}"
878 )));
879 }
880
881 let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
882 Ok(Self {
883 fields: fields.into(),
884 codecs,
885 })
886 }
887
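    /// Check whether the given [`SortField`]s are supported by the row format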
888 pub fn supports_fields(fields: &[SortField]) -> bool {
890 fields.iter().all(|x| Self::supports_datatype(&x.data_type))
891 }
892
893 fn supports_datatype(d: &DataType) -> bool {
894 match d {
895 _ if !d.is_nested() => true,
896 DataType::List(f)
897 | DataType::LargeList(f)
898 | DataType::ListView(f)
899 | DataType::LargeListView(f)
900 | DataType::FixedSizeList(f, _) => Self::supports_datatype(f.data_type()),
901 DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
902 DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
903 DataType::Union(fs, _mode) => fs
904 .iter()
905 .all(|(_, f)| Self::supports_datatype(f.data_type())),
906 _ => false,
907 }
908 }
909
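    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// See the module-level documentation for an example. Returns an error if
    /// the columns do not match the [`SortField`]s this converter was created
    /// with, or do not all have the same length.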
910 pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
920 let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
921 let mut rows = self.empty_rows(num_rows, 0);
922 self.append(&mut rows, columns)?;
923 Ok(rows)
924 }
925
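    /// Convert [`ArrayRef`] columns, appending to an existing [`Rows`]
    ///
    /// Returns an error if the columns do not match this converter's
    /// [`SortField`]s or do not all have the same length.
    ///
    /// A minimal sketch of incremental conversion:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let mut rows = converter.empty_rows(0, 0);
    /// for batch in [vec![1, 2], vec![3]] {
    ///     let col = Arc::new(Int32Array::from(batch)) as ArrayRef;
    ///     converter.append(&mut rows, &[col]).unwrap();
    /// }
    /// assert_eq!(rows.num_rows(), 3);
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this converter, for example via
    /// [`RowConverter::empty_rows`]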
926 pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
957 assert!(
958 Arc::ptr_eq(&rows.config.fields, &self.fields),
959 "rows were not produced by this RowConverter"
960 );
961
962 if columns.len() != self.fields.len() {
963 return Err(ArrowError::InvalidArgumentError(format!(
964 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
965 self.fields.len(),
966 columns.len()
967 )));
968 }
        for column in columns.iter().skip(1) {
            if column.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    column.len()
                )));
            }
        }
978
979 let encoders = columns
980 .iter()
981 .zip(&self.codecs)
982 .zip(self.fields.iter())
983 .map(|((column, codec), field)| {
984 if !column.data_type().equals_datatype(&field.data_type) {
985 return Err(ArrowError::InvalidArgumentError(format!(
986 "RowConverter column schema mismatch, expected {} got {}",
987 field.data_type,
988 column.data_type()
989 )));
990 }
991 codec.encoder(column.as_ref())
992 })
993 .collect::<Result<Vec<_>, _>>()?;
994
995 let write_offset = rows.num_rows();
996 let lengths = row_lengths(columns, &encoders);
997 let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
998 rows.buffer.resize(total, 0);
999
1000 for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
1001 encode_column(
1003 &mut rows.buffer,
1004 &mut rows.offsets[write_offset..],
1005 column.as_ref(),
1006 field.options,
1007 &encoder,
1008 )
1009 }
1010
1011 if cfg!(debug_assertions) {
1012 assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
1013 rows.offsets
1014 .windows(2)
1015 .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
1016 }
1017
1018 Ok(())
1019 }
1020
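    /// Convert [`Rows`] back into their original columns
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this converter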
1021 pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
1029 where
1030 I: IntoIterator<Item = Row<'a>>,
1031 {
1032 let mut validate_utf8 = false;
1033 let mut rows: Vec<_> = rows
1034 .into_iter()
1035 .map(|row| {
1036 assert!(
1037 Arc::ptr_eq(&row.config.fields, &self.fields),
1038 "rows were not produced by this RowConverter"
1039 );
1040 validate_utf8 |= row.config.validate_utf8;
1041 row.data
1042 })
1043 .collect();
1044
1045 let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
1049
1050 if cfg!(debug_assertions) {
1051 for (i, row) in rows.iter().enumerate() {
1052 if !row.is_empty() {
1053 return Err(ArrowError::InvalidArgumentError(format!(
1054 "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
1055 codecs = &self.codecs
1056 )));
1057 }
1058 }
1059 }
1060
1061 Ok(result)
1062 }
1063
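    /// Create an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of row data, for use with [`RowConverter::append`]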
1064 pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
1093 let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
1094 offsets.push(0);
1095
1096 Rows {
1097 offsets,
1098 buffer: Vec::with_capacity(data_capacity),
1099 config: RowConfig {
1100 fields: self.fields.clone(),
1101 validate_utf8: false,
1102 },
1103 }
1104 }
1105
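    /// Create a [`Rows`] from a [`BinaryArray`] produced by [`Rows::try_into_binary`]
    /// using a converter with the same schema
    ///
    /// # Panics
    ///
    /// Panics if the array contains any nulls. Operations on the returned [`Rows`]
    /// may panic or produce incorrect results if the data was not produced by a
    /// compatible converter.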
1106 pub fn from_binary(&self, array: BinaryArray) -> Rows {
1133 assert_eq!(
1134 array.null_count(),
1135 0,
1136 "can't construct Rows instance from array with nulls"
1137 );
1138 let (offsets, values, _) = array.into_parts();
1139 let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
1140 let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
1142 Rows {
1143 buffer,
1144 offsets,
1145 config: RowConfig {
1146 fields: Arc::clone(&self.fields),
1147 validate_utf8: true,
1148 },
1149 }
1150 }
1151
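    /// Convert raw row bytes back into arrays, advancing each slice in `rows`
    /// past the bytes consumed for these fields
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data produced by this [`RowConverter`]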
1152 unsafe fn convert_raw(
1158 &self,
1159 rows: &mut [&[u8]],
1160 validate_utf8: bool,
1161 ) -> Result<Vec<ArrayRef>, ArrowError> {
1162 self.fields
1163 .iter()
1164 .zip(&self.codecs)
1165 .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
1166 .collect()
1167 }
1168
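    /// Create a [`RowParser`] that can reinterpret raw bytes, e.g. those returned
    /// by [`Row::data`], as [`Row`]s for this converter's schema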
1169 pub fn parser(&self) -> RowParser {
1171 RowParser::new(Arc::clone(&self.fields))
1172 }
1173
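    /// Return the estimated size of this converter in bytes, including any
    /// codec state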
1174 pub fn size(&self) -> usize {
1178 std::mem::size_of::<Self>()
1179 + self.fields.iter().map(|x| x.size()).sum::<usize>()
1180 + self.codecs.capacity() * std::mem::size_of::<Codec>()
1181 + self.codecs.iter().map(Codec::size).sum::<usize>()
1182 }
1183}
1184
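/// A parser that can reinterpret raw bytes as [`Row`]s, created by
/// [`RowConverter::parser`]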
1185#[derive(Debug)]
1187pub struct RowParser {
1188 config: RowConfig,
1189}
1190
1191impl RowParser {
1192 fn new(fields: Arc<[SortField]>) -> Self {
1193 Self {
1194 config: RowConfig {
1195 fields,
1196 validate_utf8: true,
1197 },
1198 }
1199 }
1200
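    /// Create a [`Row`] from the given bytes
    ///
    /// The bytes must have been produced by a [`RowConverter`] with the same
    /// schema, otherwise operations on the returned [`Row`] may panic or
    /// produce incorrect results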
1201 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
1206 Row {
1207 data: bytes,
1208 config: &self.config,
1209 }
1210 }
1211}
1212
1213#[derive(Debug, Clone)]
1215struct RowConfig {
1216 fields: Arc<[SortField]>,
1218 validate_utf8: bool,
1220}
1221
/// A row-oriented representation of a collection of columns, created by a
/// [`RowConverter`]
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i+1]]`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}
1234
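/// An iterator over the byte length of each row in a [`Rows`], returned by
/// [`Rows::lengths`]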
1235pub type RowLengthIter<'a> = Map<Windows<'a, usize>, fn(&'a [usize]) -> usize>;
1237
1238impl Rows {
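    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if the row was not produced by a converter with the same fields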
1239 pub fn push(&mut self, row: Row<'_>) {
1241 assert!(
1242 Arc::ptr_eq(&row.config.fields, &self.config.fields),
1243 "row was not produced by this RowConverter"
1244 );
1245 self.config.validate_utf8 |= row.config.validate_utf8;
1246 self.buffer.extend_from_slice(row.data);
1247 self.offsets.push(self.buffer.len())
1248 }
1249
1250 pub fn reserve(&mut self, row_capacity: usize, data_capacity: usize) {
1252 self.buffer.reserve(data_capacity);
1253 self.offsets.reserve(row_capacity);
1254 }
1255
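    /// Returns the row at index `row`, panicking if it is out of bounds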
1256 pub fn row(&self, row: usize) -> Row<'_> {
1258 assert!(row + 1 < self.offsets.len());
1259 unsafe { self.row_unchecked(row) }
1260 }
1261
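    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    ///
    /// Caller must ensure `index` is less than [`Self::num_rows`]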
1262 pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
1267 let end = unsafe { self.offsets.get_unchecked(index + 1) };
1268 let start = unsafe { self.offsets.get_unchecked(index) };
1269 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
1270 Row {
1271 data,
1272 config: &self.config,
1273 }
1274 }
1275
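    /// Returns the length in bytes of the row at index `row`, panicking if it is
    /// out of bounds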
1276 pub fn row_len(&self, row: usize) -> usize {
1279 assert!(row + 1 < self.offsets.len());
1280
1281 self.offsets[row + 1] - self.offsets[row]
1282 }
1283
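    /// Returns an iterator over the length in bytes of each row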
1284 pub fn lengths(&self) -> RowLengthIter<'_> {
1286 self.offsets.windows(2).map(|w| w[1] - w[0])
1287 }
1288
1289 pub fn clear(&mut self) {
1291 self.offsets.truncate(1);
1292 self.buffer.clear();
1293 }
1294
1295 pub fn num_rows(&self) -> usize {
1297 self.offsets.len() - 1
1298 }
1299
1300 pub fn iter(&self) -> RowsIter<'_> {
1302 self.into_iter()
1303 }
1304
1305 pub fn size(&self) -> usize {
1309 std::mem::size_of::<Self>()
1311 + self.buffer.capacity()
1312 + self.offsets.capacity() * std::mem::size_of::<usize>()
1313 }
1314
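    /// Convert this [`Rows`] into a [`BinaryArray`] with one value per row, for
    /// example to serialize the row encoding
    ///
    /// Returns an error if the row data is longer than `i32::MAX` bytes.
    ///
    /// A round-trip sketch using [`RowConverter::from_binary`]:
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_row::{RowConverter, SortField};
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col.clone()]).unwrap();
    ///
    /// let binary = rows.try_into_binary().unwrap();
    /// let rows = converter.from_binary(binary);
    /// let back = converter.convert_rows(&rows).unwrap();
    /// assert_eq!(&back[0], &col);
    /// ```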
1315 pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
1345 if self.buffer.len() > i32::MAX as usize {
1346 return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into an i32-indexed BinaryArray",
1348 self.buffer.len()
1349 )));
1350 }
1351 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
1353 let array = unsafe {
1355 BinaryArray::new_unchecked(
1356 OffsetBuffer::new_unchecked(offsets_scalar),
1357 Buffer::from_vec(self.buffer),
1358 None,
1359 )
1360 };
1361 Ok(array)
1362 }
1363}
1364
1365impl<'a> IntoIterator for &'a Rows {
1366 type Item = Row<'a>;
1367 type IntoIter = RowsIter<'a>;
1368
1369 fn into_iter(self) -> Self::IntoIter {
1370 RowsIter {
1371 rows: self,
1372 start: 0,
1373 end: self.num_rows(),
1374 }
1375 }
1376}
1377
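/// An iterator over the [`Row`]s in a [`Rows`], returned by [`Rows::iter`]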
1378#[derive(Debug)]
1380pub struct RowsIter<'a> {
1381 rows: &'a Rows,
1382 start: usize,
1383 end: usize,
1384}
1385
1386impl<'a> Iterator for RowsIter<'a> {
1387 type Item = Row<'a>;
1388
1389 fn next(&mut self) -> Option<Self::Item> {
1390 if self.end == self.start {
1391 return None;
1392 }
1393
1394 let row = unsafe { self.rows.row_unchecked(self.start) };
1396 self.start += 1;
1397 Some(row)
1398 }
1399
1400 fn size_hint(&self) -> (usize, Option<usize>) {
1401 let len = self.len();
1402 (len, Some(len))
1403 }
1404}
1405
1406impl ExactSizeIterator for RowsIter<'_> {
1407 fn len(&self) -> usize {
1408 self.end - self.start
1409 }
1410}
1411
1412impl DoubleEndedIterator for RowsIter<'_> {
1413 fn next_back(&mut self) -> Option<Self::Item> {
1414 if self.end == self.start {
1415 return None;
1416 }
        // Safety: `start < end <= num_rows`, so `end - 1` is a valid row index
        self.end -= 1;
        Some(unsafe { self.rows.row_unchecked(self.end) })
1421 }
1422}
1423
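/// A comparable representation of a single row, borrowed from a [`Rows`]
///
/// Rows are only meaningfully comparable if they were produced by converters
/// with the same fields and sort options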
1424#[derive(Debug, Copy, Clone)]
1433pub struct Row<'a> {
1434 data: &'a [u8],
1435 config: &'a RowConfig,
1436}
1437
1438impl<'a> Row<'a> {
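    /// Create an [`OwnedRow`] containing a copy of this row's bytes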
1439 pub fn owned(&self) -> OwnedRow {
1441 OwnedRow {
1442 data: self.data.into(),
1443 config: self.config.clone(),
1444 }
1445 }
1446
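    /// Returns the underlying bytes of this row, which can later be reinterpreted
    /// with [`RowParser::parse`]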
1447 pub fn data(&self) -> &'a [u8] {
1449 self.data
1450 }
1451}
1452
1453impl PartialEq for Row<'_> {
1456 #[inline]
1457 fn eq(&self, other: &Self) -> bool {
1458 self.data.eq(other.data)
1459 }
1460}
1461
1462impl Eq for Row<'_> {}
1463
1464impl PartialOrd for Row<'_> {
1465 #[inline]
1466 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1467 Some(self.cmp(other))
1468 }
1469}
1470
1471impl Ord for Row<'_> {
1472 #[inline]
1473 fn cmp(&self, other: &Self) -> Ordering {
1474 self.data.cmp(other.data)
1475 }
1476}
1477
1478impl Hash for Row<'_> {
1479 #[inline]
1480 fn hash<H: Hasher>(&self, state: &mut H) {
1481 self.data.hash(state)
1482 }
1483}
1484
1485impl AsRef<[u8]> for Row<'_> {
1486 #[inline]
1487 fn as_ref(&self) -> &[u8] {
1488 self.data
1489 }
1490}
1491
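/// An owned version of [`Row`], storing its own copy of the row bytes and config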
1492#[derive(Debug, Clone)]
1496pub struct OwnedRow {
1497 data: Box<[u8]>,
1498 config: RowConfig,
1499}
1500
1501impl OwnedRow {
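    /// Borrow this [`OwnedRow`] as a [`Row`], e.g. for comparison with other rows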
1502 pub fn row(&self) -> Row<'_> {
1506 Row {
1507 data: &self.data,
1508 config: &self.config,
1509 }
1510 }
1511}
1512
1513impl PartialEq for OwnedRow {
1516 #[inline]
1517 fn eq(&self, other: &Self) -> bool {
1518 self.row().eq(&other.row())
1519 }
1520}
1521
1522impl Eq for OwnedRow {}
1523
1524impl PartialOrd for OwnedRow {
1525 #[inline]
1526 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1527 Some(self.cmp(other))
1528 }
1529}
1530
1531impl Ord for OwnedRow {
1532 #[inline]
1533 fn cmp(&self, other: &Self) -> Ordering {
1534 self.row().cmp(&other.row())
1535 }
1536}
1537
1538impl Hash for OwnedRow {
1539 #[inline]
1540 fn hash<H: Hasher>(&self, state: &mut H) {
1541 self.row().hash(state)
1542 }
1543}
1544
1545impl AsRef<[u8]> for OwnedRow {
1546 #[inline]
1547 fn as_ref(&self) -> &[u8] {
1548 &self.data
1549 }
1550}
1551
1552#[inline]
1554fn null_sentinel(options: SortOptions) -> u8 {
1555 match options.nulls_first {
1556 true => 0,
1557 false => 0xFF,
1558 }
1559}
1560
/// Tracks the accumulated encoded length of each row, specialising the common
/// case in which every row has the same fixed length
enum LengthTracker {
    /// Every row is `length` bytes long
    Fixed { length: usize, num_rows: usize },
    /// Rows have variable lengths, plus a `fixed_length` component common to
    /// every row
    Variable {
        fixed_length: usize,
        lengths: Vec<usize>,
    },
}
1571
1572impl LengthTracker {
1573 fn new(num_rows: usize) -> Self {
1574 Self::Fixed {
1575 length: 0,
1576 num_rows,
1577 }
1578 }
1579
1580 fn push_fixed(&mut self, new_length: usize) {
1582 match self {
1583 LengthTracker::Fixed { length, .. } => *length += new_length,
1584 LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1585 }
1586 }
1587
1588 fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1590 match self {
1591 LengthTracker::Fixed { length, .. } => {
1592 *self = LengthTracker::Variable {
1593 fixed_length: *length,
1594 lengths: new_lengths.collect(),
1595 }
1596 }
1597 LengthTracker::Variable { lengths, .. } => {
1598 assert_eq!(lengths.len(), new_lengths.len());
1599 lengths
1600 .iter_mut()
1601 .zip(new_lengths)
1602 .for_each(|(length, new_length)| *length += new_length);
1603 }
1604 }
1605 }
1606
1607 fn materialized(&mut self) -> &mut [usize] {
1609 if let LengthTracker::Fixed { length, num_rows } = *self {
1610 *self = LengthTracker::Variable {
1611 fixed_length: length,
1612 lengths: vec![0; num_rows],
1613 };
1614 }
1615
1616 match self {
1617 LengthTracker::Variable { lengths, .. } => lengths,
1618 LengthTracker::Fixed { .. } => unreachable!(),
1619 }
1620 }
1621
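    /// Appends to `offsets` the starting offset of each tracked row, beginning at
    /// `initial_offset`, and returns the total encoded length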
1622 fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1640 match self {
1641 LengthTracker::Fixed { length, num_rows } => {
1642 offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1643
1644 initial_offset + num_rows * length
1645 }
1646 LengthTracker::Variable {
1647 fixed_length,
1648 lengths,
1649 } => {
1650 let mut acc = initial_offset;
1651
1652 offsets.extend(lengths.iter().map(|length| {
1653 let current = acc;
1654 acc += length + fixed_length;
1655 current
1656 }));
1657
1658 acc
1659 }
1660 }
1661 }
1662}
1663
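/// Computes the encoded length of each row in `cols` using their [`Encoder`]s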
1664fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
1666 use fixed::FixedLengthEncoding;
1667
1668 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1669 let mut tracker = LengthTracker::new(num_rows);
1670
1671 for (array, encoder) in cols.iter().zip(encoders) {
1672 match encoder {
1673 Encoder::Stateless => {
1674 downcast_primitive_array! {
1675 array => tracker.push_fixed(fixed::encoded_len(array)),
1676 DataType::Null => {},
1677 DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
1678 DataType::Binary => push_generic_byte_array_lengths(&mut tracker, as_generic_binary_array::<i32>(array)),
1679 DataType::LargeBinary => push_generic_byte_array_lengths(&mut tracker, as_generic_binary_array::<i64>(array)),
1680 DataType::BinaryView => push_byte_view_array_lengths(&mut tracker, array.as_binary_view()),
1681 DataType::Utf8 => push_generic_byte_array_lengths(&mut tracker, array.as_string::<i32>()),
1682 DataType::LargeUtf8 => push_generic_byte_array_lengths(&mut tracker, array.as_string::<i64>()),
1683 DataType::Utf8View => push_byte_view_array_lengths(&mut tracker, array.as_string_view()),
1684 DataType::FixedSizeBinary(len) => {
1685 let len = len.to_usize().unwrap();
1686 tracker.push_fixed(1 + len)
1687 }
1688 _ => unimplemented!("unsupported data type: {}", array.data_type()),
1689 }
1690 }
1691 Encoder::Dictionary(values, null) => {
1692 downcast_dictionary_array! {
1693 array => {
1694 tracker.push_variable(
1695 array.keys().iter().map(|v| match v {
1696 Some(k) => values.row_len(k.as_usize()),
1697 None => null.data.len(),
1698 })
1699 )
1700 }
1701 _ => unreachable!(),
1702 }
1703 }
1704 Encoder::Struct(rows, null) => {
1705 let array = as_struct_array(array);
1706 if rows.num_rows() > 0 {
1707 tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1709 true => 1 + rows.row_len(idx),
1710 false => 1 + null.data.len(),
1711 }));
1712 } else {
1713 tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1715 true => 1,
1716 false => 1 + null.data.len(),
1717 }));
1718 }
1719 }
1720 Encoder::List(rows) => match array.data_type() {
1721 DataType::List(_) => {
1722 list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
1723 }
1724 DataType::LargeList(_) => {
1725 list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
1726 }
1727 DataType::ListView(_) => {
1728 let list_view = array.as_list_view::<i32>();
1729 let (min_offset, _) = compute_list_view_bounds(list_view);
1730 list::compute_lengths_list_view(
1731 tracker.materialized(),
1732 rows,
1733 list_view,
1734 min_offset,
1735 )
1736 }
1737 DataType::LargeListView(_) => {
1738 let list_view = array.as_list_view::<i64>();
1739 let (min_offset, _) = compute_list_view_bounds(list_view);
1740 list::compute_lengths_list_view(
1741 tracker.materialized(),
1742 rows,
1743 list_view,
1744 min_offset,
1745 )
1746 }
1747 DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
1748 &mut tracker,
1749 rows,
1750 as_fixed_size_list_array(array),
1751 ),
1752 _ => unreachable!(),
1753 },
1754 Encoder::RunEndEncoded(rows) => match array.data_type() {
1755 DataType::RunEndEncoded(r, _) => match r.data_type() {
1756 DataType::Int16 => run::compute_lengths(
1757 tracker.materialized(),
1758 rows,
1759 array.as_run::<Int16Type>(),
1760 ),
1761 DataType::Int32 => run::compute_lengths(
1762 tracker.materialized(),
1763 rows,
1764 array.as_run::<Int32Type>(),
1765 ),
1766 DataType::Int64 => run::compute_lengths(
1767 tracker.materialized(),
1768 rows,
1769 array.as_run::<Int64Type>(),
1770 ),
1771 _ => unreachable!("Unsupported run end index type: {r:?}"),
1772 },
1773 _ => unreachable!(),
1774 },
1775 Encoder::Union {
1776 child_rows,
1777 field_to_type_ids,
1778 type_ids,
1779 offsets,
1780 } => {
1781 let union_array = array
1782 .as_any()
1783 .downcast_ref::<UnionArray>()
1784 .expect("expected UnionArray");
1785
1786 let mut type_id_to_field_idx = [0usize; 128];
1787 for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
1788 type_id_to_field_idx[type_id as usize] = field_idx;
1789 }
1790
1791 let lengths = (0..union_array.len()).map(|i| {
1792 let type_id = type_ids[i];
1793 let field_idx = type_id_to_field_idx[type_id as usize];
1794 let child_row_i = offsets.as_ref().map(|o| o[i] as usize).unwrap_or(i);
1795 let child_row_len = child_rows[field_idx].row_len(child_row_i);
1796
1797 1 + child_row_len
1799 });
1800
1801 tracker.push_variable(lengths);
1802 }
1803 }
1804 }
1805
1806 tracker
1807}
1808
1809fn push_generic_byte_array_lengths<T: ByteArrayType>(
1811 tracker: &mut LengthTracker,
1812 array: &GenericByteArray<T>,
1813) {
1814 if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
1815 tracker.push_variable(
1816 array
1817 .offsets()
1818 .lengths()
1819 .zip(nulls.iter())
1820 .map(|(length, is_valid)| if is_valid { Some(length) } else { None })
1821 .map(variable::padded_length),
1822 )
1823 } else {
1824 tracker.push_variable(
1825 array
1826 .offsets()
1827 .lengths()
1828 .map(variable::non_null_padded_length),
1829 )
1830 }
1831}
1832
1833fn push_byte_view_array_lengths<T: ByteViewType>(
1835 tracker: &mut LengthTracker,
1836 array: &GenericByteViewArray<T>,
1837) {
1838 if let Some(nulls) = array.nulls().filter(|n| n.null_count() > 0) {
1839 tracker.push_variable(
1840 array
1841 .lengths()
1842 .zip(nulls.iter())
1843 .map(|(length, is_valid)| {
1844 if is_valid {
1845 Some(length as usize)
1846 } else {
1847 None
1848 }
1849 })
1850 .map(variable::padded_length),
1851 )
1852 } else {
1853 tracker.push_variable(
1854 array
1855 .lengths()
1856 .map(|len| variable::padded_length(Some(len as usize))),
1857 )
1858 }
1859}
1860
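/// Encodes a column into `data` at the positions given by `offsets`, advancing
/// each row's write offset past the bytes written for it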
1861fn encode_column(
1863 data: &mut [u8],
1864 offsets: &mut [usize],
1865 column: &dyn Array,
1866 opts: SortOptions,
1867 encoder: &Encoder<'_>,
1868) {
1869 match encoder {
1870 Encoder::Stateless => {
1871 downcast_primitive_array! {
1872 column => {
1873 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1874 fixed::encode(data, offsets, column.values(), nulls, opts)
1875 } else {
1876 fixed::encode_not_null(data, offsets, column.values(), opts)
1877 }
1878 }
1879 DataType::Null => {}
1880 DataType::Boolean => {
1881 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1882 fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1883 } else {
1884 fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1885 }
1886 }
1887 DataType::Binary => {
1888 variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i32>(column), opts)
1889 }
1890 DataType::BinaryView => {
1891 variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1892 }
1893 DataType::LargeBinary => {
1894 variable::encode_generic_byte_array(data, offsets, as_generic_binary_array::<i64>(column), opts)
1895 }
1896 DataType::Utf8 => variable::encode_generic_byte_array(
1897 data, offsets,
1898 column.as_string::<i32>(),
1899 opts,
1900 ),
1901 DataType::LargeUtf8 => variable::encode_generic_byte_array(
1902 data, offsets,
1903 column.as_string::<i64>(),
1904 opts,
1905 ),
1906 DataType::Utf8View => variable::encode(
1907 data, offsets,
1908 column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1909 opts,
1910 ),
1911 DataType::FixedSizeBinary(_) => {
1912 let array = column.as_any().downcast_ref().unwrap();
1913 fixed::encode_fixed_size_binary(data, offsets, array, opts)
1914 }
1915 _ => unimplemented!("unsupported data type: {}", column.data_type()),
1916 }
1917 }
1918 Encoder::Dictionary(values, nulls) => {
1919 downcast_dictionary_array! {
1920 column => encode_dictionary_values(data, offsets, column, values, nulls),
1921 _ => unreachable!()
1922 }
1923 }
1924 Encoder::Struct(rows, null) => {
1925 fn struct_encode_helper<const NO_CHILD_FIELDS: bool>(
1926 array: &StructArray,
1927 offsets: &mut [usize],
1928 null_sentinel: u8,
1929 rows: &Rows,
1930 null: &Row<'_>,
1931 data: &mut [u8],
1932 ) {
1933 let empty_row = Row {
1934 data: &[],
1935 config: &rows.config,
1936 };
1937
1938 offsets
1939 .iter_mut()
1940 .skip(1)
1941 .enumerate()
1942 .for_each(|(idx, offset)| {
1943 let (row, sentinel) = match array.is_valid(idx) {
1944 true => (
1945 if NO_CHILD_FIELDS {
1946 empty_row
1947 } else {
1948 rows.row(idx)
1949 },
1950 0x01,
1951 ),
1952 false => (*null, null_sentinel),
1953 };
1954 let end_offset = *offset + 1 + row.as_ref().len();
1955 data[*offset] = sentinel;
1956 data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1957 *offset = end_offset;
1958 })
1959 }
1960
1961 let array = as_struct_array(column);
1962 let null_sentinel = null_sentinel(opts);
1963 if rows.num_rows() == 0 {
1964 struct_encode_helper::<true>(array, offsets, null_sentinel, rows, null, data);
1966 } else {
1967 struct_encode_helper::<false>(array, offsets, null_sentinel, rows, null, data);
1968 }
1969 }
1970 Encoder::List(rows) => match column.data_type() {
1971 DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1972 DataType::LargeList(_) => {
1973 list::encode(data, offsets, rows, opts, as_large_list_array(column))
1974 }
1975 DataType::ListView(_) => {
1976 let list_view = column.as_list_view::<i32>();
1977 let (min_offset, _) = compute_list_view_bounds(list_view);
1978 list::encode_list_view(data, offsets, rows, opts, list_view, min_offset)
1979 }
1980 DataType::LargeListView(_) => {
1981 let list_view = column.as_list_view::<i64>();
1982 let (min_offset, _) = compute_list_view_bounds(list_view);
1983 list::encode_list_view(data, offsets, rows, opts, list_view, min_offset)
1984 }
1985 DataType::FixedSizeList(_, _) => {
1986 encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
1987 }
1988 _ => unreachable!(),
1989 },
1990 Encoder::RunEndEncoded(rows) => match column.data_type() {
1991 DataType::RunEndEncoded(r, _) => match r.data_type() {
1992 DataType::Int16 => {
1993 run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
1994 }
1995 DataType::Int32 => {
1996 run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
1997 }
1998 DataType::Int64 => {
1999 run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
2000 }
2001 _ => unreachable!("Unsupported run end index type: {r:?}"),
2002 },
2003 _ => unreachable!(),
2004 },
2005 Encoder::Union {
2006 child_rows,
2007 field_to_type_ids,
2008 type_ids,
2009 offsets: offsets_buf,
2010 } => {
2011 let mut type_id_to_field_idx = [0usize; 128];
2012 for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
2013 type_id_to_field_idx[type_id as usize] = field_idx;
2014 }
2015
2016 offsets
2017 .iter_mut()
2018 .skip(1)
2019 .enumerate()
2020 .for_each(|(i, offset)| {
2021 let type_id = type_ids[i];
2022 let field_idx = type_id_to_field_idx[type_id as usize];
2023
2024 let child_row_idx = offsets_buf.as_ref().map(|o| o[i] as usize).unwrap_or(i);
2025 let child_row = child_rows[field_idx].row(child_row_idx);
2026 let child_bytes = child_row.as_ref();
2027
2028 let type_id_byte = if opts.descending {
2029 !(type_id as u8)
2030 } else {
2031 type_id as u8
2032 };
2033 data[*offset] = type_id_byte;
2034
2035 let child_start = *offset + 1;
2036 let child_end = child_start + child_bytes.len();
2037 data[child_start..child_end].copy_from_slice(child_bytes);
2038
2039 *offset = child_end;
2040 });
2041 }
2042 }
2043}
2044
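/// Encode dictionary values without preserving the dictionary encoding: each key
/// is replaced by the row encoding of the value it references, or by the null row
/// for null keys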
2045pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
2047 data: &mut [u8],
2048 offsets: &mut [usize],
2049 column: &DictionaryArray<K>,
2050 values: &Rows,
2051 null: &Row<'_>,
2052) {
2053 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
2054 let row = match k {
2055 Some(k) => values.row(k.as_usize()).data,
2056 None => null.data,
2057 };
2058 let end_offset = *offset + row.len();
2059 data[*offset..end_offset].copy_from_slice(row);
2060 *offset = end_offset;
2061 }
2062}
2063
2064macro_rules! decode_primitive_helper {
2065 ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
2066 Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
2067 };
2068}
2069
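/// Decodes the next column from `rows`, advancing each row slice past the bytes
/// consumed
///
/// # Safety
///
/// `rows` must contain valid data for `field`, produced by the corresponding
/// [`Codec`]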
2070unsafe fn decode_column(
2076 field: &SortField,
2077 rows: &mut [&[u8]],
2078 codec: &Codec,
2079 validate_utf8: bool,
2080) -> Result<ArrayRef, ArrowError> {
2081 let options = field.options;
2082
2083 let array: ArrayRef = match codec {
2084 Codec::Stateless => {
2085 let data_type = field.data_type.clone();
2086 downcast_primitive! {
2087 data_type => (decode_primitive_helper, rows, data_type, options),
2088 DataType::Null => Arc::new(NullArray::new(rows.len())),
2089 DataType::Boolean => Arc::new(decode_bool(rows, options)),
2090 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
2091 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
2092 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
2093 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
2094 DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
2095 DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
2096 DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
2097 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
2098 }
2099 }
2100 Codec::Dictionary(converter, _) => {
2101 let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
2102 cols.into_iter().next().unwrap()
2103 }
2104 Codec::Struct(converter, _) => {
2105 let (null_count, nulls) = fixed::decode_nulls(rows);
2106 rows.iter_mut().for_each(|row| *row = &row[1..]);
2107 let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;
2108
2109 let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
2110 let corrected_fields: Vec<Field> = match &field.data_type {
2113 DataType::Struct(struct_fields) => struct_fields
2114 .iter()
2115 .zip(child_data.iter())
2116 .map(|(orig_field, child_array)| {
2117 orig_field
2118 .as_ref()
2119 .clone()
2120 .with_data_type(child_array.data_type().clone())
2121 })
2122 .collect(),
2123 _ => unreachable!("Only Struct types should be corrected here"),
2124 };
2125 let corrected_struct_type = DataType::Struct(corrected_fields.into());
2126 let builder = ArrayDataBuilder::new(corrected_struct_type)
2127 .len(rows.len())
2128 .null_count(null_count)
2129 .null_bit_buffer(Some(nulls))
2130 .child_data(child_data);
2131
2132 Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
2133 }
2134 Codec::List(converter) => match &field.data_type {
2135 DataType::List(_) => {
2136 Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
2137 }
2138 DataType::LargeList(_) => {
2139 Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
2140 }
2141 DataType::ListView(_) => Arc::new(unsafe {
2142 list::decode_list_view::<i32>(converter, rows, field, validate_utf8)
2143 }?),
2144 DataType::LargeListView(_) => Arc::new(unsafe {
2145 list::decode_list_view::<i64>(converter, rows, field, validate_utf8)
2146 }?),
2147 DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
2148 list::decode_fixed_size_list(
2149 converter,
2150 rows,
2151 field,
2152 validate_utf8,
2153 value_length.as_usize(),
2154 )
2155 }?),
2156 _ => unreachable!(),
2157 },
2158 Codec::RunEndEncoded(converter) => match &field.data_type {
2159 DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
2160 DataType::Int16 => Arc::new(unsafe {
2161 run::decode::<Int16Type>(converter, rows, field, validate_utf8)
2162 }?),
2163 DataType::Int32 => Arc::new(unsafe {
2164 run::decode::<Int32Type>(converter, rows, field, validate_utf8)
2165 }?),
2166 DataType::Int64 => Arc::new(unsafe {
2167 run::decode::<Int64Type>(converter, rows, field, validate_utf8)
2168 }?),
2169 _ => unreachable!(),
2170 },
2171 _ => unreachable!(),
2172 },
2173 Codec::Union(converters, field_to_type_ids, null_rows) => {
2174 let len = rows.len();
2175
2176 let DataType::Union(union_fields, mode) = &field.data_type else {
2177 unreachable!()
2178 };
2179
2180 let mut type_id_to_field_idx = [0usize; 128];
2181 for (field_idx, &type_id) in field_to_type_ids.iter().enumerate() {
2182 type_id_to_field_idx[type_id as usize] = field_idx;
2183 }
2184
2185 let mut type_ids = Vec::with_capacity(len);
2186 let mut rows_by_field: Vec<Vec<(usize, &[u8])>> = vec![Vec::new(); converters.len()];
2187
2188 for (idx, row) in rows.iter_mut().enumerate() {
2189 let type_id_byte = {
2190 let id = row[0];
2191 if options.descending { !id } else { id }
2192 };
2193
2194 let type_id = type_id_byte as i8;
2195 type_ids.push(type_id);
2196
2197 let field_idx = type_id_to_field_idx[type_id as usize];
2198
2199 let child_row = &row[1..];
2200 rows_by_field[field_idx].push((idx, child_row));
2201 }
2202
2203 let mut child_arrays: Vec<ArrayRef> = Vec::with_capacity(converters.len());
2204 let mut offsets = (*mode == UnionMode::Dense).then(|| Vec::with_capacity(len));
2205
2206 for (field_idx, converter) in converters.iter().enumerate() {
2207 let field_rows = &rows_by_field[field_idx];
2208
2209 match &mode {
2210 UnionMode::Dense => {
2211 if field_rows.is_empty() {
2212 let (_, field) = union_fields.iter().nth(field_idx).unwrap();
2213 child_arrays.push(arrow_array::new_empty_array(field.data_type()));
2214 continue;
2215 }
2216
2217 let mut child_data = field_rows
2218 .iter()
2219 .map(|(_, bytes)| *bytes)
2220 .collect::<Vec<_>>();
2221
2222 let child_array =
2223 unsafe { converter.convert_raw(&mut child_data, validate_utf8) }?;
2224
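                        // Advance each parent row past the type id byte plus the
                        // bytes consumed by the child decoder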
2225 for ((row_idx, original_bytes), remaining_bytes) in
2227 field_rows.iter().zip(child_data)
2228 {
2229 let consumed_length = 1 + original_bytes.len() - remaining_bytes.len();
2230 rows[*row_idx] = &rows[*row_idx][consumed_length..];
2231 }
2232
2233 child_arrays.push(child_array.into_iter().next().unwrap());
2234 }
2235 UnionMode::Sparse => {
2236 let mut sparse_data: Vec<&[u8]> = Vec::with_capacity(len);
2237 let mut field_row_iter = field_rows.iter().peekable();
2238 let null_row_bytes: &[u8] = &null_rows[field_idx].data;
2239
2240 for idx in 0..len {
2241 if let Some((next_idx, bytes)) = field_row_iter.peek() {
2242 if *next_idx == idx {
2243 sparse_data.push(*bytes);
2244
2245 field_row_iter.next();
2246 continue;
2247 }
2248 }
2249 sparse_data.push(null_row_bytes);
2250 }
2251
2252 let child_array =
2253 unsafe { converter.convert_raw(&mut sparse_data, validate_utf8) }?;
2254
2255 for (row_idx, child_row) in field_rows.iter() {
2257 let remaining_len = sparse_data[*row_idx].len();
2258 let consumed_length = 1 + child_row.len() - remaining_len;
2259 rows[*row_idx] = &rows[*row_idx][consumed_length..];
2260 }
2261
2262 child_arrays.push(child_array.into_iter().next().unwrap());
2263 }
2264 }
2265 }
2266
            // For dense unions, compute each row's offset into the child array
            // selected by its type id
            if let Some(ref mut offsets_vec) = offsets {
                let mut count = vec![0i32; converters.len()];
                for type_id in &type_ids {
                    let field_idx = type_id_to_field_idx[*type_id as usize];
                    offsets_vec.push(count[field_idx]);
                    count[field_idx] += 1;
                }
            }
2277
2278 let type_ids_buffer = ScalarBuffer::from(type_ids);
2279 let offsets_buffer = offsets.map(ScalarBuffer::from);
2280
2281 let union_array = UnionArray::try_new(
2282 union_fields.clone(),
2283 type_ids_buffer,
2284 offsets_buffer,
2285 child_arrays,
2286 )?;
2287
2288 Arc::new(union_array)
2291 }
2292 };
2293 Ok(array)
2294}
2295
2296#[cfg(test)]
2297mod tests {
2298 use arrow_array::builder::*;
2299 use arrow_array::types::*;
2300 use arrow_array::*;
2301 use arrow_buffer::{Buffer, OffsetBuffer};
2302 use arrow_buffer::{NullBuffer, i256};
2303 use arrow_cast::display::{ArrayFormatter, FormatOptions};
2304 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
2305 use rand::distr::uniform::SampleUniform;
2306 use rand::distr::{Distribution, StandardUniform};
2307 use rand::prelude::StdRng;
2308 use rand::{Rng, RngCore, SeedableRng};
2309
2310 use super::*;
2311
2312 #[test]
2313 fn test_fixed_width() {
2314 let cols = [
2315 Arc::new(Int16Array::from_iter([
2316 Some(1),
2317 Some(2),
2318 None,
2319 Some(-5),
2320 Some(2),
2321 Some(2),
2322 Some(0),
2323 ])) as ArrayRef,
2324 Arc::new(Float32Array::from_iter([
2325 Some(1.3),
2326 Some(2.5),
2327 None,
2328 Some(4.),
2329 Some(0.1),
2330 Some(-4.),
2331 Some(-0.),
2332 ])) as ArrayRef,
2333 ];
2334
2335 let converter = RowConverter::new(vec![
2336 SortField::new(DataType::Int16),
2337 SortField::new(DataType::Float32),
2338 ])
2339 .unwrap();
2340 let rows = converter.convert_columns(&cols).unwrap();
2341
2342 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
2343 assert_eq!(
2344 rows.buffer,
2345 &[
                1, 128, 1, 1, 191, 166, 102, 102, // row 0
                1, 128, 2, 1, 192, 32, 0, 0, // row 1
                0, 0, 0, 0, 0, 0, 0, 0, // row 2 (null)
                1, 127, 251, 1, 192, 128, 0, 0, // row 3
                1, 128, 2, 1, 189, 204, 204, 205, // row 4
                1, 128, 2, 1, 63, 127, 255, 255, // row 5
                1, 128, 0, 1, 127, 255, 255, 255, // row 6
            ]
2361 );
2362
2363 assert!(rows.row(3) < rows.row(6));
2364 assert!(rows.row(0) < rows.row(1));
2365 assert!(rows.row(3) < rows.row(0));
2366 assert!(rows.row(4) < rows.row(1));
2367 assert!(rows.row(5) < rows.row(4));
2368
2369 let back = converter.convert_rows(&rows).unwrap();
2370 for (expected, actual) in cols.iter().zip(&back) {
2371 assert_eq!(expected, actual);
2372 }
2373 }
2374
2375 #[test]
2376 fn test_decimal32() {
2377 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
2378 DECIMAL32_MAX_PRECISION,
2379 7,
2380 ))])
2381 .unwrap();
2382 let col = Arc::new(
2383 Decimal32Array::from_iter([
2384 None,
2385 Some(i32::MIN),
2386 Some(-13),
2387 Some(46_i32),
2388 Some(5456_i32),
2389 Some(i32::MAX),
2390 ])
2391 .with_precision_and_scale(9, 7)
2392 .unwrap(),
2393 ) as ArrayRef;
2394
2395 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2396 for i in 0..rows.num_rows() - 1 {
2397 assert!(rows.row(i) < rows.row(i + 1));
2398 }
2399
2400 let back = converter.convert_rows(&rows).unwrap();
2401 assert_eq!(back.len(), 1);
2402 assert_eq!(col.as_ref(), back[0].as_ref())
2403 }
2404
2405 #[test]
2406 fn test_decimal64() {
2407 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
2408 DECIMAL64_MAX_PRECISION,
2409 7,
2410 ))])
2411 .unwrap();
2412 let col = Arc::new(
2413 Decimal64Array::from_iter([
2414 None,
2415 Some(i64::MIN),
2416 Some(-13),
2417 Some(46_i64),
2418 Some(5456_i64),
2419 Some(i64::MAX),
2420 ])
2421 .with_precision_and_scale(18, 7)
2422 .unwrap(),
2423 ) as ArrayRef;
2424
2425 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2426 for i in 0..rows.num_rows() - 1 {
2427 assert!(rows.row(i) < rows.row(i + 1));
2428 }
2429
2430 let back = converter.convert_rows(&rows).unwrap();
2431 assert_eq!(back.len(), 1);
2432 assert_eq!(col.as_ref(), back[0].as_ref())
2433 }
2434
2435 #[test]
2436 fn test_decimal128() {
2437 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
2438 DECIMAL128_MAX_PRECISION,
2439 7,
2440 ))])
2441 .unwrap();
2442 let col = Arc::new(
2443 Decimal128Array::from_iter([
2444 None,
2445 Some(i128::MIN),
2446 Some(-13),
2447 Some(46_i128),
2448 Some(5456_i128),
2449 Some(i128::MAX),
2450 ])
2451 .with_precision_and_scale(38, 7)
2452 .unwrap(),
2453 ) as ArrayRef;
2454
2455 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2456 for i in 0..rows.num_rows() - 1 {
2457 assert!(rows.row(i) < rows.row(i + 1));
2458 }
2459
2460 let back = converter.convert_rows(&rows).unwrap();
2461 assert_eq!(back.len(), 1);
2462 assert_eq!(col.as_ref(), back[0].as_ref())
2463 }
2464
2465 #[test]
2466 fn test_decimal256() {
2467 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
2468 DECIMAL256_MAX_PRECISION,
2469 7,
2470 ))])
2471 .unwrap();
2472 let col = Arc::new(
2473 Decimal256Array::from_iter([
2474 None,
2475 Some(i256::MIN),
2476 Some(i256::from_parts(0, -1)),
2477 Some(i256::from_parts(u128::MAX, -1)),
2478 Some(i256::from_parts(u128::MAX, 0)),
2479 Some(i256::from_parts(0, 46_i128)),
2480 Some(i256::from_parts(5, 46_i128)),
2481 Some(i256::MAX),
2482 ])
2483 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
2484 .unwrap(),
2485 ) as ArrayRef;
2486
2487 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2488 for i in 0..rows.num_rows() - 1 {
2489 assert!(rows.row(i) < rows.row(i + 1));
2490 }
2491
2492 let back = converter.convert_rows(&rows).unwrap();
2493 assert_eq!(back.len(), 1);
2494 assert_eq!(col.as_ref(), back[0].as_ref())
2495 }
2496
2497 #[test]
2498 fn test_bool() {
2499 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
2500
2501 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
2502
2503 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2504 assert!(rows.row(2) > rows.row(1));
2505 assert!(rows.row(2) > rows.row(0));
2506 assert!(rows.row(1) > rows.row(0));
2507
2508 let cols = converter.convert_rows(&rows).unwrap();
2509 assert_eq!(&cols[0], &col);
2510
2511 let converter = RowConverter::new(vec![SortField::new_with_options(
2512 DataType::Boolean,
2513 SortOptions::default().desc().with_nulls_first(false),
2514 )])
2515 .unwrap();
2516
2517 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2518 assert!(rows.row(2) < rows.row(1));
2519 assert!(rows.row(2) < rows.row(0));
2520 assert!(rows.row(1) < rows.row(0));
2521 let cols = converter.convert_rows(&rows).unwrap();
2522 assert_eq!(&cols[0], &col);
2523 }
2524
2525 #[test]
2526 fn test_timezone() {
2527 let a =
2528 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
2529 let d = a.data_type().clone();
2530
2531 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2532 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2533 let back = converter.convert_rows(&rows).unwrap();
2534 assert_eq!(back.len(), 1);
2535 assert_eq!(back[0].data_type(), &d);
2536
2537 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
2539 a.append(34).unwrap();
2540 a.append_null();
2541 a.append(345).unwrap();
2542
2543 let dict = a.finish();
2545 let values = TimestampNanosecondArray::from(dict.values().to_data());
2546 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
2547 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
2548 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
2549
2550 assert_eq!(dict_with_tz.data_type(), &d);
2551 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2552 let rows = converter
2553 .convert_columns(&[Arc::new(dict_with_tz) as _])
2554 .unwrap();
2555 let back = converter.convert_rows(&rows).unwrap();
2556 assert_eq!(back.len(), 1);
2557 assert_eq!(back[0].data_type(), &v);
2558 }
2559
2560 #[test]
2561 fn test_null_encoding() {
2562 let col = Arc::new(NullArray::new(10));
2563 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2564 let rows = converter.convert_columns(&[col]).unwrap();
2565 assert_eq!(rows.num_rows(), 10);
2566 assert_eq!(rows.row(1).data.len(), 0);
2567 }
2568
2569 #[test]
2570 fn test_variable_width() {
2571 let col = Arc::new(StringArray::from_iter([
2572 Some("hello"),
2573 Some("he"),
2574 None,
2575 Some("foo"),
2576 Some(""),
2577 ])) as ArrayRef;
2578
2579 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2580 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2581
2582 assert!(rows.row(1) < rows.row(0));
2583 assert!(rows.row(2) < rows.row(4));
2584 assert!(rows.row(3) < rows.row(0));
2585 assert!(rows.row(3) < rows.row(1));
2586
2587 let cols = converter.convert_rows(&rows).unwrap();
2588 assert_eq!(&cols[0], &col);
2589
2590 let col = Arc::new(BinaryArray::from_iter([
2591 None,
2592 Some(vec![0_u8; 0]),
2593 Some(vec![0_u8; 6]),
2594 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
2595 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
2596 Some(vec![0_u8; variable::BLOCK_SIZE]),
2597 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
2598 Some(vec![1_u8; 6]),
2599 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
2600 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
2601 Some(vec![1_u8; variable::BLOCK_SIZE]),
2602 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
2603 Some(vec![0xFF_u8; 6]),
2604 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
2605 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
2606 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
2607 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
2608 ])) as ArrayRef;
2609
2610 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2611 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2612
2613 for i in 0..rows.num_rows() {
2614 for j in i + 1..rows.num_rows() {
2615 assert!(
2616 rows.row(i) < rows.row(j),
2617 "{} < {} - {:?} < {:?}",
2618 i,
2619 j,
2620 rows.row(i),
2621 rows.row(j)
2622 );
2623 }
2624 }
2625
2626 let cols = converter.convert_rows(&rows).unwrap();
2627 assert_eq!(&cols[0], &col);
2628
2629 let converter = RowConverter::new(vec![SortField::new_with_options(
2630 DataType::Binary,
2631 SortOptions::default().desc().with_nulls_first(false),
2632 )])
2633 .unwrap();
2634 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2635
2636 for i in 0..rows.num_rows() {
2637 for j in i + 1..rows.num_rows() {
2638 assert!(
2639 rows.row(i) > rows.row(j),
2640 "{} > {} - {:?} > {:?}",
2641 i,
2642 j,
2643 rows.row(i),
2644 rows.row(j)
2645 );
2646 }
2647 }
2648
2649 let cols = converter.convert_rows(&rows).unwrap();
2650 assert_eq!(&cols[0], &col);
2651 }
2652
2653 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2655 match b.data_type() {
2656 DataType::Dictionary(_, v) => {
2657 assert_eq!(a.data_type(), v.as_ref());
2658 let b = arrow_cast::cast(b, v).unwrap();
2659 assert_eq!(a, b.as_ref())
2660 }
2661 _ => assert_eq!(a, b),
2662 }
2663 }
2664
2665 #[test]
2666 fn test_string_dictionary() {
2667 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2668 Some("foo"),
2669 Some("hello"),
2670 Some("he"),
2671 None,
2672 Some("hello"),
2673 Some(""),
2674 Some("hello"),
2675 Some("hello"),
2676 ])) as ArrayRef;
2677
2678 let field = SortField::new(a.data_type().clone());
2679 let converter = RowConverter::new(vec![field]).unwrap();
2680 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2681
2682 assert!(rows_a.row(3) < rows_a.row(5));
2683 assert!(rows_a.row(2) < rows_a.row(1));
2684 assert!(rows_a.row(0) < rows_a.row(1));
2685 assert!(rows_a.row(3) < rows_a.row(0));
2686
2687 assert_eq!(rows_a.row(1), rows_a.row(4));
2688 assert_eq!(rows_a.row(1), rows_a.row(6));
2689 assert_eq!(rows_a.row(1), rows_a.row(7));
2690
2691 let cols = converter.convert_rows(&rows_a).unwrap();
2692 dictionary_eq(&cols[0], &a);
2693
2694 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2695 Some("hello"),
2696 None,
2697 Some("cupcakes"),
2698 ])) as ArrayRef;
2699
2700 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
2701 assert_eq!(rows_a.row(1), rows_b.row(0));
2702 assert_eq!(rows_a.row(3), rows_b.row(1));
2703 assert!(rows_b.row(2) < rows_a.row(0));
2704
2705 let cols = converter.convert_rows(&rows_b).unwrap();
2706 dictionary_eq(&cols[0], &b);
2707
2708 let converter = RowConverter::new(vec![SortField::new_with_options(
2709 a.data_type().clone(),
2710 SortOptions::default().desc().with_nulls_first(false),
2711 )])
2712 .unwrap();
2713
2714 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2715 assert!(rows_c.row(3) > rows_c.row(5));
2716 assert!(rows_c.row(2) > rows_c.row(1));
2717 assert!(rows_c.row(0) > rows_c.row(1));
2718 assert!(rows_c.row(3) > rows_c.row(0));
2719
2720 let cols = converter.convert_rows(&rows_c).unwrap();
2721 dictionary_eq(&cols[0], &a);
2722
2723 let converter = RowConverter::new(vec![SortField::new_with_options(
2724 a.data_type().clone(),
2725 SortOptions::default().desc().with_nulls_first(true),
2726 )])
2727 .unwrap();
2728
2729 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2730 assert!(rows_c.row(3) < rows_c.row(5));
2731 assert!(rows_c.row(2) > rows_c.row(1));
2732 assert!(rows_c.row(0) > rows_c.row(1));
2733 assert!(rows_c.row(3) < rows_c.row(0));
2734
2735 let cols = converter.convert_rows(&rows_c).unwrap();
2736 dictionary_eq(&cols[0], &a);
2737 }
2738
2739 #[test]
2740 fn test_struct() {
2741 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2743 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2744 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2745 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2746 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2747
2748 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2749 let converter = RowConverter::new(sort_fields).unwrap();
2750 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2751
2752 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2753 assert!(a < b);
2754 }
2755
2756 let back = converter.convert_rows(&r1).unwrap();
2757 assert_eq!(back.len(), 1);
2758 assert_eq!(&back[0], &s1);
2759
2760 let data = s1
2762 .to_data()
2763 .into_builder()
2764 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2765 .null_count(2)
2766 .build()
2767 .unwrap();
2768
2769 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2770 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
2771 assert_eq!(r2.row(0), r2.row(2));
2772 assert!(r2.row(0) < r2.row(1));
2773 assert_ne!(r1.row(0), r2.row(0));
2774 assert_eq!(r1.row(1), r2.row(1));
2775
2776 let back = converter.convert_rows(&r2).unwrap();
2777 assert_eq!(back.len(), 1);
2778 assert_eq!(&back[0], &s2);
2779
2780 back[0].to_data().validate_full().unwrap();
2781 }
2782
2783 #[test]
2784 fn test_dictionary_in_struct() {
2785 let builder = StringDictionaryBuilder::<Int32Type>::new();
2786 let mut struct_builder = StructBuilder::new(
2787 vec![Field::new_dictionary(
2788 "foo",
2789 DataType::Int32,
2790 DataType::Utf8,
2791 true,
2792 )],
2793 vec![Box::new(builder)],
2794 );
2795
2796 let dict_builder = struct_builder
2797 .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
2798 .unwrap();
2799
2800 dict_builder.append_value("a");
2802 dict_builder.append_null();
2803 dict_builder.append_value("a");
2804 dict_builder.append_value("b");
2805
2806 for _ in 0..4 {
2807 struct_builder.append(true);
2808 }
2809
2810 let s = Arc::new(struct_builder.finish()) as ArrayRef;
2811 let sort_fields = vec![SortField::new(s.data_type().clone())];
2812 let converter = RowConverter::new(sort_fields).unwrap();
2813 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2814
2815 let back = converter.convert_rows(&r).unwrap();
2816 let [s2] = back.try_into().unwrap();
2817
2818 assert_ne!(&s.data_type(), &s2.data_type());
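2819 // converting back hydrates the dictionary child, so the round-tripped struct has a
2820 // plain Utf8 child and therefore a different data type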
2821 s2.to_data().validate_full().unwrap();
2822
2823 let s1_struct = s.as_struct();
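2824 // walk the dictionary keys of the input child and compare against the hydrated output values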
2827 let s1_0 = s1_struct.column(0);
2828 let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
2829 let keys = s1_idx_0.keys();
2830 let values = s1_idx_0.values().as_string::<i32>();
2831 let s2_struct = s2.as_struct();
2833 let s2_0 = s2_struct.column(0);
2834 let s2_idx_0 = s2_0.as_string::<i32>();
2835
2836 for i in 0..keys.len() {
2837 if keys.is_null(i) {
2838 assert!(s2_idx_0.is_null(i));
2839 } else {
2840 let dict_index = keys.value(i) as usize;
2841 assert_eq!(values.value(dict_index), s2_idx_0.value(i));
2842 }
2843 }
2844 }
2845
2846 #[test]
2847 fn test_dictionary_in_struct_empty() {
2848 let ty = DataType::Struct(
2849 vec![Field::new_dictionary(
2850 "foo",
2851 DataType::Int32,
2852 DataType::Int32,
2853 false,
2854 )]
2855 .into(),
2856 );
2857 let s = arrow_array::new_empty_array(&ty);
2858
2859 let sort_fields = vec![SortField::new(s.data_type().clone())];
2860 let converter = RowConverter::new(sort_fields).unwrap();
2861 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2862
2863 let back = converter.convert_rows(&r).unwrap();
2864 let [s2] = back.try_into().unwrap();
2865
2866 assert_ne!(&s.data_type(), &s2.data_type());
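2867 // the empty dictionary child also comes back hydrated, so the types differ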
2869 s2.to_data().validate_full().unwrap();
2870 assert_eq!(s.len(), 0);
2871 assert_eq!(s2.len(), 0);
2872 }
2873
2874 #[test]
2875 fn test_list_of_string_dictionary() {
2876 let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2877 builder.values().append("a").unwrap();
2879 builder.values().append("b").unwrap();
2880 builder.values().append("zero").unwrap();
2881 builder.values().append_null();
2882 builder.values().append("c").unwrap();
2883 builder.values().append("b").unwrap();
2884 builder.values().append("d").unwrap();
2885 builder.append(true);
2886 builder.append(false);
2888 builder.values().append("e").unwrap();
2890 builder.values().append("zero").unwrap();
2891 builder.values().append("a").unwrap();
2892 builder.append(true);
2893
2894 let a = Arc::new(builder.finish()) as ArrayRef;
2895 let data_type = a.data_type().clone();
2896
2897 let field = SortField::new(data_type.clone());
2898 let converter = RowConverter::new(vec![field]).unwrap();
2899 let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2900
2901 let back = converter.convert_rows(&rows).unwrap();
2902 assert_eq!(back.len(), 1);
2903 let [a2] = back.try_into().unwrap();
2904
2905 assert_ne!(&a.data_type(), &a2.data_type());
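2906 // the list's dictionary values come back as plain strings, so the types differ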
2908
2909 a2.to_data().validate_full().unwrap();
2910
2911 let a2_list = a2.as_list::<i32>();
2912 let a1_list = a.as_list::<i32>();
2913
2914 let a1_0 = a1_list.value(0);
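2915 // first list entry: compare dictionary keys/values against the decoded strings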
2917 let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2918 let keys = a1_idx_0.keys();
2919 let values = a1_idx_0.values().as_string::<i32>();
2920 let a2_0 = a2_list.value(0);
2921 let a2_idx_0 = a2_0.as_string::<i32>();
2922
2923 for i in 0..keys.len() {
2924 if keys.is_null(i) {
2925 assert!(a2_idx_0.is_null(i));
2926 } else {
2927 let dict_index = keys.value(i) as usize;
2928 assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2929 }
2930 }
2931
2932 assert!(a1_list.is_null(1));
2934 assert!(a2_list.is_null(1));
2935
2936 let a1_2 = a1_list.value(2);
2938 let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2939 let keys = a1_idx_2.keys();
2940 let values = a1_idx_2.values().as_string::<i32>();
2941 let a2_2 = a2_list.value(2);
2942 let a2_idx_2 = a2_2.as_string::<i32>();
2943
2944 for i in 0..keys.len() {
2945 if keys.is_null(i) {
2946 assert!(a2_idx_2.is_null(i));
2947 } else {
2948 let dict_index = keys.value(i) as usize;
2949 assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2950 }
2951 }
2952 }
2953
2954 #[test]
2955 fn test_primitive_dictionary() {
2956 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2957 builder.append(2).unwrap();
2958 builder.append(3).unwrap();
2959 builder.append(0).unwrap();
2960 builder.append_null();
2961 builder.append(5).unwrap();
2962 builder.append(3).unwrap();
2963 builder.append(-1).unwrap();
2964
2965 let a = builder.finish();
2966 let data_type = a.data_type().clone();
2967 let columns = [Arc::new(a) as ArrayRef];
2968
2969 let field = SortField::new(data_type.clone());
2970 let converter = RowConverter::new(vec![field]).unwrap();
2971 let rows = converter.convert_columns(&columns).unwrap();
2972 assert!(rows.row(0) < rows.row(1));
2973 assert!(rows.row(2) < rows.row(0));
2974 assert!(rows.row(3) < rows.row(2));
2975 assert!(rows.row(6) < rows.row(2));
2976 assert!(rows.row(3) < rows.row(6));
2977
2978 let back = converter.convert_rows(&rows).unwrap();
2979 assert_eq!(back.len(), 1);
2980 back[0].to_data().validate_full().unwrap();
2981 }
2982
2983 #[test]
2984 fn test_dictionary_nulls() {
2985 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2986 let keys =
2987 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2988
2989 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2990 let data = keys
2991 .into_builder()
2992 .data_type(data_type.clone())
2993 .child_data(vec![values])
2994 .build()
2995 .unwrap();
2996
2997 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2998 let field = SortField::new(data_type.clone());
2999 let converter = RowConverter::new(vec![field]).unwrap();
3000 let rows = converter.convert_columns(&columns).unwrap();
3001
3002 assert_eq!(rows.row(0), rows.row(1));
3003 assert_eq!(rows.row(3), rows.row(4));
3004 assert_eq!(rows.row(4), rows.row(5));
3005 assert!(rows.row(3) < rows.row(0));
3006 }
3007
3008 #[test]
3009 fn test_from_binary_shared_buffer() {
3010 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
3011 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
3012 let rows = converter.convert_columns(&[array]).unwrap();
3013 let binary_rows = rows.try_into_binary().expect("known-small rows");
3014 let _binary_rows_shared_buffer = binary_rows.clone();
3015
3016 let parsed = converter.from_binary(binary_rows);
3017
3018 converter.convert_rows(parsed.iter()).unwrap();
3019 }
3020
3021 #[test]
3022 #[should_panic(expected = "Encountered non UTF-8 data")]
3023 fn test_invalid_utf8() {
3024 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
3025 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
3026 let rows = converter.convert_columns(&[array]).unwrap();
3027 let binary_row = rows.row(0);
3028
3029 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3030 let parser = converter.parser();
3031 let utf8_row = parser.parse(binary_row.as_ref());
3032
3033 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
3034 }
3035
3036 #[test]
3037 #[should_panic(expected = "Encountered non UTF-8 data")]
3038 fn test_invalid_utf8_array() {
3039 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
3040 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
3041 let rows = converter.convert_columns(&[array]).unwrap();
3042 let binary_rows = rows.try_into_binary().expect("known-small rows");
3043
3044 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3045 let parsed = converter.from_binary(binary_rows);
3046
3047 converter.convert_rows(parsed.iter()).unwrap();
3048 }
3049
3050 #[test]
3051 #[should_panic(expected = "index out of bounds")]
3052 fn test_invalid_empty() {
3053 let binary_row: &[u8] = &[];
3054
3055 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3056 let parser = converter.parser();
3057 let utf8_row = parser.parse(binary_row.as_ref());
3058
3059 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
3060 }
3061
3062 #[test]
3063 #[should_panic(expected = "index out of bounds")]
3064 fn test_invalid_empty_array() {
3065 let row: &[u8] = &[];
3066 let binary_rows = BinaryArray::from(vec![row]);
3067
3068 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3069 let parsed = converter.from_binary(binary_rows);
3070
3071 converter.convert_rows(parsed.iter()).unwrap();
3072 }
3073
3074 #[test]
3075 #[should_panic(expected = "index out of bounds")]
3076 fn test_invalid_truncated() {
3077 let binary_row: &[u8] = &[0x02];
3078
3079 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3080 let parser = converter.parser();
3081 let utf8_row = parser.parse(binary_row.as_ref());
3082
3083 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
3084 }
3085
3086 #[test]
3087 #[should_panic(expected = "index out of bounds")]
3088 fn test_invalid_truncated_array() {
3089 let row: &[u8] = &[0x02];
3090 let binary_rows = BinaryArray::from(vec![row]);
3091
3092 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
3093 let parsed = converter.from_binary(binary_rows);
3094
3095 converter.convert_rows(parsed.iter()).unwrap();
3096 }
3097
3098 #[test]
3099 #[should_panic(expected = "rows were not produced by this RowConverter")]
3100 fn test_different_converter() {
3101 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
3102 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3103 let rows = converter.convert_columns(&[values]).unwrap();
3104
3105 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3106 let _ = converter.convert_rows(&rows);
3107 }
3108
3109 fn test_single_list<O: OffsetSizeTrait>() {
3110 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
3111 builder.values().append_value(32);
3112 builder.values().append_value(52);
3113 builder.values().append_value(32);
3114 builder.append(true);
3115 builder.values().append_value(32);
3116 builder.values().append_value(52);
3117 builder.values().append_value(12);
3118 builder.append(true);
3119 builder.values().append_value(32);
3120 builder.values().append_value(52);
3121 builder.append(true);
3122 builder.values().append_value(32);
3123 builder.values().append_value(52);
3124 builder.append(false);
3125 builder.values().append_value(32);
3126 builder.values().append_null();
3127 builder.append(true);
3128 builder.append(true);
3129 builder.values().append_value(17);
3130 builder.values().append_null();
3131 builder.append(false);
3132
3133 let list = Arc::new(builder.finish()) as ArrayRef;
3134 let d = list.data_type().clone();
3135
3136 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3137
3138 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3139 assert!(rows.row(0) > rows.row(1));
3140 assert!(rows.row(2) < rows.row(1));
3141 assert!(rows.row(3) < rows.row(2));
3142 assert!(rows.row(4) < rows.row(2));
3143 assert!(rows.row(5) < rows.row(2));
3144 assert!(rows.row(3) < rows.row(5));
3145 assert_eq!(rows.row(3), rows.row(6));
3146
3147 let back = converter.convert_rows(&rows).unwrap();
3148 assert_eq!(back.len(), 1);
3149 back[0].to_data().validate_full().unwrap();
3150 assert_eq!(&back[0], &list);
3151
3152 let options = SortOptions::default().asc().with_nulls_first(false);
3153 let field = SortField::new_with_options(d.clone(), options);
3154 let converter = RowConverter::new(vec![field]).unwrap();
3155 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3156
3157 assert!(rows.row(0) > rows.row(1));
3158 assert!(rows.row(2) < rows.row(1));
3159 assert!(rows.row(3) > rows.row(2));
3160 assert!(rows.row(4) > rows.row(2));
3161 assert!(rows.row(5) < rows.row(2));
3162 assert!(rows.row(3) > rows.row(5));
3163 assert_eq!(rows.row(3), rows.row(6));
3164
3165 let back = converter.convert_rows(&rows).unwrap();
3166 assert_eq!(back.len(), 1);
3167 back[0].to_data().validate_full().unwrap();
3168 assert_eq!(&back[0], &list);
3169
3170 let options = SortOptions::default().desc().with_nulls_first(false);
3171 let field = SortField::new_with_options(d.clone(), options);
3172 let converter = RowConverter::new(vec![field]).unwrap();
3173 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3174
3175 assert!(rows.row(0) < rows.row(1));
3176 assert!(rows.row(2) > rows.row(1));
3177 assert!(rows.row(3) > rows.row(2));
3178 assert!(rows.row(4) > rows.row(2));
3179 assert!(rows.row(5) > rows.row(2));
3180 assert!(rows.row(3) > rows.row(5));
3181 assert_eq!(rows.row(3), rows.row(6));
3182
3183 let back = converter.convert_rows(&rows).unwrap();
3184 assert_eq!(back.len(), 1);
3185 back[0].to_data().validate_full().unwrap();
3186 assert_eq!(&back[0], &list);
3187
3188 let options = SortOptions::default().desc().with_nulls_first(true);
3189 let field = SortField::new_with_options(d, options);
3190 let converter = RowConverter::new(vec![field]).unwrap();
3191 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3192
3193 assert!(rows.row(0) < rows.row(1));
3194 assert!(rows.row(2) > rows.row(1));
3195 assert!(rows.row(3) < rows.row(2));
3196 assert!(rows.row(4) < rows.row(2));
3197 assert!(rows.row(5) > rows.row(2));
3198 assert!(rows.row(3) < rows.row(5));
3199 assert_eq!(rows.row(3), rows.row(6));
3200
3201 let back = converter.convert_rows(&rows).unwrap();
3202 assert_eq!(back.len(), 1);
3203 back[0].to_data().validate_full().unwrap();
3204 assert_eq!(&back[0], &list);
3205
3206 let sliced_list = list.slice(1, 5);
3207 let rows_on_sliced_list = converter
3208 .convert_columns(&[Arc::clone(&sliced_list)])
3209 .unwrap();
3210
3211 assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
3212 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
3213 assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
3214 assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
3215 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
3216
3217 let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
3218 assert_eq!(back.len(), 1);
3219 back[0].to_data().validate_full().unwrap();
3220 assert_eq!(&back[0], &sliced_list);
3221 }
3222
3223 fn test_nested_list<O: OffsetSizeTrait>() {
3224 let mut builder =
3225 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
3226
3227 builder.values().values().append_value(1);
3228 builder.values().values().append_value(2);
3229 builder.values().append(true);
3230 builder.values().values().append_value(1);
3231 builder.values().values().append_null();
3232 builder.values().append(true);
3233 builder.append(true);
3234
3235 builder.values().values().append_value(1);
3236 builder.values().values().append_null();
3237 builder.values().append(true);
3238 builder.values().values().append_value(1);
3239 builder.values().values().append_null();
3240 builder.values().append(true);
3241 builder.append(true);
3242
3243 builder.values().values().append_value(1);
3244 builder.values().values().append_null();
3245 builder.values().append(true);
3246 builder.values().append(false);
3247 builder.append(true);
3248 builder.append(false);
3249
3250 builder.values().values().append_value(1);
3251 builder.values().values().append_value(2);
3252 builder.values().append(true);
3253 builder.append(true);
3254
3255 let list = Arc::new(builder.finish()) as ArrayRef;
3256 let d = list.data_type().clone();
3257
3258 let options = SortOptions::default().asc().with_nulls_first(true);
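3259 // [
3260 //   [[1, 2], [1, null]],
3261 //   [[1, null], [1, null]],
3262 //   [[1, null], null],
3263 //   null,
3264 //   [[1, 2]]
3265 // ]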
3266 let field = SortField::new_with_options(d.clone(), options);
3267 let converter = RowConverter::new(vec![field]).unwrap();
3268 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3269
3270 assert!(rows.row(0) > rows.row(1));
3271 assert!(rows.row(1) > rows.row(2));
3272 assert!(rows.row(2) > rows.row(3));
3273 assert!(rows.row(4) < rows.row(0));
3274 assert!(rows.row(4) > rows.row(1));
3275
3276 let back = converter.convert_rows(&rows).unwrap();
3277 assert_eq!(back.len(), 1);
3278 back[0].to_data().validate_full().unwrap();
3279 assert_eq!(&back[0], &list);
3280
3281 let options = SortOptions::default().desc().with_nulls_first(true);
3282 let field = SortField::new_with_options(d.clone(), options);
3283 let converter = RowConverter::new(vec![field]).unwrap();
3284 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3285
3286 assert!(rows.row(0) > rows.row(1));
3287 assert!(rows.row(1) > rows.row(2));
3288 assert!(rows.row(2) > rows.row(3));
3289 assert!(rows.row(4) > rows.row(0));
3290 assert!(rows.row(4) > rows.row(1));
3291
3292 let back = converter.convert_rows(&rows).unwrap();
3293 assert_eq!(back.len(), 1);
3294 back[0].to_data().validate_full().unwrap();
3295 assert_eq!(&back[0], &list);
3296
3297 let options = SortOptions::default().desc().with_nulls_first(false);
3298 let field = SortField::new_with_options(d, options);
3299 let converter = RowConverter::new(vec![field]).unwrap();
3300 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3301
3302 assert!(rows.row(0) < rows.row(1));
3303 assert!(rows.row(1) < rows.row(2));
3304 assert!(rows.row(2) < rows.row(3));
3305 assert!(rows.row(4) > rows.row(0));
3306 assert!(rows.row(4) < rows.row(1));
3307
3308 let back = converter.convert_rows(&rows).unwrap();
3309 assert_eq!(back.len(), 1);
3310 back[0].to_data().validate_full().unwrap();
3311 assert_eq!(&back[0], &list);
3312
3313 let sliced_list = list.slice(1, 3);
3314 let rows = converter
3315 .convert_columns(&[Arc::clone(&sliced_list)])
3316 .unwrap();
3317
3318 assert!(rows.row(0) < rows.row(1));
3319 assert!(rows.row(1) < rows.row(2));
3320
3321 let back = converter.convert_rows(&rows).unwrap();
3322 assert_eq!(back.len(), 1);
3323 back[0].to_data().validate_full().unwrap();
3324 assert_eq!(&back[0], &sliced_list);
3325 }
3326
3327 #[test]
3328 fn test_list() {
3329 test_single_list::<i32>();
3330 test_nested_list::<i32>();
3331 }
3332
3333 #[test]
3334 fn test_large_list() {
3335 test_single_list::<i64>();
3336 test_nested_list::<i64>();
3337 }
3338
3339 fn test_single_list_view<O: OffsetSizeTrait>() {
3340 let mut builder = GenericListViewBuilder::<O, _>::new(Int32Builder::new());
3341 builder.values().append_value(32);
3342 builder.values().append_value(52);
3343 builder.values().append_value(32);
3344 builder.append(true);
3345 builder.values().append_value(32);
3346 builder.values().append_value(52);
3347 builder.values().append_value(12);
3348 builder.append(true);
3349 builder.values().append_value(32);
3350 builder.values().append_value(52);
3351 builder.append(true);
3352 builder.values().append_value(32);
3353 builder.values().append_value(52);
3354 builder.append(false);
3355 builder.values().append_value(32);
3356 builder.values().append_null();
3357 builder.append(true);
3358 builder.append(true);
3359 builder.values().append_value(17);
3360 builder.values().append_null();
3361 builder.append(false);
3362
3363 let list = Arc::new(builder.finish()) as ArrayRef;
3364 let d = list.data_type().clone();
3365
3366 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3367
3368 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3369 assert!(rows.row(0) > rows.row(1));
3370 assert!(rows.row(2) < rows.row(1));
3371 assert!(rows.row(3) < rows.row(2));
3372 assert!(rows.row(4) < rows.row(2));
3373 assert!(rows.row(5) < rows.row(2));
3374 assert!(rows.row(3) < rows.row(5));
3375 assert_eq!(rows.row(3), rows.row(6));
3376
3377 let back = converter.convert_rows(&rows).unwrap();
3378 assert_eq!(back.len(), 1);
3379 back[0].to_data().validate_full().unwrap();
3380
3381 let back_list_view = back[0]
3383 .as_any()
3384 .downcast_ref::<GenericListViewArray<O>>()
3385 .unwrap();
3386 let orig_list_view = list
3387 .as_any()
3388 .downcast_ref::<GenericListViewArray<O>>()
3389 .unwrap();
3390
3391 assert_eq!(back_list_view.len(), orig_list_view.len());
3392 for i in 0..back_list_view.len() {
3393 assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
3394 if back_list_view.is_valid(i) {
3395 assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
3396 }
3397 }
3398
3399 let options = SortOptions::default().asc().with_nulls_first(false);
3400 let field = SortField::new_with_options(d.clone(), options);
3401 let converter = RowConverter::new(vec![field]).unwrap();
3402 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3403
3404 assert!(rows.row(0) > rows.row(1));
3405 assert!(rows.row(2) < rows.row(1));
3406 assert!(rows.row(3) > rows.row(2));
3407 assert!(rows.row(4) > rows.row(2));
3408 assert!(rows.row(5) < rows.row(2));
3409 assert!(rows.row(3) > rows.row(5));
3410 assert_eq!(rows.row(3), rows.row(6));
3411
3412 let back = converter.convert_rows(&rows).unwrap();
3413 assert_eq!(back.len(), 1);
3414 back[0].to_data().validate_full().unwrap();
3415
3416 let options = SortOptions::default().desc().with_nulls_first(false);
3417 let field = SortField::new_with_options(d.clone(), options);
3418 let converter = RowConverter::new(vec![field]).unwrap();
3419 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3420
3421 assert!(rows.row(0) < rows.row(1));
3422 assert!(rows.row(2) > rows.row(1));
3423 assert!(rows.row(3) > rows.row(2));
3424 assert!(rows.row(4) > rows.row(2));
3425 assert!(rows.row(5) > rows.row(2));
3426 assert!(rows.row(3) > rows.row(5));
3427 assert_eq!(rows.row(3), rows.row(6));
3428
3429 let back = converter.convert_rows(&rows).unwrap();
3430 assert_eq!(back.len(), 1);
3431 back[0].to_data().validate_full().unwrap();
3432
3433 let options = SortOptions::default().desc().with_nulls_first(true);
3434 let field = SortField::new_with_options(d, options);
3435 let converter = RowConverter::new(vec![field]).unwrap();
3436 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3437
3438 assert!(rows.row(0) < rows.row(1));
3439 assert!(rows.row(2) > rows.row(1));
3440 assert!(rows.row(3) < rows.row(2));
3441 assert!(rows.row(4) < rows.row(2));
3442 assert!(rows.row(5) > rows.row(2));
3443 assert!(rows.row(3) < rows.row(5));
3444 assert_eq!(rows.row(3), rows.row(6));
3445
3446 let back = converter.convert_rows(&rows).unwrap();
3447 assert_eq!(back.len(), 1);
3448 back[0].to_data().validate_full().unwrap();
3449
3450 let sliced_list = list.slice(1, 5);
3451 let rows_on_sliced_list = converter
3452 .convert_columns(&[Arc::clone(&sliced_list)])
3453 .unwrap();
3454
3455 assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
3456 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
3457 assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
3458 assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
3459 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
3460
3461 let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
3462 assert_eq!(back.len(), 1);
3463 back[0].to_data().validate_full().unwrap();
3464 }
3465
3466 fn test_nested_list_view<O: OffsetSizeTrait>() {
3467 let mut builder = GenericListViewBuilder::<O, _>::new(GenericListViewBuilder::<O, _>::new(
3468 Int32Builder::new(),
3469 ));
3470
3471 builder.values().values().append_value(1);
3473 builder.values().values().append_value(2);
3474 builder.values().append(true);
3475 builder.values().values().append_value(1);
3476 builder.values().values().append_null();
3477 builder.values().append(true);
3478 builder.append(true);
3479
3480 builder.values().values().append_value(1);
3482 builder.values().values().append_null();
3483 builder.values().append(true);
3484 builder.values().values().append_value(1);
3485 builder.values().values().append_null();
3486 builder.values().append(true);
3487 builder.append(true);
3488
3489 builder.values().values().append_value(1);
3491 builder.values().values().append_null();
3492 builder.values().append(true);
3493 builder.values().append(false);
3494 builder.append(true);
3495
3496 builder.append(false);
3498
3499 builder.values().values().append_value(1);
3501 builder.values().values().append_value(2);
3502 builder.values().append(true);
3503 builder.append(true);
3504
3505 let list = Arc::new(builder.finish()) as ArrayRef;
3506 let d = list.data_type().clone();
3507
3508 let options = SortOptions::default().asc().with_nulls_first(true);
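3509 // [
3510 //   [[1, 2], [1, null]],
3511 //   [[1, null], [1, null]],
3512 //   [[1, null], null],
3513 //   null,
3514 //   [[1, 2]]
3515 // ]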
3516 let field = SortField::new_with_options(d.clone(), options);
3517 let converter = RowConverter::new(vec![field]).unwrap();
3518 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3519
3520 assert!(rows.row(0) > rows.row(1));
3521 assert!(rows.row(1) > rows.row(2));
3522 assert!(rows.row(2) > rows.row(3));
3523 assert!(rows.row(4) < rows.row(0));
3524 assert!(rows.row(4) > rows.row(1));
3525
3526 let back = converter.convert_rows(&rows).unwrap();
3527 assert_eq!(back.len(), 1);
3528 back[0].to_data().validate_full().unwrap();
3529
3530 let back_list_view = back[0]
3532 .as_any()
3533 .downcast_ref::<GenericListViewArray<O>>()
3534 .unwrap();
3535 let orig_list_view = list
3536 .as_any()
3537 .downcast_ref::<GenericListViewArray<O>>()
3538 .unwrap();
3539
3540 assert_eq!(back_list_view.len(), orig_list_view.len());
3541 for i in 0..back_list_view.len() {
3542 assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
3543 if back_list_view.is_valid(i) {
3544 assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
3545 }
3546 }
3547
3548 let options = SortOptions::default().desc().with_nulls_first(true);
3549 let field = SortField::new_with_options(d.clone(), options);
3550 let converter = RowConverter::new(vec![field]).unwrap();
3551 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3552
3553 assert!(rows.row(0) > rows.row(1));
3554 assert!(rows.row(1) > rows.row(2));
3555 assert!(rows.row(2) > rows.row(3));
3556 assert!(rows.row(4) > rows.row(0));
3557 assert!(rows.row(4) > rows.row(1));
3558
3559 let back = converter.convert_rows(&rows).unwrap();
3560 assert_eq!(back.len(), 1);
3561 back[0].to_data().validate_full().unwrap();
3562
3563 let back_list_view = back[0]
3565 .as_any()
3566 .downcast_ref::<GenericListViewArray<O>>()
3567 .unwrap();
3568
3569 assert_eq!(back_list_view.len(), orig_list_view.len());
3570 for i in 0..back_list_view.len() {
3571 assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
3572 if back_list_view.is_valid(i) {
3573 assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
3574 }
3575 }
3576
3577 let options = SortOptions::default().desc().with_nulls_first(false);
3578 let field = SortField::new_with_options(d.clone(), options);
3579 let converter = RowConverter::new(vec![field]).unwrap();
3580 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3581
3582 assert!(rows.row(0) < rows.row(1));
3583 assert!(rows.row(1) < rows.row(2));
3584 assert!(rows.row(2) < rows.row(3));
3585 assert!(rows.row(4) > rows.row(0));
3586 assert!(rows.row(4) < rows.row(1));
3587
3588 let back = converter.convert_rows(&rows).unwrap();
3589 assert_eq!(back.len(), 1);
3590 back[0].to_data().validate_full().unwrap();
3591
3592 let back_list_view = back[0]
3594 .as_any()
3595 .downcast_ref::<GenericListViewArray<O>>()
3596 .unwrap();
3597
3598 assert_eq!(back_list_view.len(), orig_list_view.len());
3599 for i in 0..back_list_view.len() {
3600 assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
3601 if back_list_view.is_valid(i) {
3602 assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
3603 }
3604 }
3605
3606 let sliced_list = list.slice(1, 3);
3607 let rows = converter
3608 .convert_columns(&[Arc::clone(&sliced_list)])
3609 .unwrap();
3610
3611 assert!(rows.row(0) < rows.row(1));
3612 assert!(rows.row(1) < rows.row(2));
3613
3614 let back = converter.convert_rows(&rows).unwrap();
3615 assert_eq!(back.len(), 1);
3616 back[0].to_data().validate_full().unwrap();
3617 }
3618
3619 #[test]
3620 fn test_list_view() {
3621 test_single_list_view::<i32>();
3622 test_nested_list_view::<i32>();
3623 }
3624
3625 #[test]
3626 fn test_large_list_view() {
3627 test_single_list_view::<i64>();
3628 test_nested_list_view::<i64>();
3629 }
3630
3631 fn test_list_view_with_shared_values<O: OffsetSizeTrait>() {
3632 let values = Int32Array::from(vec![1, 2, 3, 4, 5, 6, 7, 8]);
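3633 // several list views below share, and overlap within, this single values buffer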
3634 let field = Arc::new(Field::new_list_field(DataType::Int32, true));
3635
3636 let offsets = ScalarBuffer::<O>::from(vec![
3644 O::from_usize(0).unwrap(),
3645 O::from_usize(0).unwrap(),
3646 O::from_usize(5).unwrap(),
3647 O::from_usize(2).unwrap(),
3648 O::from_usize(1).unwrap(),
3649 O::from_usize(2).unwrap(),
3650 ]);
3651 let sizes = ScalarBuffer::<O>::from(vec![
3652 O::from_usize(3).unwrap(),
3653 O::from_usize(3).unwrap(),
3654 O::from_usize(2).unwrap(),
3655 O::from_usize(2).unwrap(),
3656 O::from_usize(4).unwrap(),
3657 O::from_usize(1).unwrap(),
3658 ]);
3659
3660 let list_view: GenericListViewArray<O> =
3661 GenericListViewArray::try_new(field, offsets, sizes, Arc::new(values), None).unwrap();
3662
3663 let d = list_view.data_type().clone();
3664 let list = Arc::new(list_view) as ArrayRef;
3665
3666 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3667 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3668
3669 assert_eq!(rows.row(0), rows.row(1));
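3670 // rows 0 and 1 are both the view [1, 2, 3]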
3671
3672 assert!(rows.row(0) < rows.row(2));
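3673 // [1, 2, 3] < [6, 7]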
3674
3675 assert!(rows.row(3) > rows.row(0));
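3676 // [3, 4] > [1, 2, 3]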
3677
3678 assert!(rows.row(4) > rows.row(0));
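3679 // [2, 3, 4, 5] > [1, 2, 3]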
3680
3681 assert!(rows.row(5) < rows.row(3));
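3682 // [3] < [3, 4]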
3683
3684 assert!(rows.row(5) > rows.row(4));
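3685 // [3] > [2, 3, 4, 5]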
3686
3687 let back = converter.convert_rows(&rows).unwrap();
3689 assert_eq!(back.len(), 1);
3690 back[0].to_data().validate_full().unwrap();
3691
3692 let back_list_view = back[0]
3694 .as_any()
3695 .downcast_ref::<GenericListViewArray<O>>()
3696 .unwrap();
3697 let orig_list_view = list
3698 .as_any()
3699 .downcast_ref::<GenericListViewArray<O>>()
3700 .unwrap();
3701
3702 assert_eq!(back_list_view.len(), orig_list_view.len());
3703 for i in 0..back_list_view.len() {
3704 assert_eq!(back_list_view.is_valid(i), orig_list_view.is_valid(i));
3705 if back_list_view.is_valid(i) {
3706 assert_eq!(&back_list_view.value(i), &orig_list_view.value(i));
3707 }
3708 }
3709
3710 let options = SortOptions::default().desc();
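3711 // under a descending sort the orderings flip (equal rows stay equal)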
3712 let field = SortField::new_with_options(d, options);
3713 let converter = RowConverter::new(vec![field]).unwrap();
3714 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3715
3716 assert_eq!(rows.row(0), rows.row(1));
3717 assert!(rows.row(0) > rows.row(2));
3718 assert!(rows.row(3) < rows.row(0));
3719
3720 let back = converter.convert_rows(&rows).unwrap();
3722 assert_eq!(back.len(), 1);
3723 back[0].to_data().validate_full().unwrap();
3724 }
3725
3726 #[test]
3727 fn test_list_view_shared_values() {
3728 test_list_view_with_shared_values::<i32>();
3729 }
3730
3731 #[test]
3732 fn test_large_list_view_shared_values() {
3733 test_list_view_with_shared_values::<i64>();
3734 }
3735
3736 #[test]
3737 fn test_fixed_size_list() {
3738 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
3739 builder.values().append_value(32);
3740 builder.values().append_value(52);
3741 builder.values().append_value(32);
3742 builder.append(true);
3743 builder.values().append_value(32);
3744 builder.values().append_value(52);
3745 builder.values().append_value(12);
3746 builder.append(true);
3747 builder.values().append_value(32);
3748 builder.values().append_value(52);
3749 builder.values().append_null();
3750 builder.append(true);
3751 builder.values().append_value(32);
3752 builder.values().append_value(52);
3753 builder.values().append_value(13);
3754 builder.append(false);
3755 builder.values().append_value(32);
3756 builder.values().append_null();
3757 builder.values().append_null();
3758 builder.append(true);
3759 builder.values().append_null();
3760 builder.values().append_null();
3761 builder.values().append_null();
3762 builder.append(true);
3763 builder.values().append_value(17);
3764 builder.values().append_null();
3765 builder.values().append_value(77);
3766 builder.append(false);
3767
3768 let list = Arc::new(builder.finish()) as ArrayRef;
3769 let d = list.data_type().clone();
3770
3771 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
3773
3774 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3775 assert!(rows.row(0) > rows.row(1));
3776 assert!(rows.row(2) < rows.row(1));
3777 assert!(rows.row(3) < rows.row(2));
3778 assert!(rows.row(4) < rows.row(2));
3779 assert!(rows.row(5) < rows.row(2));
3780 assert!(rows.row(3) < rows.row(5));
3781 assert_eq!(rows.row(3), rows.row(6));
3782
3783 let back = converter.convert_rows(&rows).unwrap();
3784 assert_eq!(back.len(), 1);
3785 back[0].to_data().validate_full().unwrap();
3786 assert_eq!(&back[0], &list);
3787
3788 let options = SortOptions::default().asc().with_nulls_first(false);
3790 let field = SortField::new_with_options(d.clone(), options);
3791 let converter = RowConverter::new(vec![field]).unwrap();
3792 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3793 assert!(rows.row(0) > rows.row(1));
3794 assert!(rows.row(2) > rows.row(1));
3795 assert!(rows.row(3) > rows.row(2));
3796 assert!(rows.row(4) > rows.row(2));
3797 assert!(rows.row(5) > rows.row(2));
3798 assert!(rows.row(3) > rows.row(5));
3799 assert_eq!(rows.row(3), rows.row(6));
3800
3801 let back = converter.convert_rows(&rows).unwrap();
3802 assert_eq!(back.len(), 1);
3803 back[0].to_data().validate_full().unwrap();
3804 assert_eq!(&back[0], &list);
3805
3806 let options = SortOptions::default().desc().with_nulls_first(false);
3808 let field = SortField::new_with_options(d.clone(), options);
3809 let converter = RowConverter::new(vec![field]).unwrap();
3810 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3811 assert!(rows.row(0) < rows.row(1));
3812 assert!(rows.row(2) > rows.row(1));
3813 assert!(rows.row(3) > rows.row(2));
3814 assert!(rows.row(4) > rows.row(2));
3815 assert!(rows.row(5) > rows.row(2));
3816 assert!(rows.row(3) > rows.row(5));
3817 assert_eq!(rows.row(3), rows.row(6));
3818
3819 let back = converter.convert_rows(&rows).unwrap();
3820 assert_eq!(back.len(), 1);
3821 back[0].to_data().validate_full().unwrap();
3822 assert_eq!(&back[0], &list);
3823
3824 let options = SortOptions::default().desc().with_nulls_first(true);
3826 let field = SortField::new_with_options(d, options);
3827 let converter = RowConverter::new(vec![field]).unwrap();
3828 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
3829
3830 assert!(rows.row(0) < rows.row(1));
3831 assert!(rows.row(2) < rows.row(1));
3832 assert!(rows.row(3) < rows.row(2));
3833 assert!(rows.row(4) < rows.row(2));
3834 assert!(rows.row(5) < rows.row(2));
3835 assert!(rows.row(3) < rows.row(5));
3836 assert_eq!(rows.row(3), rows.row(6));
3837
3838 let back = converter.convert_rows(&rows).unwrap();
3839 assert_eq!(back.len(), 1);
3840 back[0].to_data().validate_full().unwrap();
3841 assert_eq!(&back[0], &list);
3842
3843 let sliced_list = list.slice(1, 5);
3844 let rows_on_sliced_list = converter
3845 .convert_columns(&[Arc::clone(&sliced_list)])
3846 .unwrap();
3847
3848 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
3849 assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
3850 assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1));
3851 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
3852
3853 let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
3854 assert_eq!(back.len(), 1);
3855 back[0].to_data().validate_full().unwrap();
3856 assert_eq!(&back[0], &sliced_list);
3857 }
3858
3859 #[test]
3860 fn test_two_fixed_size_lists() {
3861 let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3862 first.values().append_value(100);
3864 first.append(true);
3865 first.values().append_value(101);
3867 first.append(true);
3868 first.values().append_value(102);
3870 first.append(true);
3871 first.values().append_null();
3873 first.append(true);
3874 first.values().append_null();
3875 first.append(false);
3877 let first = Arc::new(first.finish()) as ArrayRef;
3878 let first_type = first.data_type().clone();
3879
3880 let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
3881 second.values().append_value(200);
3883 second.append(true);
3884 second.values().append_value(201);
3886 second.append(true);
3887 second.values().append_value(202);
3889 second.append(true);
3890 second.values().append_null();
3892 second.append(true);
3893 second.values().append_null();
3894 second.append(false);
3896 let second = Arc::new(second.finish()) as ArrayRef;
3897 let second_type = second.data_type().clone();
3898
3899 let converter = RowConverter::new(vec![
3900 SortField::new(first_type.clone()),
3901 SortField::new(second_type.clone()),
3902 ])
3903 .unwrap();
3904
3905 let rows = converter
3906 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3907 .unwrap();
3908
3909 let back = converter.convert_rows(&rows).unwrap();
3910 assert_eq!(back.len(), 2);
3911 back[0].to_data().validate_full().unwrap();
3912 assert_eq!(&back[0], &first);
3913 back[1].to_data().validate_full().unwrap();
3914 assert_eq!(&back[1], &second);
3915 }
3916
3917 #[test]
3918 fn test_fixed_size_list_with_variable_width_content() {
3919 let mut first = FixedSizeListBuilder::new(
3920 StructBuilder::from_fields(
3921 vec![
3922 Field::new(
3923 "timestamp",
3924 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3925 false,
3926 ),
3927 Field::new("offset_minutes", DataType::Int16, false),
3928 Field::new("time_zone", DataType::Utf8, false),
3929 ],
3930 1,
3931 ),
3932 1,
3933 );
3934 first
3936 .values()
3937 .field_builder::<TimestampMicrosecondBuilder>(0)
3938 .unwrap()
3939 .append_null();
3940 first
3941 .values()
3942 .field_builder::<Int16Builder>(1)
3943 .unwrap()
3944 .append_null();
3945 first
3946 .values()
3947 .field_builder::<StringBuilder>(2)
3948 .unwrap()
3949 .append_null();
3950 first.values().append(false);
3951 first.append(false);
3952 first
3954 .values()
3955 .field_builder::<TimestampMicrosecondBuilder>(0)
3956 .unwrap()
3957 .append_null();
3958 first
3959 .values()
3960 .field_builder::<Int16Builder>(1)
3961 .unwrap()
3962 .append_null();
3963 first
3964 .values()
3965 .field_builder::<StringBuilder>(2)
3966 .unwrap()
3967 .append_null();
3968 first.values().append(false);
3969 first.append(true);
3970 first
3972 .values()
3973 .field_builder::<TimestampMicrosecondBuilder>(0)
3974 .unwrap()
3975 .append_value(0);
3976 first
3977 .values()
3978 .field_builder::<Int16Builder>(1)
3979 .unwrap()
3980 .append_value(0);
3981 first
3982 .values()
3983 .field_builder::<StringBuilder>(2)
3984 .unwrap()
3985 .append_value("UTC");
3986 first.values().append(true);
3987 first.append(true);
3988 first
3990 .values()
3991 .field_builder::<TimestampMicrosecondBuilder>(0)
3992 .unwrap()
3993 .append_value(1126351800123456);
3994 first
3995 .values()
3996 .field_builder::<Int16Builder>(1)
3997 .unwrap()
3998 .append_value(120);
3999 first
4000 .values()
4001 .field_builder::<StringBuilder>(2)
4002 .unwrap()
4003 .append_value("Europe/Warsaw");
4004 first.values().append(true);
4005 first.append(true);
4006 let first = Arc::new(first.finish()) as ArrayRef;
4007 let first_type = first.data_type().clone();
4008
4009 let mut second = StringBuilder::new();
4010 second.append_value("somewhere near");
4011 second.append_null();
4012 second.append_value("Greenwich");
4013 second.append_value("Warsaw");
4014 let second = Arc::new(second.finish()) as ArrayRef;
4015 let second_type = second.data_type().clone();
4016
4017 let converter = RowConverter::new(vec![
4018 SortField::new(first_type.clone()),
4019 SortField::new(second_type.clone()),
4020 ])
4021 .unwrap();
4022
4023 let rows = converter
4024 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
4025 .unwrap();
4026
4027 let back = converter.convert_rows(&rows).unwrap();
4028 assert_eq!(back.len(), 2);
4029 back[0].to_data().validate_full().unwrap();
4030 assert_eq!(&back[0], &first);
4031 back[1].to_data().validate_full().unwrap();
4032 assert_eq!(&back[1], &second);
4033 }
4034
4035 fn generate_primitive_array<K>(
4036 rng: &mut impl RngCore,
4037 len: usize,
4038 valid_percent: f64,
4039 ) -> PrimitiveArray<K>
4040 where
4041 K: ArrowPrimitiveType,
4042 StandardUniform: Distribution<K::Native>,
4043 {
4044 (0..len)
4045 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
4046 .collect()
4047 }
4048
4049 fn generate_boolean_array(
4050 rng: &mut impl RngCore,
4051 len: usize,
4052 valid_percent: f64,
4053 ) -> BooleanArray {
4054 (0..len)
4055 .map(|_| rng.random_bool(valid_percent).then(|| rng.random_bool(0.5)))
4056 .collect()
4057 }
4058
4059 fn generate_strings<O: OffsetSizeTrait>(
4060 rng: &mut impl RngCore,
4061 len: usize,
4062 valid_percent: f64,
4063 ) -> GenericStringArray<O> {
4064 (0..len)
4065 .map(|_| {
4066 rng.random_bool(valid_percent).then(|| {
4067 let len = rng.random_range(0..100);
4068 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
4069 String::from_utf8(bytes).unwrap()
4070 })
4071 })
4072 .collect()
4073 }
4074
4075 fn generate_string_view(
4076 rng: &mut impl RngCore,
4077 len: usize,
4078 valid_percent: f64,
4079 ) -> StringViewArray {
4080 (0..len)
4081 .map(|_| {
4082 rng.random_bool(valid_percent).then(|| {
4083 let len = rng.random_range(0..100);
4084 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
4085 String::from_utf8(bytes).unwrap()
4086 })
4087 })
4088 .collect()
4089 }
4090
4091 fn generate_byte_view(
4092 rng: &mut impl RngCore,
4093 len: usize,
4094 valid_percent: f64,
4095 ) -> BinaryViewArray {
4096 (0..len)
4097 .map(|_| {
4098 rng.random_bool(valid_percent).then(|| {
4099 let len = rng.random_range(0..100);
4100 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
4101 bytes
4102 })
4103 })
4104 .collect()
4105 }
4106
4107 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
4108 let edge_cases = vec![
4109 Some("bar".to_string()),
4110 Some("bar\0".to_string()),
4111 Some("LongerThan12Bytes".to_string()),
4112 Some("LongerThan12Bytez".to_string()),
4113 Some("LongerThan12Bytes\0".to_string()),
4114 Some("LongerThan12Byt".to_string()),
4115 Some("backend one".to_string()),
4116 Some("backend two".to_string()),
4117 Some("a".repeat(257)),
4118 Some("a".repeat(300)),
4119 ];
4120
4121 let mut values = Vec::with_capacity(len);
4123 for i in 0..len {
4124 values.push(
4125 edge_cases
4126 .get(i % edge_cases.len())
4127 .cloned()
4128 .unwrap_or(None),
4129 );
4130 }
4131
4132 StringViewArray::from(values)
4133 }
4134
4135 fn generate_dictionary<K>(
4136 rng: &mut impl RngCore,
4137 values: ArrayRef,
4138 len: usize,
4139 valid_percent: f64,
4140 ) -> DictionaryArray<K>
4141 where
4142 K: ArrowDictionaryKeyType,
4143 K::Native: SampleUniform,
4144 {
4145 let min_key = K::Native::from_usize(0).unwrap();
4146 let max_key = K::Native::from_usize(values.len()).unwrap();
4147 let keys: PrimitiveArray<K> = (0..len)
4148 .map(|_| {
4149 rng.random_bool(valid_percent)
4150 .then(|| rng.random_range(min_key..max_key))
4151 })
4152 .collect();
4153
4154 let data_type =
4155 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
4156
4157 let data = keys
4158 .into_data()
4159 .into_builder()
4160 .data_type(data_type)
4161 .add_child_data(values.to_data())
4162 .build()
4163 .unwrap();
4164
4165 DictionaryArray::from(data)
4166 }
4167
4168 fn generate_fixed_size_binary(
4169 rng: &mut impl RngCore,
4170 len: usize,
4171 valid_percent: f64,
4172 ) -> FixedSizeBinaryArray {
4173 let width = rng.random_range(0..20);
4174 let mut builder = FixedSizeBinaryBuilder::new(width);
4175
4176 let mut b = vec![0; width as usize];
4177 for _ in 0..len {
4178 match rng.random_bool(valid_percent) {
4179 true => {
4180 b.iter_mut().for_each(|x| *x = rng.random());
4181 builder.append_value(&b).unwrap();
4182 }
4183 false => builder.append_null(),
4184 }
4185 }
4186
4187 builder.finish()
4188 }
4189
4190 fn generate_struct(rng: &mut impl RngCore, len: usize, valid_percent: f64) -> StructArray {
4191 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
4192 let a = generate_primitive_array::<Int32Type>(rng, len, valid_percent);
4193 let b = generate_strings::<i32>(rng, len, valid_percent);
4194 let fields = Fields::from(vec![
4195 Field::new("a", DataType::Int32, true),
4196 Field::new("b", DataType::Utf8, true),
4197 ]);
4198 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
4199 StructArray::new(fields, values, Some(nulls))
4200 }
4201
4202 fn generate_list<R: RngCore, F>(
4203 rng: &mut R,
4204 len: usize,
4205 valid_percent: f64,
4206 values: F,
4207 ) -> ListArray
4208 where
4209 F: FnOnce(&mut R, usize) -> ArrayRef,
4210 {
4211 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
4212 let values_len = offsets.last().unwrap().to_usize().unwrap();
4213 let values = values(rng, values_len);
4214 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
4215 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
4216 ListArray::new(field, offsets, values, Some(nulls))
4217 }
4218
4219 fn generate_list_view<F>(
4220 rng: &mut impl RngCore,
4221 len: usize,
4222 valid_percent: f64,
4223 values: F,
4224 ) -> ListViewArray
4225 where
4226 F: FnOnce(usize) -> ArrayRef,
4227 {
4228 let sizes: Vec<i32> = (0..len).map(|_| rng.random_range(0..10)).collect();
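4229 // ensure a non-empty values array so the offset ranges below are well-formed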
4230 let values_len: usize = sizes.iter().map(|s| *s as usize).sum::<usize>().max(1);
4231 let values = values(values_len);
4232
4233 let offsets: Vec<i32> = sizes
4235 .iter()
4236 .map(|&size| {
4237 if size == 0 {
4238 0
4239 } else {
4240 rng.random_range(0..=(values_len as i32 - size))
4241 }
4242 })
4243 .collect();
4244
4245 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
4246 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
4247 ListViewArray::new(
4248 field,
4249 ScalarBuffer::from(offsets),
4250 ScalarBuffer::from(sizes),
4251 values,
4252 Some(nulls),
4253 )
4254 }
4255
4256 fn generate_nulls(rng: &mut impl RngCore, len: usize) -> Option<NullBuffer> {
4257 Some(NullBuffer::from_iter(
4258 (0..len).map(|_| rng.random_bool(0.8)),
4259 ))
4260 }
4261
4262 fn change_underlying_null_values_for_primitive<T: ArrowPrimitiveType>(
4263 array: &PrimitiveArray<T>,
4264 ) -> PrimitiveArray<T> {
4265 let (dt, values, nulls) = array.clone().into_parts();
4266
4267 let new_values = ScalarBuffer::<T::Native>::from_iter(
4268 values
4269 .iter()
4270 .zip(nulls.as_ref().unwrap().iter())
4271 .map(|(val, is_valid)| {
4272 if is_valid {
4273 *val
4274 } else {
4275 val.add_wrapping(T::Native::usize_as(1))
4276 }
4277 }),
4278 );
4279
4280 PrimitiveArray::new(new_values, nulls).with_data_type(dt)
4281 }
4282
4283 fn change_underline_null_values_for_byte_array<T: ByteArrayType>(
4284 array: &GenericByteArray<T>,
4285 ) -> GenericByteArray<T> {
4286 let (offsets, values, nulls) = array.clone().into_parts();
4287
4288 let new_offsets = OffsetBuffer::<T::Offset>::from_lengths(
4289 offsets
4290 .lengths()
4291 .zip(nulls.as_ref().unwrap().iter())
4292 .map(|(len, is_valid)| if is_valid { len } else { len + 1 }),
4293 );
4294
4295 let mut new_bytes = Vec::<u8>::with_capacity(new_offsets[new_offsets.len() - 1].as_usize());
4296
4297 offsets
4298 .windows(2)
4299 .zip(nulls.as_ref().unwrap().iter())
4300 .for_each(|(start_and_end, is_valid)| {
4301 let start = start_and_end[0].as_usize();
4302 let end = start_and_end[1].as_usize();
4303 new_bytes.extend_from_slice(&values.as_slice()[start..end]);
4304
4305 if !is_valid {
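4306 // append an extra byte so the bytes behind the null slot differ from the original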
4307 new_bytes.push(b'c');
4308 }
4309 });
4310
4311 GenericByteArray::<T>::new(new_offsets, Buffer::from_vec(new_bytes), nulls)
4312 }
4313
4314 fn change_underline_null_values_for_list_array<O: OffsetSizeTrait>(
4315 array: &GenericListArray<O>,
4316 ) -> GenericListArray<O> {
4317 let (field, offsets, values, nulls) = array.clone().into_parts();
4318
4319 let (new_values, new_offsets) = {
4320 let concat_values = offsets
4321 .windows(2)
4322 .zip(nulls.as_ref().unwrap().iter())
4323 .map(|(start_and_end, is_valid)| {
4324 let start = start_and_end[0].as_usize();
4325 let end = start_and_end[1].as_usize();
4326 if is_valid {
4327 return (start, end - start);
4328 }
4329
4330 if end == values.len() {
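4331 // cannot extend past the end of the values buffer, so shrink the null entry instead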
4332 (start, (end - start).saturating_sub(1))
4333 } else {
4334 (start, end - start + 1)
4335 }
4336 })
4337 .map(|(start, length)| values.slice(start, length))
4338 .collect::<Vec<_>>();
4339
4340 let new_offsets =
4341 OffsetBuffer::<O>::from_lengths(concat_values.iter().map(|s| s.len()));
4342
4343 let new_values = {
4344 let values = concat_values.iter().map(|a| a.as_ref()).collect::<Vec<_>>();
4345 arrow_select::concat::concat(&values).expect("should be able to concat")
4346 };
4347
4348 (new_values, new_offsets)
4349 };
4350
4351 GenericListArray::<O>::new(field, new_offsets, new_values, nulls)
4352 }
4353
4354 fn change_underline_null_values(array: &ArrayRef) -> ArrayRef {
4355 if array.null_count() == 0 {
4356 return Arc::clone(array);
4357 }
4358
4359 downcast_primitive_array!(
4360 array => {
4361 let output = change_underlying_null_values_for_primitive(array);
4362
4363 Arc::new(output)
4364 }
4365
4366 DataType::Utf8 => {
4367 Arc::new(change_underline_null_values_for_byte_array(array.as_string::<i32>()))
4368 }
4369 DataType::LargeUtf8 => {
4370 Arc::new(change_underline_null_values_for_byte_array(array.as_string::<i64>()))
4371 }
4372 DataType::Binary => {
4373 Arc::new(change_underline_null_values_for_byte_array(array.as_binary::<i32>()))
4374 }
4375 DataType::LargeBinary => {
4376 Arc::new(change_underline_null_values_for_byte_array(array.as_binary::<i64>()))
4377 }
4378 DataType::List(_) => {
4379 Arc::new(change_underline_null_values_for_list_array(array.as_list::<i32>()))
4380 }
4381 DataType::LargeList(_) => {
4382 Arc::new(change_underline_null_values_for_list_array(array.as_list::<i64>()))
4383 }
4384 _ => {
4385 Arc::clone(array)
4386 }
4387 )
4388 }
4389
4390 fn generate_column(rng: &mut (impl RngCore + Clone), len: usize) -> ArrayRef {
4391 match rng.random_range(0..23) {
4392 0 => Arc::new(generate_primitive_array::<Int32Type>(rng, len, 0.8)),
4393 1 => Arc::new(generate_primitive_array::<UInt32Type>(rng, len, 0.8)),
4394 2 => Arc::new(generate_primitive_array::<Int64Type>(rng, len, 0.8)),
4395 3 => Arc::new(generate_primitive_array::<UInt64Type>(rng, len, 0.8)),
4396 4 => Arc::new(generate_primitive_array::<Float32Type>(rng, len, 0.8)),
4397 5 => Arc::new(generate_primitive_array::<Float64Type>(rng, len, 0.8)),
4398 6 => Arc::new(generate_strings::<i32>(rng, len, 0.8)),
4399 7 => {
4400 let dict_values_len = rng.random_range(1..len);
4401 let strings = Arc::new(generate_strings::<i32>(rng, dict_values_len, 1.0));
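4402 // dictionary values are generated fully valid; nulls are introduced through the keys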
4403 Arc::new(generate_dictionary::<Int64Type>(rng, strings, len, 0.8))
4404 }
4405 8 => {
4406 let dict_values_len = rng.random_range(1..len);
4407 let values = Arc::new(generate_primitive_array::<Int64Type>(
4409 rng,
4410 dict_values_len,
4411 1.0,
4412 ));
4413 Arc::new(generate_dictionary::<Int64Type>(rng, values, len, 0.8))
4414 }
4415 9 => Arc::new(generate_fixed_size_binary(rng, len, 0.8)),
4416 10 => Arc::new(generate_struct(rng, len, 0.8)),
4417 11 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
4418 Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
4419 })),
4420 12 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
4421 Arc::new(generate_strings::<i32>(rng, values_len, 0.8))
4422 })),
4423 13 => Arc::new(generate_list(rng, len, 0.8, |rng, values_len| {
4424 Arc::new(generate_struct(rng, values_len, 0.8))
4425 })),
4426 14 => Arc::new(generate_string_view(rng, len, 0.8)),
4427 15 => Arc::new(generate_byte_view(rng, len, 0.8)),
4428 16 => Arc::new(generate_fixed_stringview_column(len)),
4429 17 => Arc::new(
4430 generate_list(&mut rng.clone(), len + 1000, 0.8, |rng, values_len| {
4431 Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
4432 })
4433 .slice(500, len),
4434 ),
4435 18 => Arc::new(generate_boolean_array(rng, len, 0.8)),
4436 19 => Arc::new(generate_list_view(
4437 &mut rng.clone(),
4438 len,
4439 0.8,
4440 |values_len| Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8)),
4441 )),
4442 20 => Arc::new(generate_list_view(
4443 &mut rng.clone(),
4444 len,
4445 0.8,
4446 |values_len| Arc::new(generate_strings::<i32>(rng, values_len, 0.8)),
4447 )),
4448 21 => Arc::new(generate_list_view(
4449 &mut rng.clone(),
4450 len,
4451 0.8,
4452 |values_len| Arc::new(generate_struct(rng, values_len, 0.8)),
4453 )),
4454 22 => Arc::new(
4455 generate_list_view(&mut rng.clone(), len + 1000, 0.8, |values_len| {
4456 Arc::new(generate_primitive_array::<Int64Type>(rng, values_len, 0.8))
4457 })
4458 .slice(500, len),
4459 ),
4460 _ => unreachable!(),
4461 }
4462 }
4463
4464 fn print_row(cols: &[SortColumn], row: usize) -> String {
4465 let t: Vec<_> = cols
4466 .iter()
4467 .map(|x| match x.values.is_valid(row) {
4468 true => {
4469 let opts = FormatOptions::default().with_null("NULL");
4470 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
4471 formatter.value(row).to_string()
4472 }
4473 false => "NULL".to_string(),
4474 })
4475 .collect();
4476 t.join(",")
4477 }
4478
4479 fn print_col_types(cols: &[SortColumn]) -> String {
4480 let t: Vec<_> = cols
4481 .iter()
4482 .map(|x| x.values.data_type().to_string())
4483 .collect();
4484 t.join(",")
4485 }
4486
4487 #[derive(Debug, PartialEq)]
4488 enum Nulls {
4489 AsIs, // keep the generated null buffers untouched
4491
4492 Different, // replace the null buffers with freshly generated ones
4494
4495 None, // strip the null buffers entirely
4497 }
4498
4499 #[test]
4500 #[cfg_attr(miri, ignore)]
4501 fn fuzz_test() {
4502 let mut rng = StdRng::seed_from_u64(42);
4503 for _ in 0..100 {
4504 for null_behavior in [Nulls::AsIs, Nulls::Different, Nulls::None] {
4505 let num_columns = rng.random_range(1..5);
4506 let len = rng.random_range(5..100);
4507 let mut arrays: Vec<_> = (0..num_columns)
4508 .map(|_| generate_column(&mut rng, len))
4509 .collect();
4510
4511 match null_behavior {
4512 Nulls::AsIs => {
4513 }
4515 Nulls::Different => {
4516 arrays = arrays
4518 .into_iter()
4519 .map(|a| replace_array_nulls(a, generate_nulls(&mut rng, len)))
4520 .collect()
4521 }
4522 Nulls::None => {
4523 arrays = arrays
4525 .into_iter()
4526 .map(|a| replace_array_nulls(a, None))
4527 .collect()
4528 }
4529 }
4530
4531 let options: Vec<_> = (0..num_columns)
4532 .map(|_| SortOptions {
4533 descending: rng.random_bool(0.5),
4534 nulls_first: rng.random_bool(0.5),
4535 })
4536 .collect();
4537
4538 let sort_columns: Vec<_> = options
4539 .iter()
4540 .zip(&arrays)
4541 .map(|(o, c)| SortColumn {
4542 values: Arc::clone(c),
4543 options: Some(*o),
4544 })
4545 .collect();
4546
4547 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
4548
4549 let columns: Vec<SortField> = options
4550 .into_iter()
4551 .zip(&arrays)
4552 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
4553 .collect();
4554
4555 let converter = RowConverter::new(columns).unwrap();
4556 let rows = converter.convert_columns(&arrays).unwrap();
4557
                // The encoded rows must not depend on the values stored behind null slots
                if !matches!(null_behavior, Nulls::None) {
                    assert_same_rows_when_changing_input_underlying_null_values(
                        &arrays, &converter, &rows,
                    );
                }
4565
4566 for i in 0..len {
4567 for j in 0..len {
4568 let row_i = rows.row(i);
4569 let row_j = rows.row(j);
4570 let row_cmp = row_i.cmp(&row_j);
4571 let lex_cmp = comparator.compare(i, j);
4572 assert_eq!(
4573 row_cmp,
4574 lex_cmp,
4575 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
4576 print_row(&sort_columns, i),
4577 print_row(&sort_columns, j),
4578 row_i,
4579 row_j,
4580 print_col_types(&sort_columns)
4581 );
4582 }
4583 }
4584
                // Check that the `lengths` iterator matches the actual encoded row lengths
                {
                    let mut rows_iter = rows.iter();
                    let mut rows_lengths_iter = rows.lengths();
                    for (index, row) in rows_iter.by_ref().enumerate() {
                        let len = rows_lengths_iter
                            .next()
                            .expect("lengths iterator ended while rows remain");
                        assert_eq!(
                            row.data.len(),
                            len,
                            "Row length mismatch: {} vs {}",
                            row.data.len(),
                            len
                        );
                        assert_eq!(
                            len,
                            rows.row_len(index),
                            "Row length mismatch at index {}: {} vs {}",
                            index,
                            len,
                            rows.row_len(index)
                        );
                    }

                    assert_eq!(
                        rows_lengths_iter.next(),
                        None,
                        "Length iterator should be exhausted after the last row"
                    );
                }
4616
                // Round-trip the rows back to arrays and compare with the input
                let back = converter.convert_rows(&rows).unwrap();
                for (actual, expected) in back.iter().zip(&arrays) {
                    actual.to_data().validate_full().unwrap();
                    dictionary_eq(actual, expected)
                }

                // Round-trip through the binary representation using a validating parser
                let rows = rows.try_into_binary().expect("reasonable size");
                let parser = converter.parser();
                let back = converter
                    .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                    .unwrap();
                for (actual, expected) in back.iter().zip(&arrays) {
                    actual.to_data().validate_full().unwrap();
                    dictionary_eq(actual, expected)
                }

                // Convert the binary representation back into `Rows` and round-trip again
                let rows = converter.from_binary(rows);
                let back = converter.convert_rows(&rows).unwrap();
                for (actual, expected) in back.iter().zip(&arrays) {
                    actual.to_data().validate_full().unwrap();
                    dictionary_eq(actual, expected)
                }
4643 }
4644 }
4645 }
4646
4647 fn replace_array_nulls(array: ArrayRef, new_nulls: Option<NullBuffer>) -> ArrayRef {
4648 make_array(
4649 array
4650 .into_data()
4651 .into_builder()
4652 .nulls(new_nulls)
4654 .build()
4655 .unwrap(),
4656 )
4657 }
4658
4659 fn assert_same_rows_when_changing_input_underlying_null_values(
4660 arrays: &[ArrayRef],
4661 converter: &RowConverter,
4662 rows: &Rows,
4663 ) {
4664 let arrays_with_different_data_behind_nulls = arrays
4665 .iter()
4666 .map(|arr| change_underline_null_values(arr))
4667 .collect::<Vec<_>>();
4668
4669 if arrays
4671 .iter()
4672 .zip(arrays_with_different_data_behind_nulls.iter())
4673 .all(|(a, b)| Arc::ptr_eq(a, b))
4674 {
4675 return;
4676 }
4677
4678 let rows_with_different_nulls = converter
4679 .convert_columns(&arrays_with_different_data_behind_nulls)
4680 .unwrap();
4681
        assert_eq!(
            rows.iter().collect::<Vec<_>>(),
            rows_with_different_nulls.iter().collect::<Vec<_>>(),
            "Changing the values behind null slots should not produce different rows"
        )
4687 }
4688
4689 #[test]
4690 fn test_clear() {
4691 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
4692 let mut rows = converter.empty_rows(3, 128);
4693
4694 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
4695 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
4696 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
4697
4698 for array in arrays.iter() {
4699 rows.clear();
4700 converter
4701 .append(&mut rows, std::slice::from_ref(array))
4702 .unwrap();
4703 let back = converter.convert_rows(&rows).unwrap();
4704 assert_eq!(&back[0], array);
4705 }
4706
4707 let mut rows_expected = converter.empty_rows(3, 128);
4708 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
4709
4710 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
4711 assert_eq!(
4712 actual, expected,
4713 "For row {i}: expected {expected:?}, actual: {actual:?}",
4714 );
4715 }
4716 }
4717
4718 #[test]
4719 fn test_append_codec_dictionary_binary() {
4720 use DataType::*;
4721 let converter = RowConverter::new(vec![SortField::new(Dictionary(
4723 Box::new(Int32),
4724 Box::new(Binary),
4725 ))])
4726 .unwrap();
4727 let mut rows = converter.empty_rows(4, 128);
4728
4729 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
4730 let values = BinaryArray::from(vec![
4731 Some("a".as_bytes()),
4732 Some(b"b"),
4733 Some(b"c"),
4734 Some(b"d"),
4735 ]);
4736 let dict_array = DictionaryArray::new(keys, Arc::new(values));
4737
4738 rows.clear();
4739 let array = Arc::new(dict_array) as ArrayRef;
4740 converter
4741 .append(&mut rows, std::slice::from_ref(&array))
4742 .unwrap();
4743 let back = converter.convert_rows(&rows).unwrap();
4744
4745 dictionary_eq(&back[0], &array);
4746 }
4747
4748 #[test]
4749 fn test_list_prefix() {
4750 let mut a = ListBuilder::new(Int8Builder::new());
4751 a.append_value([None]);
4752 a.append_value([None, None]);
4753 let a = a.finish();
4754
4755 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
4756 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
4757 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
4758 }
4759
4760 #[test]
4761 fn map_should_be_marked_as_unsupported() {
4762 let map_data_type = Field::new_map(
4763 "map",
4764 "entries",
4765 Field::new("key", DataType::Utf8, false),
4766 Field::new("value", DataType::Utf8, true),
4767 false,
4768 true,
4769 )
4770 .data_type()
4771 .clone();
4772
4773 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
4774
4775 assert!(!is_supported, "Map should not be supported");
4776 }
4777
4778 #[test]
4779 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
4780 let map_data_type = Field::new_map(
4781 "map",
4782 "entries",
4783 Field::new("key", DataType::Utf8, false),
4784 Field::new("value", DataType::Utf8, true),
4785 false,
4786 true,
4787 )
4788 .data_type()
4789 .clone();
4790
4791 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
4792
4793 match converter {
4794 Err(ArrowError::NotYetImplemented(message)) => {
4795 assert!(
4796 message.contains("Row format support not yet implemented for"),
4797 "Expected NotYetImplemented error for map data type, got: {message}",
4798 );
4799 }
4800 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
4801 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
4802 }
4803 }
4804
4805 #[test]
4806 fn test_values_buffer_smaller_when_utf8_validation_disabled() {
        fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
            let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();

            // Decode straight from `Rows`, which skips UTF-8 validation
            let rows = converter.convert_columns(&[col]).unwrap();
            let converted = converter.convert_rows(&rows).unwrap();
            let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();

            // Decode via the parser, which validates UTF-8
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let converted = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
            (unchecked_values_len, checked_values_len)
        }

        // Every value fits inline in the views (at most 12 bytes)
        let col = Arc::new(StringViewArray::from_iter([
            Some("hello"),
            None,
            Some("short"),
            Some("tiny"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Without validation the inline values never touch the data buffer
        assert_eq!(unchecked_values_len, 0);
        // With validation every string is materialized in the data buffer (5 + 5 + 4 bytes)
        assert_eq!(checked_values_len, 14);

        // Every value is longer than 12 bytes, so it lives in the data buffer either way
        let col = Arc::new(StringViewArray::from_iter([
            Some("this is a very long string over 12 bytes"),
            Some("another long string to test the buffer"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        assert!(unchecked_values_len > 0);
        assert_eq!(unchecked_values_len, checked_values_len);

        // Mix of inline values and one value of exactly 13 bytes
        let col = Arc::new(StringViewArray::from_iter([
            Some("tiny"),
            Some("thisisexact13"),
            None,
            Some("short"),
        ])) as ArrayRef;

        let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
        // Only the 13 byte value needs the data buffer when validation is skipped
        assert_eq!(unchecked_values_len, 13);
        assert!(checked_values_len > unchecked_values_len);
4863 }
4864
4865 #[test]
4866 fn test_sparse_union() {
4867 let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
4869 let str_array = StringArray::from(vec![None, Some("b"), None, Some("d"), None]);
4870
4871 let type_ids = vec![0, 1, 0, 1, 0].into();
4873
4874 let union_fields = [
4875 (0, Arc::new(Field::new("int", DataType::Int32, false))),
4876 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4877 ]
4878 .into_iter()
4879 .collect();
4880
4881 let union_array = UnionArray::try_new(
4882 union_fields,
4883 type_ids,
4884 None,
4885 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4886 )
4887 .unwrap();
4888
4889 let union_type = union_array.data_type().clone();
4890 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4891
4892 let rows = converter
4893 .convert_columns(&[Arc::new(union_array.clone())])
4894 .unwrap();
4895
4896 let back = converter.convert_rows(&rows).unwrap();
4898 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4899
4900 assert_eq!(union_array.len(), back_union.len());
4901 for i in 0..union_array.len() {
4902 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4903 }
4904 }
4905
4906 #[test]
4907 fn test_sparse_union_with_nulls() {
4908 let int_array = Int32Array::from(vec![Some(1), None, Some(3), None, Some(5)]);
4910 let str_array = StringArray::from(vec![None::<&str>; 5]);
4911
4912 let type_ids = vec![0, 1, 0, 1, 0].into();
4914
4915 let union_fields = [
4916 (0, Arc::new(Field::new("int", DataType::Int32, true))),
4917 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
4918 ]
4919 .into_iter()
4920 .collect();
4921
4922 let union_array = UnionArray::try_new(
4923 union_fields,
4924 type_ids,
4925 None,
4926 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4927 )
4928 .unwrap();
4929
4930 let union_type = union_array.data_type().clone();
4931 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4932
4933 let rows = converter
4934 .convert_columns(&[Arc::new(union_array.clone())])
4935 .unwrap();
4936
4937 let back = converter.convert_rows(&rows).unwrap();
4939 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4940
4941 assert_eq!(union_array.len(), back_union.len());
4942 for i in 0..union_array.len() {
4943 let expected_null = union_array.is_null(i);
4944 let actual_null = back_union.is_null(i);
4945 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
4946 if !expected_null {
4947 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4948 }
4949 }
4950 }
4951
4952 #[test]
4953 fn test_dense_union() {
4954 let int_array = Int32Array::from(vec![1, 3, 5]);
4956 let str_array = StringArray::from(vec!["a", "b"]);
4957
4958 let type_ids = vec![0, 1, 0, 1, 0].into();
4959
4960 let offsets = vec![0, 0, 1, 1, 2].into();
4962
4963 let union_fields = [
4964 (0, Arc::new(Field::new("int", DataType::Int32, false))),
4965 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
4966 ]
4967 .into_iter()
4968 .collect();
4969
4970 let union_array = UnionArray::try_new(
4971 union_fields,
4972 type_ids,
            Some(offsets),
            vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
4975 )
4976 .unwrap();
4977
4978 let union_type = union_array.data_type().clone();
4979 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
4980
4981 let rows = converter
4982 .convert_columns(&[Arc::new(union_array.clone())])
4983 .unwrap();
4984
4985 let back = converter.convert_rows(&rows).unwrap();
4987 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
4988
4989 assert_eq!(union_array.len(), back_union.len());
4990 for i in 0..union_array.len() {
4991 assert_eq!(union_array.type_id(i), back_union.type_id(i));
4992 }
4993 }
4994
4995 #[test]
4996 fn test_dense_union_with_nulls() {
4997 let int_array = Int32Array::from(vec![Some(1), None, Some(5)]);
4999 let str_array = StringArray::from(vec![Some("a"), None]);
5000
5001 let type_ids = vec![0, 1, 0, 1, 0].into();
5003 let offsets = vec![0, 0, 1, 1, 2].into();
5004
5005 let union_fields = [
5006 (0, Arc::new(Field::new("int", DataType::Int32, true))),
5007 (1, Arc::new(Field::new("str", DataType::Utf8, true))),
5008 ]
5009 .into_iter()
5010 .collect();
5011
5012 let union_array = UnionArray::try_new(
5013 union_fields,
5014 type_ids,
5015 Some(offsets),
5016 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
5017 )
5018 .unwrap();
5019
5020 let union_type = union_array.data_type().clone();
5021 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
5022
5023 let rows = converter
5024 .convert_columns(&[Arc::new(union_array.clone())])
5025 .unwrap();
5026
5027 let back = converter.convert_rows(&rows).unwrap();
5029 let back_union = back[0].as_any().downcast_ref::<UnionArray>().unwrap();
5030
5031 assert_eq!(union_array.len(), back_union.len());
5032 for i in 0..union_array.len() {
5033 let expected_null = union_array.is_null(i);
5034 let actual_null = back_union.is_null(i);
5035 assert_eq!(expected_null, actual_null, "Null mismatch at index {i}");
5036 if !expected_null {
5037 assert_eq!(union_array.type_id(i), back_union.type_id(i));
5038 }
5039 }
5040 }
5041
5042 #[test]
5043 fn test_union_ordering() {
5044 let int_array = Int32Array::from(vec![100, 5, 20]);
5045 let str_array = StringArray::from(vec!["z", "a"]);
5046
5047 let type_ids = vec![0, 1, 0, 1, 0].into();
5049 let offsets = vec![0, 0, 1, 1, 2].into();
5050
5051 let union_fields = [
5052 (0, Arc::new(Field::new("int", DataType::Int32, false))),
5053 (1, Arc::new(Field::new("str", DataType::Utf8, false))),
5054 ]
5055 .into_iter()
5056 .collect();
5057
5058 let union_array = UnionArray::try_new(
5059 union_fields,
5060 type_ids,
5061 Some(offsets),
5062 vec![Arc::new(int_array) as ArrayRef, Arc::new(str_array)],
5063 )
5064 .unwrap();
5065
5066 let union_type = union_array.data_type().clone();
5067 let converter = RowConverter::new(vec![SortField::new(union_type)]).unwrap();
5068
5069 let rows = converter.convert_columns(&[Arc::new(union_array)]).unwrap();
5070
        // Logical rows: 0 -> Int32(100), 1 -> Utf8("z"), 2 -> Int32(5), 3 -> Utf8("a"), 4 -> Int32(20)

        // Int32 children (the lower type id) sort before Utf8 children, regardless of value
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(0) < rows.row(3));

        // Values with the same type id compare by their natural order
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));
    }
5097
5098 #[test]
5099 fn test_row_converter_roundtrip_with_many_union_columns() {
5100 let fields1 = UnionFields::try_new(
5102 vec![0, 1],
5103 vec![
5104 Field::new("int", DataType::Int32, true),
5105 Field::new("string", DataType::Utf8, true),
5106 ],
5107 )
5108 .unwrap();
5109
5110 let int_array1 = Int32Array::from(vec![Some(67), None]);
5111 let string_array1 = StringArray::from(vec![None::<&str>, Some("hello")]);
5112 let type_ids1 = vec![0i8, 1].into();
5113
5114 let union_array1 = UnionArray::try_new(
5115 fields1.clone(),
5116 type_ids1,
5117 None,
5118 vec![
5119 Arc::new(int_array1) as ArrayRef,
5120 Arc::new(string_array1) as ArrayRef,
5121 ],
5122 )
5123 .unwrap();
5124
5125 let fields2 = UnionFields::try_new(
5127 vec![0, 1],
5128 vec![
5129 Field::new("int", DataType::Int32, true),
5130 Field::new("string", DataType::Utf8, true),
5131 ],
5132 )
5133 .unwrap();
5134
5135 let int_array2 = Int32Array::from(vec![Some(100), None]);
5136 let string_array2 = StringArray::from(vec![None::<&str>, Some("world")]);
5137 let type_ids2 = vec![0i8, 1].into();
5138
5139 let union_array2 = UnionArray::try_new(
5140 fields2.clone(),
5141 type_ids2,
5142 None,
5143 vec![
5144 Arc::new(int_array2) as ArrayRef,
5145 Arc::new(string_array2) as ArrayRef,
5146 ],
5147 )
5148 .unwrap();
5149
5150 let field1 = Field::new("col1", DataType::Union(fields1, UnionMode::Sparse), true);
5152 let field2 = Field::new("col2", DataType::Union(fields2, UnionMode::Sparse), true);
5153
5154 let sort_field1 = SortField::new(field1.data_type().clone());
5155 let sort_field2 = SortField::new(field2.data_type().clone());
5156
5157 let converter = RowConverter::new(vec![sort_field1, sort_field2]).unwrap();
5158
5159 let rows = converter
5160 .convert_columns(&[
5161 Arc::new(union_array1.clone()) as ArrayRef,
5162 Arc::new(union_array2.clone()) as ArrayRef,
5163 ])
5164 .unwrap();
5165
5166 let out = converter.convert_rows(&rows).unwrap();
5168
5169 let [col1, col2] = out.as_slice() else {
5170 panic!("expected 2 columns")
5171 };
5172
5173 let col1 = col1.as_any().downcast_ref::<UnionArray>().unwrap();
5174 let col2 = col2.as_any().downcast_ref::<UnionArray>().unwrap();
5175
5176 for (expected, got) in [union_array1, union_array2].iter().zip([col1, col2]) {
5177 assert_eq!(expected.len(), got.len());
5178 assert_eq!(expected.type_ids(), got.type_ids());
5179
5180 for i in 0..expected.len() {
5181 assert_eq!(expected.value(i).as_ref(), got.value(i).as_ref());
5182 }
5183 }
5184 }
5185
5186 #[test]
5187 fn test_row_converter_roundtrip_with_one_union_column() {
5188 let fields = UnionFields::try_new(
5189 vec![0, 1],
5190 vec![
5191 Field::new("int", DataType::Int32, true),
5192 Field::new("string", DataType::Utf8, true),
5193 ],
5194 )
5195 .unwrap();
5196
5197 let int_array = Int32Array::from(vec![Some(67), None]);
5198 let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
5199 let type_ids = vec![0i8, 1].into();
5200
5201 let union_array = UnionArray::try_new(
5202 fields.clone(),
5203 type_ids,
5204 None,
5205 vec![
5206 Arc::new(int_array) as ArrayRef,
5207 Arc::new(string_array) as ArrayRef,
5208 ],
5209 )
5210 .unwrap();
5211
5212 let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
5213 let sort_field = SortField::new(field.data_type().clone());
5214 let converter = RowConverter::new(vec![sort_field]).unwrap();
5215
5216 let rows = converter
5217 .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
5218 .unwrap();
5219
5220 let out = converter.convert_rows(&rows).unwrap();
5222
5223 let [col1] = out.as_slice() else {
5224 panic!("expected 1 column")
5225 };
5226
5227 let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
5228 assert_eq!(col.len(), union_array.len());
5229 assert_eq!(col.type_ids(), union_array.type_ids());
5230
5231 for i in 0..col.len() {
5232 assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
5233 }
5234 }
5235
5236 #[test]
5237 fn test_row_converter_roundtrip_with_non_default_union_type_ids() {
5238 let fields = UnionFields::try_new(
5240 vec![70, 85],
5241 vec![
5242 Field::new("int", DataType::Int32, true),
5243 Field::new("string", DataType::Utf8, true),
5244 ],
5245 )
5246 .unwrap();
5247
5248 let int_array = Int32Array::from(vec![Some(67), None]);
5249 let string_array = StringArray::from(vec![None::<&str>, Some("hello")]);
5250 let type_ids = vec![70i8, 85].into();
5251
5252 let union_array = UnionArray::try_new(
5253 fields.clone(),
5254 type_ids,
5255 None,
5256 vec![
5257 Arc::new(int_array) as ArrayRef,
5258 Arc::new(string_array) as ArrayRef,
5259 ],
5260 )
5261 .unwrap();
5262
5263 let field = Field::new("col", DataType::Union(fields, UnionMode::Sparse), true);
5264 let sort_field = SortField::new(field.data_type().clone());
5265 let converter = RowConverter::new(vec![sort_field]).unwrap();
5266
5267 let rows = converter
5268 .convert_columns(&[Arc::new(union_array.clone()) as ArrayRef])
5269 .unwrap();
5270
5271 let out = converter.convert_rows(&rows).unwrap();
5273
5274 let [col1] = out.as_slice() else {
5275 panic!("expected 1 column")
5276 };
5277
5278 let col = col1.as_any().downcast_ref::<UnionArray>().unwrap();
5279 assert_eq!(col.len(), union_array.len());
5280 assert_eq!(col.type_ids(), union_array.type_ids());
5281
5282 for i in 0..col.len() {
5283 assert_eq!(col.value(i).as_ref(), union_array.value(i).as_ref());
5284 }
5285 }
5286
5287 #[test]
5288 fn rows_size_should_count_for_capacity() {
5289 let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
5290
5291 let empty_rows_size_with_preallocate_rows_and_data = {
5292 let rows = row_converter.empty_rows(1000, 1000);
5293
5294 rows.size()
5295 };
5296 let empty_rows_size_with_preallocate_rows = {
5297 let rows = row_converter.empty_rows(1000, 0);
5298
5299 rows.size()
5300 };
5301 let empty_rows_size_with_preallocate_data = {
5302 let rows = row_converter.empty_rows(0, 1000);
5303
5304 rows.size()
5305 };
5306 let empty_rows_size_without_preallocate = {
5307 let rows = row_converter.empty_rows(0, 0);
5308
5309 rows.size()
5310 };
5311
5312 assert!(
5313 empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_rows,
5314 "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_rows}"
5315 );
5316 assert!(
5317 empty_rows_size_with_preallocate_rows_and_data > empty_rows_size_with_preallocate_data,
5318 "{empty_rows_size_with_preallocate_rows_and_data} should be larger than {empty_rows_size_with_preallocate_data}"
5319 );
5320 assert!(
5321 empty_rows_size_with_preallocate_rows > empty_rows_size_without_preallocate,
5322 "{empty_rows_size_with_preallocate_rows} should be larger than {empty_rows_size_without_preallocate}"
5323 );
5324 assert!(
5325 empty_rows_size_with_preallocate_data > empty_rows_size_without_preallocate,
5326 "{empty_rows_size_with_preallocate_data} should be larger than {empty_rows_size_without_preallocate}"
5327 );
5328 }
5329
5330 #[test]
5331 fn test_struct_no_child_fields() {
5332 fn run_test(array: ArrayRef) {
5333 let sort_fields = vec![SortField::new(array.data_type().clone())];
5334 let converter = RowConverter::new(sort_fields).unwrap();
5335 let r = converter.convert_columns(&[Arc::clone(&array)]).unwrap();
5336
5337 let back = converter.convert_rows(&r).unwrap();
5338 assert_eq!(back.len(), 1);
5339 assert_eq!(&back[0], &array);
5340 }
5341
5342 let s = Arc::new(StructArray::new_empty_fields(5, None)) as ArrayRef;
5343 run_test(s);
5344
5345 let s = Arc::new(StructArray::new_empty_fields(
5346 5,
5347 Some(vec![true, false, true, false, false].into()),
5348 )) as ArrayRef;
5349 run_test(s);
5350 }
5351
5352 #[test]
5353 fn reserve_should_increase_capacity_to_the_requested_size() {
5354 let row_converter = RowConverter::new(vec![SortField::new(DataType::UInt8)]).unwrap();
5355 let mut empty_rows = row_converter.empty_rows(0, 0);
5356 empty_rows.reserve(50, 50);
5357 let before_size = empty_rows.size();
5358 empty_rows.reserve(50, 50);
5359 assert_eq!(
5360 empty_rows.size(),
5361 before_size,
5362 "Size should not change when reserving already reserved space"
5363 );
5364 empty_rows.reserve(10, 20);
        assert_eq!(
            empty_rows.size(),
            before_size,
            "Size should not change when there is already space for the requested reservation"
        );
5370
5371 empty_rows.reserve(100, 20);
5372 assert!(
5373 empty_rows.size() > before_size,
5374 "Size should increase when reserving more space than previously reserved"
5375 );
5376
5377 let before_size = empty_rows.size();
5378
5379 empty_rows.reserve(20, 100);
5380 assert!(
5381 empty_rows.size() > before_size,
5382 "Size should increase when reserving more space than previously reserved"
5383 );
5384 }
5385
5386 #[test]
5387 fn empty_rows_should_return_empty_lengths_iterator() {
5388 let rows = RowConverter::new(vec![SortField::new(DataType::UInt8)])
5389 .unwrap()
5390 .empty_rows(0, 0);
5391 let mut lengths_iter = rows.lengths();
5392 assert_eq!(lengths_iter.next(), None);
5393 }
5394}