//! A comparable row-oriented representation of a collection of [`Array`]s.
//!
//! Rows are normalized, variable-length byte sequences whose byte-wise ordering
//! matches the lexicographical ordering of the original column values.
#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(missing_docs)]
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
use crate::variable::{decode_binary, decode_string};
use arrow_array::types::{Int16Type, Int32Type, Int64Type};

mod fixed;
mod list;
mod run;
mod variable;

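/// Converts [`ArrayRef`] columns into a normalized, comparable row format and back.
///
/// Rows produced by [`RowConverter::convert_columns`] compare byte-wise: comparing
/// two [`Row`]s with `<`, `==`, etc. yields the same result as a lexicographical
/// comparison of the original column values under the configured [`SortField`]s.
///
/// A minimal usage sketch (illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array, StringArray};
/// # use arrow_schema::DataType;
/// # use arrow_row::{RowConverter, SortField};
/// let converter = RowConverter::new(vec![
///     SortField::new(DataType::Int32),
///     SortField::new(DataType::Utf8),
/// ])
/// .unwrap();
///
/// let cols = [
///     Arc::new(Int32Array::from(vec![1, 1, 2])) as ArrayRef,
///     Arc::new(StringArray::from(vec!["b", "a", "a"])) as ArrayRef,
/// ];
/// let rows = converter.convert_columns(&cols).unwrap();
///
/// // Comparing encoded rows is equivalent to comparing (Int32, Utf8) tuples
/// assert!(rows.row(1) < rows.row(0));
/// assert!(rows.row(0) < rows.row(2));
///
/// // The original columns can be recovered from the rows
/// let back = converter.convert_rows(&rows).unwrap();
/// assert_eq!(&back[0], &cols[0]);
/// assert_eq!(&back[1], &cols[1]);
/// ```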
#[derive(Debug)]
pub struct RowConverter {
    /// The sort fields, in order, that this converter encodes
    fields: Arc<[SortField]>,
    /// The state necessary to encode each field, one per sort field
    codecs: Vec<Codec>,
}

/// The encoding state required by a single column
#[derive(Debug)]
enum Codec {
    /// No additional state is required to encode this column
    Stateless,
    /// A converter for the dictionary values, and the row encoding of a null value
    Dictionary(RowConverter, OwnedRow),
    /// A converter for the struct's children, and the row encoding of a null value
    Struct(RowConverter, OwnedRow),
    /// A converter for the list element values
    List(RowConverter),
    /// A converter for the run values
    RunEndEncoded(RowConverter),
}

impl Codec {
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // Descending order is applied when encoding the parent, so the child
                // converter always encodes ascending, with `nulls_first` inverted
                // when descending is set
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // Descending order is applied when encoding the parent list, so the
                // child converter always encodes ascending, with `nulls_first`
                // inverted when descending is set
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => as_list_array(array).values(),
                    DataType::LargeList(_) => as_large_list_array(array).values(),
                    DataType::FixedSizeList(_, _) => as_fixed_size_list_array(array).values(),
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::RunEndEncoded(rows))
            }
        }
    }

    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
        }
    }
}

/// Per-column state computed while encoding a batch of columns
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional state is required to encode this column
    Stateless,
    /// The row encoding of the dictionary values, and the encoding of a null value
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the struct's children, and the encoding of a null value
    Struct(Rows, Row<'a>),
    /// The row encoding of the list element values
    List(Rows),
    /// The row encoding of the run values
    RunEndEncoded(Rows),
}

/// Configuration for a column in a [`RowConverter`]: its [`DataType`] and [`SortOptions`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options
    options: SortOptions,
    /// Data type
    data_type: DataType,
}

impl SortField {
    /// Create a new column with the given data type and default [`SortOptions`]
    pub fn new(data_type: DataType) -> Self {
        Self::new_with_options(data_type, Default::default())
    }

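    /// Create a new column with the given data type and [`SortOptions`]
    ///
    /// A sketch of configuring a descending, nulls-last column (illustrative only):
    ///
    /// ```
    /// # use arrow_schema::{DataType, SortOptions};
    /// # use arrow_row::SortField;
    /// let field = SortField::new_with_options(
    ///     DataType::Utf8,
    ///     SortOptions::default().desc().with_nulls_first(false),
    /// );
    /// ```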
    pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
        Self { options, data_type }
    }

    /// Return the size of this instance in bytes, including `Self`
    pub fn size(&self) -> usize {
        self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
    }
}

impl RowConverter {
    /// Create a new [`RowConverter`] with the provided schema
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Check whether the given fields are supported by the row format
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f)
            | DataType::LargeList(f)
            | DataType::FixedSizeList(f, _)
            | DataType::Map(f, _) => Self::supports_datatype(f.data_type()),
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            _ => false,
        }
    }

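    /// Convert [`ArrayRef`] columns into [`Rows`]
    ///
    /// Returns an error if the number or data types of `columns` do not match the
    /// [`SortField`]s provided to [`RowConverter::new`].
    ///
    /// A sketch of encoding and comparing rows (illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col]).unwrap();
    /// assert!(rows.row(1) < rows.row(2));
    /// assert!(rows.row(2) < rows.row(0));
    /// ```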
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

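    /// Convert [`ArrayRef`] columns, appending to an existing [`Rows`]
    ///
    /// Returns an error if the number or data types of `columns` do not match the
    /// [`SortField`]s provided to [`RowConverter::new`].
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this [`RowConverter`].
    ///
    /// A sketch of incrementally building up rows (illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let mut rows = converter.empty_rows(10, 128);
    /// for batch in [vec![1, 2, 3], vec![4, 5]] {
    ///     let col = Arc::new(Int32Array::from(batch)) as ArrayRef;
    ///     converter.append(&mut rows, &[col]).unwrap();
    /// }
    /// assert_eq!(rows.num_rows(), 5);
    /// ```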
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        // Encode one column at a time, to minimise dispatch overheads
        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

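    /// Convert [`Rows`] back into [`ArrayRef`] columns
    ///
    /// # Panics
    ///
    /// Panics if the rows were not produced by this [`RowConverter`].
    ///
    /// A sketch of sorting columns via their row encoding (illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col]).unwrap();
    ///
    /// // Sort the row encodings and decode them back into sorted columns
    /// let mut sorted: Vec<_> = rows.iter().collect();
    /// sorted.sort_unstable();
    /// let back = converter.convert_rows(sorted).unwrap();
    /// let expected = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef;
    /// assert_eq!(&back[0], &expected);
    /// ```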
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY: rows produced by this converter contain valid data for its fields,
        // and `validate_utf8` is set whenever any row requires UTF-8 validation
        unsafe { self.convert_raw(&mut rows, validate_utf8) }
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of row data, suitable for use with [`RowConverter::append`]
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

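    /// Create a [`Rows`] instance from an array of row bytes, as produced by
    /// [`Rows::try_into_binary`] with a [`RowConverter`] of the same schema
    ///
    /// # Panics
    ///
    /// Panics if the array contains nulls.
    ///
    /// A sketch of a serialization round-trip (illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
    ///
    /// // Rows can be converted to/from a BinaryArray, e.g. for transfer or storage
    /// let binary = rows.try_into_binary().unwrap();
    /// let rows = converter.from_binary(binary);
    /// let back = converter.convert_rows(&rows).unwrap();
    /// assert_eq!(&back[0], &col);
    /// ```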
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        Rows {
            buffer: array.values().to_vec(),
            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                validate_utf8: true,
            },
        }
    }

    /// Convert raw row bytes into [`ArrayRef`]s
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data for this [`RowConverter`]
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
            .collect()
    }

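    /// Returns a [`RowParser`] that can be used to parse [`Row`]s from bytes
    /// produced by this [`RowConverter`]
    ///
    /// A sketch of re-interpreting stored row bytes (illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col]).unwrap();
    ///
    /// // Row bytes can be stored elsewhere and later re-parsed
    /// let stored: Vec<u8> = rows.row(0).data().to_vec();
    /// let parser = converter.parser();
    /// let parsed = parser.parse(&stored);
    /// assert_eq!(parsed, rows.row(0));
    /// ```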
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Returns the memory usage of this [`RowConverter`] in bytes
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}

/// A [`RowParser`] can be created from a [`RowConverter`] and used to parse bytes to [`Row`]
#[derive(Debug)]
pub struct RowParser {
    config: RowConfig,
}

impl RowParser {
    fn new(fields: Arc<[SortField]>) -> Self {
        Self {
            config: RowConfig {
                fields,
                validate_utf8: true,
            },
        }
    }

    /// Creates a [`Row`] from the provided `bytes`
    ///
    /// `bytes` must be a row produced by the [`RowConverter`] associated with this
    /// [`RowParser`]; otherwise subsequent operations on the returned [`Row`] may panic
    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
        Row {
            data: bytes,
            config: &self.config,
        }
    }
}

/// The configuration of a given set of [`Row`]s
#[derive(Debug, Clone)]
struct RowConfig {
    /// The schema for these rows
    fields: Arc<[SortField]>,
    /// Whether to run UTF-8 validation when converting back to arrow arrays
    validate_utf8: bool,
}

/// A row-oriented representation of arrow data, normalized such that byte-wise
/// comparison of the rows agrees with the lexicographical ordering of the columns
/// they encode
#[derive(Debug)]
pub struct Rows {
    /// Underlying row bytes
    buffer: Vec<u8>,
    /// Row `i` has data `&buffer[offsets[i]..offsets[i + 1]]`
    offsets: Vec<usize>,
    /// The config for these rows
    config: RowConfig,
}

impl Rows {
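    /// Append a [`Row`] to this [`Rows`]
    ///
    /// # Panics
    ///
    /// Panics if the row was not produced by the [`RowConverter`] that created this [`Rows`].
    ///
    /// A sketch of selecting rows into a new [`Rows`] (illustrative only):
    ///
    /// ```
    /// # use std::sync::Arc;
    /// # use arrow_array::{ArrayRef, Int32Array};
    /// # use arrow_schema::DataType;
    /// # use arrow_row::{RowConverter, SortField};
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[col]).unwrap();
    ///
    /// let mut selection = converter.empty_rows(0, 0);
    /// selection.push(rows.row(1));
    /// selection.push(rows.row(0));
    /// assert_eq!(selection.num_rows(), 2);
    /// assert_eq!(selection.row(0), rows.row(1));
    /// ```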
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Returns the row at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row` is out of bounds
    pub fn row(&self, row: usize) -> Row<'_> {
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    ///
    /// Caller must ensure that `index` is less than the number of rows
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Sets the length of this [`Rows`] to 0
    pub fn clear(&mut self) {
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Returns the number of [`Row`]s in this [`Rows`]
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Returns an iterator over the [`Row`]s in this [`Rows`]
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Returns the size of this instance in bytes
    ///
    /// Includes the size of `Self`, but does not account for over-allocated
    /// capacity in the underlying buffers
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Create a [`BinaryArray`] from the row data without copying the underlying bytes
    ///
    /// Returns an error if the row data is too large for an `i32`-indexed [`BinaryArray`]
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: the offsets are monotonically increasing and within the buffer,
        // and the buffer length fits in an i32 as checked above
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}

impl<'a> IntoIterator for &'a Rows {
    type Item = Row<'a>;
    type IntoIter = RowsIter<'a>;

    fn into_iter(self) -> Self::IntoIter {
        RowsIter {
            rows: self,
            start: 0,
            end: self.num_rows(),
        }
    }
}

/// An iterator over [`Rows`]
#[derive(Debug)]
pub struct RowsIter<'a> {
    rows: &'a Rows,
    start: usize,
    end: usize,
}

impl<'a> Iterator for RowsIter<'a> {
    type Item = Row<'a>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }

        // SAFETY: the iterator is constructed with `start <= end <= num_rows`
        let row = unsafe { self.rows.row_unchecked(self.start) };
        self.start += 1;
        Some(row)
    }

    fn size_hint(&self) -> (usize, Option<usize>) {
        let len = self.len();
        (len, Some(len))
    }
}

impl ExactSizeIterator for RowsIter<'_> {
    fn len(&self) -> usize {
        self.end - self.start
    }
}

impl DoubleEndedIterator for RowsIter<'_> {
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }
        // `end` is exclusive, so decrement before reading the row
        self.end -= 1;
        // SAFETY: `start <= end < num_rows` after the decrement
        Some(unsafe { self.rows.row_unchecked(self.end) })
    }
}

/// A comparable representation of a row
///
/// Two [`Row`]s compare byte-wise, and are only meaningfully comparable if they were
/// produced by [`RowConverter`]s with the same fields and sort options
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    data: &'a [u8],
    config: &'a RowConfig,
}

impl<'a> Row<'a> {
    /// Create an owned version of this row, that can be stored beyond the lifetime
    /// of the [`Rows`] it was borrowed from
    pub fn owned(&self) -> OwnedRow {
        OwnedRow {
            data: self.data.into(),
            config: self.config.clone(),
        }
    }

    /// The row's underlying bytes
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}

impl PartialEq for Row<'_> {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.data.eq(other.data)
    }
}

impl Eq for Row<'_> {}

impl PartialOrd for Row<'_> {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for Row<'_> {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.data.cmp(other.data)
    }
}

impl Hash for Row<'_> {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.data.hash(state)
    }
}

impl AsRef<[u8]> for Row<'_> {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        self.data
    }
}

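/// Owned version of a [`Row`] that can be stored independently of the [`Rows`] it
/// was created from
///
/// Note that creating an [`OwnedRow`] incurs a per-row allocation.
///
/// A sketch of retaining rows beyond the lifetime of their [`Rows`] (illustrative only):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::DataType;
/// # use arrow_row::{OwnedRow, RowConverter, SortField};
/// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
/// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
///
/// let owned: Vec<OwnedRow> = {
///     let rows = converter.convert_columns(&[col]).unwrap();
///     rows.iter().map(|row| row.owned()).collect()
/// };
///
/// // The owned rows outlive `rows` and still compare byte-wise
/// assert!(owned[1] < owned[2]);
/// assert!(owned[2] < owned[0]);
/// ```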
#[derive(Debug, Clone)]
pub struct OwnedRow {
    data: Box<[u8]>,
    config: RowConfig,
}

impl OwnedRow {
    /// Borrow this [`OwnedRow`] as a [`Row`], e.g. for comparison against a [`Row`]
    /// or for use with [`Rows::push`]
    pub fn row(&self) -> Row<'_> {
        Row {
            data: &self.data,
            config: &self.config,
        }
    }
}

impl PartialEq for OwnedRow {
    #[inline]
    fn eq(&self, other: &Self) -> bool {
        self.row().eq(&other.row())
    }
}

impl Eq for OwnedRow {}

impl PartialOrd for OwnedRow {
    #[inline]
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OwnedRow {
    #[inline]
    fn cmp(&self, other: &Self) -> Ordering {
        self.row().cmp(&other.row())
    }
}

impl Hash for OwnedRow {
    #[inline]
    fn hash<H: Hasher>(&self, state: &mut H) {
        self.row().hash(state)
    }
}

impl AsRef<[u8]> for OwnedRow {
    #[inline]
    fn as_ref(&self) -> &[u8] {
        &self.data
    }
}

/// Returns the byte used to encode a null value, based on where nulls should sort
#[inline]
fn null_sentinel(options: SortOptions) -> u8 {
    match options.nulls_first {
        true => 0,
        false => 0xFF,
    }
}

/// Tracks the accumulated encoded length of each row, avoiding materializing per-row
/// lengths while all encountered columns are fixed-length
enum LengthTracker {
    /// Every row has the same length `length`
    Fixed { length: usize, num_rows: usize },
    /// Row `i` has length `lengths[i] + fixed_length`
    Variable {
        /// The accumulated length of the fixed-length portion of each row
        fixed_length: usize,
        /// The accumulated length of the variable-length portion of each row
        lengths: Vec<usize>,
    },
}

impl LengthTracker {
    fn new(num_rows: usize) -> Self {
        Self::Fixed {
            length: 0,
            num_rows,
        }
    }

    /// Adds a column of fixed-length elements, each of encoded length `new_length`
    fn push_fixed(&mut self, new_length: usize) {
        match self {
            LengthTracker::Fixed { length, .. } => *length += new_length,
            LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
        }
    }

    /// Adds a column of variable-length elements, whose per-row encoded lengths are
    /// yielded by `new_lengths`
    fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
        match self {
            LengthTracker::Fixed { length, .. } => {
                *self = LengthTracker::Variable {
                    fixed_length: *length,
                    lengths: new_lengths.collect(),
                }
            }
            LengthTracker::Variable { lengths, .. } => {
                assert_eq!(lengths.len(), new_lengths.len());
                lengths
                    .iter_mut()
                    .zip(new_lengths)
                    .for_each(|(length, new_length)| *length += new_length);
            }
        }
    }

    /// Returns a mutable slice of per-row variable lengths, converting to the
    /// `Variable` representation if necessary
    fn materialized(&mut self) -> &mut [usize] {
        if let LengthTracker::Fixed { length, num_rows } = *self {
            *self = LengthTracker::Variable {
                fixed_length: length,
                lengths: vec![0; num_rows],
            };
        }

        match self {
            LengthTracker::Variable { lengths, .. } => lengths,
            LengthTracker::Fixed { .. } => unreachable!(),
        }
    }

    /// Extends `offsets` with the starting offset of each tracked row, beginning at
    /// `initial_offset`, and returns the total length once all rows are encoded
    fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
        match self {
            LengthTracker::Fixed { length, num_rows } => {
                offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));

                initial_offset + num_rows * length
            }
            LengthTracker::Variable {
                fixed_length,
                lengths,
            } => {
                let mut acc = initial_offset;

                offsets.extend(lengths.iter().map(|length| {
                    let current = acc;
                    acc += length + fixed_length;
                    current
                }));

                acc
            }
        }
    }
}

/// Computes the encoded length of each row in `cols`
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
        }
    }

    tracker
}

/// Encodes a single column into `data`, advancing the per-row write offsets as it goes
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
    }
}

/// Encode dictionary values, replacing each key with the row encoding of the value
/// it refers to, or the encoding of a null value if the key is null
pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &DictionaryArray<K>,
    values: &Rows,
    null: &Row<'_>,
) {
    for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
        let row = match k {
            Some(k) => values.row(k.as_usize()).data,
            None => null.data,
        };
        let end_offset = *offset + row.len();
        data[*offset..end_offset].copy_from_slice(row);
        *offset = end_offset;
    }
}

macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}

/// Decodes the provided `field` from `rows`
///
/// # Safety
///
/// Rows must contain valid data for the provided field
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        Codec::Dictionary(converter, _) => {
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            let (null_count, nulls) = fixed::decode_nulls(rows);
            // Skip the null sentinel byte prepended to each struct row
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(list::decode_fixed_size_list(
                converter,
                rows,
                field,
                validate_utf8,
                value_length.as_usize(),
            )?),
            _ => unreachable!(),
        },
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(run::decode::<Int16Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int32 => Arc::new(run::decode::<Int32Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                DataType::Int64 => Arc::new(run::decode::<Int64Type>(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                )?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
    };
    Ok(array)
}

#[cfg(test)]
mod tests {
    use rand::distr::uniform::SampleUniform;
    use rand::distr::{Distribution, StandardUniform};
    use rand::{rng, Rng};

    use arrow_array::builder::*;
    use arrow_array::types::*;
    use arrow_array::*;
    use arrow_buffer::{i256, NullBuffer};
    use arrow_buffer::{Buffer, OffsetBuffer};
    use arrow_cast::display::{ArrayFormatter, FormatOptions};
    use arrow_ord::sort::{LexicographicalComparator, SortColumn};

    use super::*;

    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, 1, 191, 166, 102, 102, // (1, 1.3)
                1, 128, 2, 1, 192, 32, 0, 0, // (2, 2.5)
                0, 0, 0, 0, 0, 0, 0, 0, // (null, null)
                1, 127, 251, 1, 192, 128, 0, 0, // (-5, 4.)
                1, 128, 2, 1, 189, 204, 204, 205, // (2, 0.1)
                1, 128, 2, 1, 63, 127, 255, 255, // (2, -4.)
                1, 128, 0, 1, 127, 255, 255, 255, // (0, -0.)
            ]
        );

        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }

    #[test]
    fn test_decimal128() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
            DECIMAL128_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal128Array::from_iter([
                None,
                Some(i128::MIN),
                Some(-13),
                Some(46_i128),
                Some(5456_i128),
                Some(i128::MAX),
            ])
            .with_precision_and_scale(38, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }

    #[test]
    fn test_decimal256() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
            DECIMAL256_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        let col = Arc::new(
            Decimal256Array::from_iter([
                None,
                Some(i256::MIN),
                Some(i256::from_parts(0, -1)),
                Some(i256::from_parts(u128::MAX, -1)),
                Some(i256::from_parts(u128::MAX, 0)),
                Some(i256::from_parts(0, 46_i128)),
                Some(i256::from_parts(5, 46_i128)),
                Some(i256::MAX),
            ])
            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }

    #[test]
    fn test_bool() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(2) > rows.row(0));
        assert!(rows.row(1) > rows.row(0));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Boolean,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(1) < rows.row(0));
        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }

    #[test]
    fn test_timezone() {
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &v);
    }

    #[test]
    fn test_null_encoding() {
        let col = Arc::new(NullArray::new(10));
        let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
        let rows = converter.convert_columns(&[col]).unwrap();
        assert_eq!(rows.num_rows(), 10);
        assert_eq!(rows.row(1).data.len(), 0);
    }

    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }

    /// Asserts that `a` equals `b`, comparing a dictionary-encoded `b` by its logical values
    fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
        match b.data_type() {
            DataType::Dictionary(_, v) => {
                assert_eq!(a.data_type(), v.as_ref());
                let b = arrow_cast::cast(b, v).unwrap();
                assert_eq!(a, b.as_ref())
            }
            _ => assert_eq!(a, b),
        }
    }

    #[test]
    fn test_string_dictionary() {
        let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("foo"),
            Some("hello"),
            Some("he"),
            None,
            Some("hello"),
            Some(""),
            Some("hello"),
            Some("hello"),
        ])) as ArrayRef;

        let field = SortField::new(a.data_type().clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();

        assert!(rows_a.row(3) < rows_a.row(5));
        assert!(rows_a.row(2) < rows_a.row(1));
        assert!(rows_a.row(0) < rows_a.row(1));
        assert!(rows_a.row(3) < rows_a.row(0));

        assert_eq!(rows_a.row(1), rows_a.row(4));
        assert_eq!(rows_a.row(1), rows_a.row(6));
        assert_eq!(rows_a.row(1), rows_a.row(7));

        let cols = converter.convert_rows(&rows_a).unwrap();
        dictionary_eq(&cols[0], &a);

        let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
            Some("hello"),
            None,
            Some("cupcakes"),
        ])) as ArrayRef;

        let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
        assert_eq!(rows_a.row(1), rows_b.row(0));
        assert_eq!(rows_a.row(3), rows_b.row(1));
        assert!(rows_b.row(2) < rows_a.row(0));

        let cols = converter.convert_rows(&rows_b).unwrap();
        dictionary_eq(&cols[0], &b);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) > rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) > rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);

        let converter = RowConverter::new(vec![SortField::new_with_options(
            a.data_type().clone(),
            SortOptions::default().desc().with_nulls_first(true),
        )])
        .unwrap();

        let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
        assert!(rows_c.row(3) < rows_c.row(5));
        assert!(rows_c.row(2) > rows_c.row(1));
        assert!(rows_c.row(0) > rows_c.row(1));
        assert!(rows_c.row(3) < rows_c.row(0));

        let cols = converter.convert_rows(&rows_c).unwrap();
        dictionary_eq(&cols[0], &a);
    }

    #[test]
    fn test_struct() {
        let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
        let a_f = Arc::new(Field::new("int", DataType::Int32, false));
        let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
        let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
        let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;

        let sort_fields = vec![SortField::new(s1.data_type().clone())];
        let converter = RowConverter::new(sort_fields).unwrap();
        let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();

        for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
            assert!(a < b);
        }

        let back = converter.convert_rows(&r1).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s1);

        // Re-encode with struct-level nulls (rows 0 and 2 are null)
        let data = s1
            .to_data()
            .into_builder()
            .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
            .null_count(2)
            .build()
            .unwrap();

        let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
        let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // both null
        assert!(r2.row(0) < r2.row(1)); // nulls sort first
        assert_ne!(r1.row(0), r2.row(0)); // value does not equal null
        assert_eq!(r1.row(1), r2.row(1)); // values equal

        let back = converter.convert_rows(&r2).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(&back[0], &s2);

        back[0].to_data().validate_full().unwrap();
    }

    #[test]
    fn test_primitive_dictionary() {
        let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
        builder.append(2).unwrap();
        builder.append(3).unwrap();
        builder.append(0).unwrap();
        builder.append_null();
        builder.append(5).unwrap();
        builder.append(3).unwrap();
        builder.append(-1).unwrap();

        let a = builder.finish();
        let data_type = a.data_type().clone();
        let columns = [Arc::new(a) as ArrayRef];

        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(6) < rows.row(2));
        assert!(rows.row(3) < rows.row(6));
    }

    #[test]
    fn test_dictionary_nulls() {
        let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
        let keys =
            Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();

        let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
        let data = keys
            .into_builder()
            .data_type(data_type.clone())
            .child_data(vec![values])
            .build()
            .unwrap();

        let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
        let field = SortField::new(data_type.clone());
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&columns).unwrap();

        assert_eq!(rows.row(0), rows.row(1));
        assert_eq!(rows.row(3), rows.row(4));
        assert_eq!(rows.row(4), rows.row(5));
        assert!(rows.row(3) < rows.row(0));
    }

    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_row = rows.row(0);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }

    #[test]
    #[should_panic(expected = "Encountered non UTF-8 data")]
    fn test_invalid_utf8_array() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
        let rows = converter.convert_columns(&[array]).unwrap();
        let binary_rows = rows.try_into_binary().expect("known-small rows");

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty() {
        let binary_row: &[u8] = &[];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_empty_array() {
        let row: &[u8] = &[];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated() {
        let binary_row: &[u8] = &[0x02];

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parser = converter.parser();
        let utf8_row = parser.parse(binary_row.as_ref());

        converter.convert_rows(std::iter::once(utf8_row)).unwrap();
    }

    #[test]
    #[should_panic(expected = "index out of bounds")]
    fn test_invalid_truncated_array() {
        let row: &[u8] = &[0x02];
        let binary_rows = BinaryArray::from(vec![row]);

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let parsed = converter.from_binary(binary_rows);

        converter.convert_rows(parsed.iter()).unwrap();
    }

    #[test]
    #[should_panic(expected = "rows were not produced by this RowConverter")]
    fn test_different_converter() {
        let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let rows = converter.convert_columns(&[values]).unwrap();

        let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
        let _ = converter.convert_rows(&rows);
    }

    fn test_single_list<O: OffsetSizeTrait>() {
        let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(32);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.values().append_value(12);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.append(false);

        // Rows: [32, 52, 32], [32, 52, 12], [32, 52], null, [32, null], [], null
        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().asc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }

    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        // Rows: [[1, 2], [1, null]], [[1, null], [1, null]], [[1, null], null], null, [[1, 2]]
        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }

    #[test]
    fn test_list() {
        test_single_list::<i32>();
        test_nested_list::<i32>();
    }

    #[test]
    fn test_large_list() {
        test_single_list::<i64>();
        test_nested_list::<i64>();
    }
2443
2444 #[test]
2445 fn test_fixed_size_list() {
2446 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
2447 builder.values().append_value(32);
2448 builder.values().append_value(52);
2449 builder.values().append_value(32);
2450 builder.append(true);
2451 builder.values().append_value(32);
2452 builder.values().append_value(52);
2453 builder.values().append_value(12);
2454 builder.append(true);
2455 builder.values().append_value(32);
2456 builder.values().append_value(52);
2457 builder.values().append_null();
2458 builder.append(true);
2459 builder.values().append_value(32); builder.values().append_value(52); builder.values().append_value(13); builder.append(false);
2463 builder.values().append_value(32);
2464 builder.values().append_null();
2465 builder.values().append_null();
2466 builder.append(true);
2467 builder.values().append_null();
2468 builder.values().append_null();
2469 builder.values().append_null();
2470 builder.append(true);
2471 builder.values().append_value(17); builder.values().append_null(); builder.values().append_value(77); builder.append(false);
2475
2476 let list = Arc::new(builder.finish()) as ArrayRef;
2477 let d = list.data_type().clone();
2478
2479 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2481
2482 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2483 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2492 assert_eq!(back.len(), 1);
2493 back[0].to_data().validate_full().unwrap();
2494 assert_eq!(&back[0], &list);
2495
2496 let options = SortOptions::default().asc().with_nulls_first(false);
2498 let field = SortField::new_with_options(d.clone(), options);
2499 let converter = RowConverter::new(vec![field]).unwrap();
2500 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2501 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2510 assert_eq!(back.len(), 1);
2511 back[0].to_data().validate_full().unwrap();
2512 assert_eq!(&back[0], &list);
2513
2514 let options = SortOptions::default().desc().with_nulls_first(false);
2516 let field = SortField::new_with_options(d.clone(), options);
2517 let converter = RowConverter::new(vec![field]).unwrap();
2518 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2519 assert!(rows.row(0) < rows.row(1));
2520 assert!(rows.row(2) > rows.row(1));
2521 assert!(rows.row(3) > rows.row(2));
2522 assert!(rows.row(4) > rows.row(2));
2523 assert!(rows.row(5) > rows.row(2));
2524 assert!(rows.row(3) > rows.row(5));
2525 assert_eq!(rows.row(3), rows.row(6));
2526
2527 let back = converter.convert_rows(&rows).unwrap();
2528 assert_eq!(back.len(), 1);
2529 back[0].to_data().validate_full().unwrap();
2530 assert_eq!(&back[0], &list);
2531
2532 let options = SortOptions::default().desc().with_nulls_first(true);
2534 let field = SortField::new_with_options(d, options);
2535 let converter = RowConverter::new(vec![field]).unwrap();
2536 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2537
2538 assert!(rows.row(0) < rows.row(1));
2539 assert!(rows.row(2) < rows.row(1));
2540 assert!(rows.row(3) < rows.row(2));
2541 assert!(rows.row(4) < rows.row(2));
2542 assert!(rows.row(5) < rows.row(2));
2543 assert!(rows.row(3) < rows.row(5));
2544 assert_eq!(rows.row(3), rows.row(6));
2545
2546 let back = converter.convert_rows(&rows).unwrap();
2547 assert_eq!(back.len(), 1);
2548 back[0].to_data().validate_full().unwrap();
2549 assert_eq!(&back[0], &list);
2550 }
2551
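// Random data generators used by fuzz_test below: each produces `len` entries,
// of which roughly `valid_percent` are non-null.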
2552 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2553 where
2554 K: ArrowPrimitiveType,
2555 StandardUniform: Distribution<K::Native>,
2556 {
2557 let mut rng = rng();
2558 (0..len)
2559 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2560 .collect()
2561 }
2562
2563 fn generate_strings<O: OffsetSizeTrait>(
2564 len: usize,
2565 valid_percent: f64,
2566 ) -> GenericStringArray<O> {
2567 let mut rng = rng();
2568 (0..len)
2569 .map(|_| {
2570 rng.random_bool(valid_percent).then(|| {
2571 let len = rng.random_range(0..100);
2572 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2573 String::from_utf8(bytes).unwrap()
2574 })
2575 })
2576 .collect()
2577 }
2578
2579 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2580 let mut rng = rng();
2581 (0..len)
2582 .map(|_| {
2583 rng.random_bool(valid_percent).then(|| {
2584 let len = rng.random_range(0..100);
2585 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2586 String::from_utf8(bytes).unwrap()
2587 })
2588 })
2589 .collect()
2590 }
2591
2592 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2593 let mut rng = rng();
2594 (0..len)
2595 .map(|_| {
2596 rng.random_bool(valid_percent).then(|| {
2597 let len = rng.random_range(0..100);
2598 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2599 bytes
2600 })
2601 })
2602 .collect()
2603 }
2604
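// Generates a DictionaryArray with `len` random (possibly null) keys into the
// provided `values`, building the ArrayData directly so any value type can be used.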
2605 fn generate_dictionary<K>(
2606 values: ArrayRef,
2607 len: usize,
2608 valid_percent: f64,
2609 ) -> DictionaryArray<K>
2610 where
2611 K: ArrowDictionaryKeyType,
2612 K::Native: SampleUniform,
2613 {
2614 let mut rng = rng();
2615 let min_key = K::Native::from_usize(0).unwrap();
2616 let max_key = K::Native::from_usize(values.len()).unwrap();
2617 let keys: PrimitiveArray<K> = (0..len)
2618 .map(|_| {
2619 rng.random_bool(valid_percent)
2620 .then(|| rng.random_range(min_key..max_key))
2621 })
2622 .collect();
2623
2624 let data_type =
2625 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2626
2627 let data = keys
2628 .into_data()
2629 .into_builder()
2630 .data_type(data_type)
2631 .add_child_data(values.to_data())
2632 .build()
2633 .unwrap();
2634
2635 DictionaryArray::from(data)
2636 }
2637
2638 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2639 let mut rng = rng();
2640 let width = rng.random_range(0..20);
2641 let mut builder = FixedSizeBinaryBuilder::new(width);
2642
2643 let mut b = vec![0; width as usize];
2644 for _ in 0..len {
2645 match rng.random_bool(valid_percent) {
2646 true => {
2647 b.iter_mut().for_each(|x| *x = rng.random());
2648 builder.append_value(&b).unwrap();
2649 }
2650 false => builder.append_null(),
2651 }
2652 }
2653
2654 builder.finish()
2655 }
2656
2657 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2658 let mut rng = rng();
2659 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2660 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2661 let b = generate_strings::<i32>(len, valid_percent);
2662 let fields = Fields::from(vec![
2663 Field::new("a", DataType::Int32, true),
2664 Field::new("b", DataType::Utf8, true),
2665 ]);
2666 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2667 StructArray::new(fields, values, Some(nulls))
2668 }
2669
2670 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2671 where
2672 F: FnOnce(usize) -> ArrayRef,
2673 {
2674 let mut rng = rng();
2675 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2676 let values_len = offsets.last().unwrap().to_usize().unwrap();
2677 let values = values(values_len);
2678 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2679 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2680 ListArray::new(field, offsets, values, Some(nulls))
2681 }
2682
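// Picks one of 16 supported column types at random and generates `len` rows of random data.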
2683 fn generate_column(len: usize) -> ArrayRef {
2684 let mut rng = rng();
2685 match rng.random_range(0..16) {
2686 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
2687 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
2688 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
2689 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
2690 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
2691 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
2692 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
2693 7 => Arc::new(generate_dictionary::<Int64Type>(
2694 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
2696 len,
2697 0.8,
2698 )),
2699 8 => Arc::new(generate_dictionary::<Int64Type>(
2700 Arc::new(generate_primitive_array::<Int64Type>(
2702 rng.random_range(1..len),
2703 1.0,
2704 )),
2705 len,
2706 0.8,
2707 )),
2708 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
2709 10 => Arc::new(generate_struct(len, 0.8)),
2710 11 => Arc::new(generate_list(len, 0.8, |values_len| {
2711 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
2712 })),
2713 12 => Arc::new(generate_list(len, 0.8, |values_len| {
2714 Arc::new(generate_strings::<i32>(values_len, 0.8))
2715 })),
2716 13 => Arc::new(generate_list(len, 0.8, |values_len| {
2717 Arc::new(generate_struct(values_len, 0.8))
2718 })),
2719 14 => Arc::new(generate_string_view(len, 0.8)),
2720 15 => Arc::new(generate_byte_view(len, 0.8)),
2721 _ => unreachable!(),
2722 }
2723 }
2724
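// Helpers used to render a row's values and the column types in fuzz_test failure messages.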
2725 fn print_row(cols: &[SortColumn], row: usize) -> String {
2726 let t: Vec<_> = cols
2727 .iter()
2728 .map(|x| match x.values.is_valid(row) {
2729 true => {
2730 let opts = FormatOptions::default().with_null("NULL");
2731 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2732 formatter.value(row).to_string()
2733 }
2734 false => "NULL".to_string(),
2735 })
2736 .collect();
2737 t.join(",")
2738 }
2739
2740 fn print_col_types(cols: &[SortColumn]) -> String {
2741 let t: Vec<_> = cols
2742 .iter()
2743 .map(|x| x.values.data_type().to_string())
2744 .collect();
2745 t.join(",")
2746 }
2747
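// Generates random columns and sort options, checks that comparing the encoded rows
// agrees with LexicographicalComparator for every pair of indices, and verifies that
// the rows (including after a binary round trip) convert back to the original arrays.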
2748 #[test]
2749 #[cfg_attr(miri, ignore)]
2750 fn fuzz_test() {
2751 for _ in 0..100 {
2752 let mut rng = rng();
2753 let num_columns = rng.random_range(1..5);
2754 let len = rng.random_range(5..100);
2755 let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
2756
2757 let options: Vec<_> = (0..num_columns)
2758 .map(|_| SortOptions {
2759 descending: rng.random_bool(0.5),
2760 nulls_first: rng.random_bool(0.5),
2761 })
2762 .collect();
2763
2764 let sort_columns: Vec<_> = options
2765 .iter()
2766 .zip(&arrays)
2767 .map(|(o, c)| SortColumn {
2768 values: Arc::clone(c),
2769 options: Some(*o),
2770 })
2771 .collect();
2772
2773 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
2774
2775 let columns: Vec<SortField> = options
2776 .into_iter()
2777 .zip(&arrays)
2778 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
2779 .collect();
2780
2781 let converter = RowConverter::new(columns).unwrap();
2782 let rows = converter.convert_columns(&arrays).unwrap();
2783
2784 for i in 0..len {
2785 for j in 0..len {
2786 let row_i = rows.row(i);
2787 let row_j = rows.row(j);
2788 let row_cmp = row_i.cmp(&row_j);
2789 let lex_cmp = comparator.compare(i, j);
2790 assert_eq!(
2791 row_cmp,
2792 lex_cmp,
2793 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
2794 print_row(&sort_columns, i),
2795 print_row(&sort_columns, j),
2796 row_i,
2797 row_j,
2798 print_col_types(&sort_columns)
2799 );
2800 }
2801 }
2802
2803 let back = converter.convert_rows(&rows).unwrap();
2804 for (actual, expected) in back.iter().zip(&arrays) {
2805 actual.to_data().validate_full().unwrap();
2806 dictionary_eq(actual, expected)
2807 }
2808
2809 let rows = rows.try_into_binary().expect("reasonable size");
2811 let parser = converter.parser();
2812 let back = converter
2813 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
2814 .unwrap();
2815 for (actual, expected) in back.iter().zip(&arrays) {
2816 actual.to_data().validate_full().unwrap();
2817 dictionary_eq(actual, expected)
2818 }
2819
2820 let rows = converter.from_binary(rows);
2821 let back = converter.convert_rows(&rows).unwrap();
2822 for (actual, expected) in back.iter().zip(&arrays) {
2823 actual.to_data().validate_full().unwrap();
2824 dictionary_eq(actual, expected)
2825 }
2826 }
2827 }
2828
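// Verifies that Rows::clear allows a buffer to be reused with append, and that the
// reused rows are identical to rows built from scratch for the same input.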
2829 #[test]
2830 fn test_clear() {
2831 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2832 let mut rows = converter.empty_rows(3, 128);
2833
2834 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2835 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2836 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2837
2838 for array in arrays.iter() {
2839 rows.clear();
2840 converter.append(&mut rows, &[array.clone()]).unwrap();
2841 let back = converter.convert_rows(&rows).unwrap();
2842 assert_eq!(&back[0], array);
2843 }
2844
2845 let mut rows_expected = converter.empty_rows(3, 128);
2846 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2847
2848 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2849 assert_eq!(
2850 actual, expected,
2851 "For row {i}: expected {expected:?}, actual: {actual:?}",
2852 );
2853 }
2854 }
2855
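// Exercises append through the dictionary codec for a Dictionary(Int32, Binary) column;
// the result is checked with dictionary_eq as the round trip may not preserve the
// dictionary encoding itself.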
2856 #[test]
2857 fn test_append_codec_dictionary_binary() {
2858 use DataType::*;
2859 let converter = RowConverter::new(vec![SortField::new(Dictionary(
2861 Box::new(Int32),
2862 Box::new(Binary),
2863 ))])
2864 .unwrap();
2865 let mut rows = converter.empty_rows(4, 128);
2866
2867 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2868 let values = BinaryArray::from(vec![
2869 Some("a".as_bytes()),
2870 Some(b"b"),
2871 Some(b"c"),
2872 Some(b"d"),
2873 ]);
2874 let dict_array = DictionaryArray::new(keys, Arc::new(values));
2875
2876 rows.clear();
2877 let array = Arc::new(dict_array) as ArrayRef;
2878 converter.append(&mut rows, &[array.clone()]).unwrap();
2879 let back = converter.convert_rows(&rows).unwrap();
2880
2881 dictionary_eq(&back[0], &array);
2882 }
2883
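// A list that is a proper prefix of another ([None] vs [None, None]) must compare
// as less under the default sort options.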
2884 #[test]
2885 fn test_list_prefix() {
2886 let mut a = ListBuilder::new(Int8Builder::new());
2887 a.append_value([None]);
2888 a.append_value([None, None]);
2889 let a = a.finish();
2890
2891 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2892 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2893 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2894 }
2895}