#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(missing_docs)]
use std::cmp::Ordering;
use std::hash::{Hash, Hasher};
use std::sync::Arc;

use arrow_array::cast::*;
use arrow_array::types::ArrowDictionaryKeyType;
use arrow_array::*;
use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
use arrow_data::ArrayDataBuilder;
use arrow_schema::*;
use variable::{decode_binary_view, decode_string_view};

use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
use crate::variable::{decode_binary, decode_string};
use arrow_array::types::{Int16Type, Int32Type, Int64Type};

mod fixed;
mod list;
mod run;
mod variable;
155
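/// Converts a set of [`ArrayRef`] columns into a row-oriented format in which each
/// row is a variable-length byte sequence, and byte-wise comparison of rows matches
/// lexicographic comparison of the original values under the configured [`SortField`]s.
///
/// A minimal round-trip sketch; it assumes this crate is built as `arrow_row` and
/// that `arrow_array` / `arrow_schema` are available to the doctest:
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, StringArray};
/// use arrow_row::{RowConverter, SortField}; // crate path assumed
/// use arrow_schema::DataType;
///
/// let converter = RowConverter::new(vec![
///     SortField::new(DataType::Int32),
///     SortField::new(DataType::Utf8),
/// ])
/// .unwrap();
///
/// let cols = [
///     Arc::new(Int32Array::from(vec![1, 2, 1])) as ArrayRef,
///     Arc::new(StringArray::from(vec!["a", "a", "b"])) as ArrayRef,
/// ];
///
/// // Encoded rows compare like the underlying (Int32, Utf8) tuples
/// let rows = converter.convert_columns(&cols).unwrap();
/// assert!(rows.row(0) < rows.row(1));
/// assert!(rows.row(0) < rows.row(2));
///
/// // And can be converted back into the original columns
/// let back = converter.convert_rows(&rows).unwrap();
/// assert_eq!(back.len(), 2);
/// ```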
156#[derive(Debug)]
413pub struct RowConverter {
414 fields: Arc<[SortField]>,
415 codecs: Vec<Codec>,
417}
418
419#[derive(Debug)]
420enum Codec {
421 Stateless,
423 Dictionary(RowConverter, OwnedRow),
426 Struct(RowConverter, OwnedRow),
429 List(RowConverter),
431 RunEndEncoded(RowConverter),
433}
434
435impl Codec {
436 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
437 match &sort_field.data_type {
438 DataType::Dictionary(_, values) => {
439 let sort_field =
440 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
441
442 let converter = RowConverter::new(vec![sort_field])?;
443 let null_array = new_null_array(values.as_ref(), 1);
444 let nulls = converter.convert_columns(&[null_array])?;
445
446 let owned = OwnedRow {
447 data: nulls.buffer.into(),
448 config: nulls.config,
449 };
450 Ok(Self::Dictionary(converter, owned))
451 }
452 DataType::RunEndEncoded(_, values) => {
453 let options = SortOptions {
455 descending: false,
456 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
457 };
458
459 let field = SortField::new_with_options(values.data_type().clone(), options);
460 let converter = RowConverter::new(vec![field])?;
461 Ok(Self::RunEndEncoded(converter))
462 }
463 d if !d.is_nested() => Ok(Self::Stateless),
464 DataType::List(f) | DataType::LargeList(f) => {
465 let options = SortOptions {
469 descending: false,
470 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
471 };
472
473 let field = SortField::new_with_options(f.data_type().clone(), options);
474 let converter = RowConverter::new(vec![field])?;
475 Ok(Self::List(converter))
476 }
477 DataType::FixedSizeList(f, _) => {
478 let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
479 let converter = RowConverter::new(vec![field])?;
480 Ok(Self::List(converter))
481 }
482 DataType::Struct(f) => {
483 let sort_fields = f
484 .iter()
485 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
486 .collect();
487
488 let converter = RowConverter::new(sort_fields)?;
489 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
490
491 let nulls = converter.convert_columns(&nulls)?;
492 let owned = OwnedRow {
493 data: nulls.buffer.into(),
494 config: nulls.config,
495 };
496
497 Ok(Self::Struct(converter, owned))
498 }
499 _ => Err(ArrowError::NotYetImplemented(format!(
500 "not yet implemented: {:?}",
501 sort_field.data_type
502 ))),
503 }
504 }
505
506 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
507 match self {
508 Codec::Stateless => Ok(Encoder::Stateless),
509 Codec::Dictionary(converter, nulls) => {
510 let values = array.as_any_dictionary().values().clone();
511 let rows = converter.convert_columns(&[values])?;
512 Ok(Encoder::Dictionary(rows, nulls.row()))
513 }
514 Codec::Struct(converter, null) => {
515 let v = as_struct_array(array);
516 let rows = converter.convert_columns(v.columns())?;
517 Ok(Encoder::Struct(rows, null.row()))
518 }
519 Codec::List(converter) => {
520 let values = match array.data_type() {
521 DataType::List(_) => {
522 let list_array = as_list_array(array);
523 let first_offset = list_array.offsets()[0] as usize;
524 let last_offset =
525 list_array.offsets()[list_array.offsets().len() - 1] as usize;
526
527 list_array
530 .values()
531 .slice(first_offset, last_offset - first_offset)
532 }
533 DataType::LargeList(_) => {
534 let list_array = as_large_list_array(array);
535
536 let first_offset = list_array.offsets()[0] as usize;
537 let last_offset =
538 list_array.offsets()[list_array.offsets().len() - 1] as usize;
539
540 list_array
543 .values()
544 .slice(first_offset, last_offset - first_offset)
545 }
546 DataType::FixedSizeList(_, _) => {
547 as_fixed_size_list_array(array).values().clone()
548 }
549 _ => unreachable!(),
550 };
551 let rows = converter.convert_columns(&[values])?;
552 Ok(Encoder::List(rows))
553 }
554 Codec::RunEndEncoded(converter) => {
555 let values = match array.data_type() {
556 DataType::RunEndEncoded(r, _) => match r.data_type() {
557 DataType::Int16 => array.as_run::<Int16Type>().values(),
558 DataType::Int32 => array.as_run::<Int32Type>().values(),
559 DataType::Int64 => array.as_run::<Int64Type>().values(),
560 _ => unreachable!("Unsupported run end index type: {r:?}"),
561 },
562 _ => unreachable!(),
563 };
564 let rows = converter.convert_columns(&[values.clone()])?;
565 Ok(Encoder::RunEndEncoded(rows))
566 }
567 }
568 }
569
570 fn size(&self) -> usize {
571 match self {
572 Codec::Stateless => 0,
573 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
574 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
575 Codec::List(converter) => converter.size(),
576 Codec::RunEndEncoded(converter) => converter.size(),
577 }
578 }
579}
580
581#[derive(Debug)]
582enum Encoder<'a> {
583 Stateless,
585 Dictionary(Rows, Row<'a>),
587 Struct(Rows, Row<'a>),
593 List(Rows),
595 RunEndEncoded(Rows),
597}
598
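/// The data type and sort order used to encode a single column.
///
/// A small configuration sketch (using `arrow_schema::SortOptions`, the options
/// type exercised by the tests in this crate):
///
/// ```
/// use arrow_row::SortField; // crate path assumed
/// use arrow_schema::{DataType, SortOptions};
///
/// // Default ordering: ascending, nulls first
/// let default_field = SortField::new(DataType::Int32);
///
/// // Descending, with nulls sorted last
/// let options = SortOptions::default().desc().with_nulls_first(false);
/// let custom_field = SortField::new_with_options(DataType::Int32, options);
///
/// assert_ne!(default_field, custom_field);
/// ```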
599#[derive(Debug, Clone, PartialEq, Eq)]
601pub struct SortField {
602 options: SortOptions,
604 data_type: DataType,
606}
607
608impl SortField {
609 pub fn new(data_type: DataType) -> Self {
611 Self::new_with_options(data_type, Default::default())
612 }
613
614 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
616 Self { options, data_type }
617 }
618
619 pub fn size(&self) -> usize {
623 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
624 }
625}
626
627impl RowConverter {
628 pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
630 if !Self::supports_fields(&fields) {
631 return Err(ArrowError::NotYetImplemented(format!(
632 "Row format support not yet implemented for: {fields:?}"
633 )));
634 }
635
636 let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
637 Ok(Self {
638 fields: fields.into(),
639 codecs,
640 })
641 }
642
643 pub fn supports_fields(fields: &[SortField]) -> bool {
645 fields.iter().all(|x| Self::supports_datatype(&x.data_type))
646 }
647
648 fn supports_datatype(d: &DataType) -> bool {
649 match d {
650 _ if !d.is_nested() => true,
651 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
652 Self::supports_datatype(f.data_type())
653 }
654 DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
655 DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
656 _ => false,
657 }
658 }
659
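    /// Converts [`ArrayRef`] columns into [`Rows`].
    ///
    /// Columns are interpreted positionally against the [`SortField`]s passed to
    /// [`RowConverter::new`]; an error is returned if the number of columns,
    /// their lengths, or their data types do not match.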
660 pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
668 let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
669 let mut rows = self.empty_rows(num_rows, 0);
670 self.append(&mut rows, columns)?;
671 Ok(rows)
672 }
673
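    /// Appends the encoding of `columns` onto the end of an existing [`Rows`],
    /// allowing rows to be built up incrementally across batches.
    ///
    /// An incremental sketch using [`RowConverter::empty_rows`] (the capacity
    /// values below are arbitrary):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array};
    /// use arrow_row::{RowConverter, SortField}; // crate path assumed
    /// use arrow_schema::DataType;
    ///
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let mut rows = converter.empty_rows(4, 64);
    ///
    /// let batch1 = [Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef];
    /// let batch2 = [Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef];
    /// converter.append(&mut rows, &batch1).unwrap();
    /// converter.append(&mut rows, &batch2).unwrap();
    ///
    /// assert_eq!(rows.num_rows(), 4);
    /// assert!(rows.row(0) < rows.row(3));
    /// ```
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this [`RowConverter`].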
674 pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
705 assert!(
706 Arc::ptr_eq(&rows.config.fields, &self.fields),
707 "rows were not produced by this RowConverter"
708 );
709
710 if columns.len() != self.fields.len() {
711 return Err(ArrowError::InvalidArgumentError(format!(
712 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
713 self.fields.len(),
714 columns.len()
715 )));
716 }
        for column in columns.iter().skip(1) {
            if column.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    column.len()
                )));
            }
        }
726
727 let encoders = columns
728 .iter()
729 .zip(&self.codecs)
730 .zip(self.fields.iter())
731 .map(|((column, codec), field)| {
732 if !column.data_type().equals_datatype(&field.data_type) {
733 return Err(ArrowError::InvalidArgumentError(format!(
734 "RowConverter column schema mismatch, expected {} got {}",
735 field.data_type,
736 column.data_type()
737 )));
738 }
739 codec.encoder(column.as_ref())
740 })
741 .collect::<Result<Vec<_>, _>>()?;
742
743 let write_offset = rows.num_rows();
744 let lengths = row_lengths(columns, &encoders);
745 let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
746 rows.buffer.resize(total, 0);
747
748 for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
749 encode_column(
751 &mut rows.buffer,
752 &mut rows.offsets[write_offset..],
753 column.as_ref(),
754 field.options,
755 &encoder,
756 )
757 }
758
759 if cfg!(debug_assertions) {
760 assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
761 rows.offsets
762 .windows(2)
763 .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
764 }
765
766 Ok(())
767 }
768
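    /// Converts [`Row`]s back into their original [`ArrayRef`] columns.
    ///
    /// # Panics
    ///
    /// Panics if any of the rows were not produced by this [`RowConverter`].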
769 pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
775 where
776 I: IntoIterator<Item = Row<'a>>,
777 {
778 let mut validate_utf8 = false;
779 let mut rows: Vec<_> = rows
780 .into_iter()
781 .map(|row| {
782 assert!(
783 Arc::ptr_eq(&row.config.fields, &self.fields),
784 "rows were not produced by this RowConverter"
785 );
786 validate_utf8 |= row.config.validate_utf8;
787 row.data
788 })
789 .collect();
790
791 let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
795
796 if cfg!(test) {
797 for (i, row) in rows.iter().enumerate() {
798 if !row.is_empty() {
799 return Err(ArrowError::InvalidArgumentError(format!(
800 "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
801 codecs = &self.codecs
802 )));
803 }
804 }
805 }
806
807 Ok(result)
808 }
809
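    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of row data, intended for use with
    /// [`RowConverter::append`].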
810 pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
839 let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
840 offsets.push(0);
841
842 Rows {
843 offsets,
844 buffer: Vec::with_capacity(data_capacity),
845 config: RowConfig {
846 fields: self.fields.clone(),
847 validate_utf8: false,
848 },
849 }
850 }
851
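    /// Recreates a [`Rows`] from a [`BinaryArray`], intended for data previously
    /// produced by [`Rows::try_into_binary`] with a converter of the same schema.
    ///
    /// # Panics
    ///
    /// Panics if the array contains nulls.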
852 pub fn from_binary(&self, array: BinaryArray) -> Rows {
879 assert_eq!(
880 array.null_count(),
881 0,
882 "can't construct Rows instance from array with nulls"
883 );
884 Rows {
885 buffer: array.values().to_vec(),
886 offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
887 config: RowConfig {
888 fields: Arc::clone(&self.fields),
889 validate_utf8: true,
890 },
891 }
892 }
893
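    /// Decodes the raw bytes in `rows` back into columns, advancing each slice
    /// past the bytes it consumes.
    ///
    /// # Safety
    ///
    /// Each slice in `rows` must contain valid row data produced by this converter.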
894 unsafe fn convert_raw(
900 &self,
901 rows: &mut [&[u8]],
902 validate_utf8: bool,
903 ) -> Result<Vec<ArrayRef>, ArrowError> {
904 self.fields
905 .iter()
906 .zip(&self.codecs)
907 .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
908 .collect()
909 }
910
911 pub fn parser(&self) -> RowParser {
913 RowParser::new(Arc::clone(&self.fields))
914 }
915
916 pub fn size(&self) -> usize {
920 std::mem::size_of::<Self>()
921 + self.fields.iter().map(|x| x.size()).sum::<usize>()
922 + self.codecs.capacity() * std::mem::size_of::<Codec>()
923 + self.codecs.iter().map(Codec::size).sum::<usize>()
924 }
925}
926
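/// Reinterprets raw row bytes (for example obtained via [`Row::data`]) as
/// [`Row`]s tied to the schema of the [`RowConverter`] that created the parser.
///
/// A minimal sketch; the bytes must originate from a converter with an identical
/// schema for the result to be meaningful:
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array};
/// use arrow_row::{RowConverter, SortField}; // crate path assumed
/// use arrow_schema::DataType;
///
/// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
/// let rows = converter
///     .convert_columns(&[Arc::new(Int32Array::from(vec![7])) as ArrayRef])
///     .unwrap();
///
/// // Serialize a row to bytes and parse it back
/// let bytes = rows.row(0).data().to_vec();
/// let parser = converter.parser();
/// let parsed = parser.parse(&bytes);
///
/// let back = converter.convert_rows([parsed]).unwrap();
/// assert_eq!(back[0].len(), 1);
/// ```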
927#[derive(Debug)]
929pub struct RowParser {
930 config: RowConfig,
931}
932
933impl RowParser {
934 fn new(fields: Arc<[SortField]>) -> Self {
935 Self {
936 config: RowConfig {
937 fields,
938 validate_utf8: true,
939 },
940 }
941 }
942
943 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
948 Row {
949 data: bytes,
950 config: &self.config,
951 }
952 }
953}
954
955#[derive(Debug, Clone)]
957struct RowConfig {
958 fields: Arc<[SortField]>,
960 validate_utf8: bool,
962}
963
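/// A buffer of encoded rows produced by [`RowConverter::convert_columns`] or
/// [`RowConverter::empty_rows`], addressable by index via [`Rows::row`].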
964#[derive(Debug)]
968pub struct Rows {
969 buffer: Vec<u8>,
971 offsets: Vec<usize>,
973 config: RowConfig,
975}
976
977impl Rows {
978 pub fn push(&mut self, row: Row<'_>) {
980 assert!(
981 Arc::ptr_eq(&row.config.fields, &self.config.fields),
982 "row was not produced by this RowConverter"
983 );
984 self.config.validate_utf8 |= row.config.validate_utf8;
985 self.buffer.extend_from_slice(row.data);
986 self.offsets.push(self.buffer.len())
987 }
988
989 pub fn row(&self, row: usize) -> Row<'_> {
991 assert!(row + 1 < self.offsets.len());
992 unsafe { self.row_unchecked(row) }
993 }
994
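    /// Returns the [`Row`] at `index` without bounds checking.
    ///
    /// # Safety
    ///
    /// `index` must be less than [`Rows::num_rows`].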
995 pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
1000 let end = unsafe { self.offsets.get_unchecked(index + 1) };
1001 let start = unsafe { self.offsets.get_unchecked(index) };
1002 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
1003 Row {
1004 data,
1005 config: &self.config,
1006 }
1007 }
1008
1009 pub fn clear(&mut self) {
1011 self.offsets.truncate(1);
1012 self.buffer.clear();
1013 }
1014
1015 pub fn num_rows(&self) -> usize {
1017 self.offsets.len() - 1
1018 }
1019
1020 pub fn iter(&self) -> RowsIter<'_> {
1022 self.into_iter()
1023 }
1024
1025 pub fn size(&self) -> usize {
1029 std::mem::size_of::<Self>()
1031 + self.buffer.len()
1032 + self.offsets.len() * std::mem::size_of::<usize>()
1033 }
1034
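    /// Converts the rows into a [`BinaryArray`] with one value per row, e.g. to
    /// ship the encoded form between threads or processes; errors if the
    /// underlying buffer exceeds `i32::MAX` bytes.
    ///
    /// A round-trip sketch with [`RowConverter::from_binary`]:
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array};
    /// use arrow_row::{RowConverter, SortField}; // crate path assumed
    /// use arrow_schema::DataType;
    ///
    /// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
    /// let column = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
    /// let rows = converter.convert_columns(&[column.clone()]).unwrap();
    ///
    /// // Ship the rows as a BinaryArray, then reconstruct and decode them
    /// let binary = rows.try_into_binary().unwrap();
    /// let restored = converter.from_binary(binary);
    /// let back = converter.convert_rows(&restored).unwrap();
    /// assert_eq!(back[0].as_ref(), column.as_ref());
    /// ```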
1035 pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
1065 if self.buffer.len() > i32::MAX as usize {
1066 return Err(ArrowError::InvalidArgumentError(format!(
1067 "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
1068 self.buffer.len()
1069 )));
1070 }
1071 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
1073 let array = unsafe {
1075 BinaryArray::new_unchecked(
1076 OffsetBuffer::new_unchecked(offsets_scalar),
1077 Buffer::from_vec(self.buffer),
1078 None,
1079 )
1080 };
1081 Ok(array)
1082 }
1083}
1084
1085impl<'a> IntoIterator for &'a Rows {
1086 type Item = Row<'a>;
1087 type IntoIter = RowsIter<'a>;
1088
1089 fn into_iter(self) -> Self::IntoIter {
1090 RowsIter {
1091 rows: self,
1092 start: 0,
1093 end: self.num_rows(),
1094 }
1095 }
1096}
1097
1098#[derive(Debug)]
1100pub struct RowsIter<'a> {
1101 rows: &'a Rows,
1102 start: usize,
1103 end: usize,
1104}
1105
1106impl<'a> Iterator for RowsIter<'a> {
1107 type Item = Row<'a>;
1108
1109 fn next(&mut self) -> Option<Self::Item> {
1110 if self.end == self.start {
1111 return None;
1112 }
1113
1114 let row = unsafe { self.rows.row_unchecked(self.start) };
1116 self.start += 1;
1117 Some(row)
1118 }
1119
1120 fn size_hint(&self) -> (usize, Option<usize>) {
1121 let len = self.len();
1122 (len, Some(len))
1123 }
1124}
1125
1126impl ExactSizeIterator for RowsIter<'_> {
1127 fn len(&self) -> usize {
1128 self.end - self.start
1129 }
1130}
1131
1132impl DoubleEndedIterator for RowsIter<'_> {
1133 fn next_back(&mut self) -> Option<Self::Item> {
1134 if self.end == self.start {
1135 return None;
1136 }
        // Decrement before reading: `end` is one past the last unread row
        self.end -= 1;
        let row = unsafe { self.rows.row_unchecked(self.end) };
1140 Some(row)
1141 }
1142}
1143
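/// A borrowed view of a single encoded row; comparing [`Row`]s with the standard
/// comparison operators is equivalent to lexicographically comparing the original
/// column values under the configured [`SortOptions`].
///
/// A sketch of sorting row indices by their encoded form:
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array};
/// use arrow_row::{RowConverter, SortField}; // crate path assumed
/// use arrow_schema::DataType;
///
/// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
/// let column = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
/// let rows = converter.convert_columns(&[column]).unwrap();
///
/// let mut indices: Vec<usize> = (0..rows.num_rows()).collect();
/// indices.sort_unstable_by(|&a, &b| rows.row(a).cmp(&rows.row(b)));
/// assert_eq!(indices, vec![1, 2, 0]);
/// ```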
1144#[derive(Debug, Copy, Clone)]
1153pub struct Row<'a> {
1154 data: &'a [u8],
1155 config: &'a RowConfig,
1156}
1157
1158impl<'a> Row<'a> {
1159 pub fn owned(&self) -> OwnedRow {
1161 OwnedRow {
1162 data: self.data.into(),
1163 config: self.config.clone(),
1164 }
1165 }
1166
1167 pub fn data(&self) -> &'a [u8] {
1169 self.data
1170 }
1171}
1172
1173impl PartialEq for Row<'_> {
1176 #[inline]
1177 fn eq(&self, other: &Self) -> bool {
1178 self.data.eq(other.data)
1179 }
1180}
1181
1182impl Eq for Row<'_> {}
1183
1184impl PartialOrd for Row<'_> {
1185 #[inline]
1186 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1187 Some(self.cmp(other))
1188 }
1189}
1190
1191impl Ord for Row<'_> {
1192 #[inline]
1193 fn cmp(&self, other: &Self) -> Ordering {
1194 self.data.cmp(other.data)
1195 }
1196}
1197
1198impl Hash for Row<'_> {
1199 #[inline]
1200 fn hash<H: Hasher>(&self, state: &mut H) {
1201 self.data.hash(state)
1202 }
1203}
1204
1205impl AsRef<[u8]> for Row<'_> {
1206 #[inline]
1207 fn as_ref(&self) -> &[u8] {
1208 self.data
1209 }
1210}
1211
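/// An owned, heap-allocated version of [`Row`] that does not borrow from the
/// originating [`Rows`]; use [`OwnedRow::row`] to obtain a comparable [`Row`] view.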
1212#[derive(Debug, Clone)]
1216pub struct OwnedRow {
1217 data: Box<[u8]>,
1218 config: RowConfig,
1219}
1220
1221impl OwnedRow {
1222 pub fn row(&self) -> Row<'_> {
1226 Row {
1227 data: &self.data,
1228 config: &self.config,
1229 }
1230 }
1231}
1232
1233impl PartialEq for OwnedRow {
1236 #[inline]
1237 fn eq(&self, other: &Self) -> bool {
1238 self.row().eq(&other.row())
1239 }
1240}
1241
1242impl Eq for OwnedRow {}
1243
1244impl PartialOrd for OwnedRow {
1245 #[inline]
1246 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1247 Some(self.cmp(other))
1248 }
1249}
1250
1251impl Ord for OwnedRow {
1252 #[inline]
1253 fn cmp(&self, other: &Self) -> Ordering {
1254 self.row().cmp(&other.row())
1255 }
1256}
1257
1258impl Hash for OwnedRow {
1259 #[inline]
1260 fn hash<H: Hasher>(&self, state: &mut H) {
1261 self.row().hash(state)
1262 }
1263}
1264
1265impl AsRef<[u8]> for OwnedRow {
1266 #[inline]
1267 fn as_ref(&self) -> &[u8] {
1268 &self.data
1269 }
1270}
1271
1272#[inline]
1274fn null_sentinel(options: SortOptions) -> u8 {
1275 match options.nulls_first {
1276 true => 0,
1277 false => 0xFF,
1278 }
1279}
1280
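/// Tracks the encoded length of rows being appended: stays in the cheap `Fixed`
/// representation while every encoded column is fixed-width, and only materializes
/// a per-row `Vec` of lengths once a variable-width column is encountered.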
1281enum LengthTracker {
1283 Fixed { length: usize, num_rows: usize },
1285 Variable {
1287 fixed_length: usize,
1288 lengths: Vec<usize>,
1289 },
1290}
1291
1292impl LengthTracker {
1293 fn new(num_rows: usize) -> Self {
1294 Self::Fixed {
1295 length: 0,
1296 num_rows,
1297 }
1298 }
1299
1300 fn push_fixed(&mut self, new_length: usize) {
1302 match self {
1303 LengthTracker::Fixed { length, .. } => *length += new_length,
1304 LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1305 }
1306 }
1307
1308 fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1310 match self {
1311 LengthTracker::Fixed { length, .. } => {
1312 *self = LengthTracker::Variable {
1313 fixed_length: *length,
1314 lengths: new_lengths.collect(),
1315 }
1316 }
1317 LengthTracker::Variable { lengths, .. } => {
1318 assert_eq!(lengths.len(), new_lengths.len());
1319 lengths
1320 .iter_mut()
1321 .zip(new_lengths)
1322 .for_each(|(length, new_length)| *length += new_length);
1323 }
1324 }
1325 }
1326
1327 fn materialized(&mut self) -> &mut [usize] {
1329 if let LengthTracker::Fixed { length, num_rows } = *self {
1330 *self = LengthTracker::Variable {
1331 fixed_length: length,
1332 lengths: vec![0; num_rows],
1333 };
1334 }
1335
1336 match self {
1337 LengthTracker::Variable { lengths, .. } => lengths,
1338 LengthTracker::Fixed { .. } => unreachable!(),
1339 }
1340 }
1341
1342 fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1360 match self {
1361 LengthTracker::Fixed { length, num_rows } => {
1362 offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1363
1364 initial_offset + num_rows * length
1365 }
1366 LengthTracker::Variable {
1367 fixed_length,
1368 lengths,
1369 } => {
1370 let mut acc = initial_offset;
1371
1372 offsets.extend(lengths.iter().map(|length| {
1373 let current = acc;
1374 acc += length + fixed_length;
1375 current
1376 }));
1377
1378 acc
1379 }
1380 }
1381 }
1382}
1383
1384fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
1386 use fixed::FixedLengthEncoding;
1387
1388 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1389 let mut tracker = LengthTracker::new(num_rows);
1390
1391 for (array, encoder) in cols.iter().zip(encoders) {
1392 match encoder {
1393 Encoder::Stateless => {
1394 downcast_primitive_array! {
1395 array => tracker.push_fixed(fixed::encoded_len(array)),
1396 DataType::Null => {},
1397 DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
1398 DataType::Binary => tracker.push_variable(
1399 as_generic_binary_array::<i32>(array)
1400 .iter()
1401 .map(|slice| variable::encoded_len(slice))
1402 ),
1403 DataType::LargeBinary => tracker.push_variable(
1404 as_generic_binary_array::<i64>(array)
1405 .iter()
1406 .map(|slice| variable::encoded_len(slice))
1407 ),
1408 DataType::BinaryView => tracker.push_variable(
1409 array.as_binary_view()
1410 .iter()
1411 .map(|slice| variable::encoded_len(slice))
1412 ),
1413 DataType::Utf8 => tracker.push_variable(
1414 array.as_string::<i32>()
1415 .iter()
1416 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1417 ),
1418 DataType::LargeUtf8 => tracker.push_variable(
1419 array.as_string::<i64>()
1420 .iter()
1421 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1422 ),
1423 DataType::Utf8View => tracker.push_variable(
1424 array.as_string_view()
1425 .iter()
1426 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1427 ),
1428 DataType::FixedSizeBinary(len) => {
1429 let len = len.to_usize().unwrap();
1430 tracker.push_fixed(1 + len)
1431 }
1432 _ => unimplemented!("unsupported data type: {}", array.data_type()),
1433 }
1434 }
1435 Encoder::Dictionary(values, null) => {
1436 downcast_dictionary_array! {
1437 array => {
1438 tracker.push_variable(
1439 array.keys().iter().map(|v| match v {
1440 Some(k) => values.row(k.as_usize()).data.len(),
1441 None => null.data.len(),
1442 })
1443 )
1444 }
1445 _ => unreachable!(),
1446 }
1447 }
1448 Encoder::Struct(rows, null) => {
1449 let array = as_struct_array(array);
1450 tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1451 true => 1 + rows.row(idx).as_ref().len(),
1452 false => 1 + null.data.len(),
1453 }));
1454 }
1455 Encoder::List(rows) => match array.data_type() {
1456 DataType::List(_) => {
1457 list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
1458 }
1459 DataType::LargeList(_) => {
1460 list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
1461 }
1462 DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
1463 &mut tracker,
1464 rows,
1465 as_fixed_size_list_array(array),
1466 ),
1467 _ => unreachable!(),
1468 },
1469 Encoder::RunEndEncoded(rows) => match array.data_type() {
1470 DataType::RunEndEncoded(r, _) => match r.data_type() {
1471 DataType::Int16 => run::compute_lengths(
1472 tracker.materialized(),
1473 rows,
1474 array.as_run::<Int16Type>(),
1475 ),
1476 DataType::Int32 => run::compute_lengths(
1477 tracker.materialized(),
1478 rows,
1479 array.as_run::<Int32Type>(),
1480 ),
1481 DataType::Int64 => run::compute_lengths(
1482 tracker.materialized(),
1483 rows,
1484 array.as_run::<Int64Type>(),
1485 ),
1486 _ => unreachable!("Unsupported run end index type: {r:?}"),
1487 },
1488 _ => unreachable!(),
1489 },
1490 }
1491 }
1492
1493 tracker
1494}
1495
1496fn encode_column(
1498 data: &mut [u8],
1499 offsets: &mut [usize],
1500 column: &dyn Array,
1501 opts: SortOptions,
1502 encoder: &Encoder<'_>,
1503) {
1504 match encoder {
1505 Encoder::Stateless => {
1506 downcast_primitive_array! {
1507 column => {
1508 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1509 fixed::encode(data, offsets, column.values(), nulls, opts)
1510 } else {
1511 fixed::encode_not_null(data, offsets, column.values(), opts)
1512 }
1513 }
1514 DataType::Null => {}
1515 DataType::Boolean => {
1516 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1517 fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1518 } else {
1519 fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1520 }
1521 }
1522 DataType::Binary => {
1523 variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1524 }
1525 DataType::BinaryView => {
1526 variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1527 }
1528 DataType::LargeBinary => {
1529 variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1530 }
1531 DataType::Utf8 => variable::encode(
1532 data, offsets,
1533 column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1534 opts,
1535 ),
1536 DataType::LargeUtf8 => variable::encode(
1537 data, offsets,
1538 column.as_string::<i64>()
1539 .iter()
1540 .map(|x| x.map(|x| x.as_bytes())),
1541 opts,
1542 ),
1543 DataType::Utf8View => variable::encode(
1544 data, offsets,
1545 column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1546 opts,
1547 ),
1548 DataType::FixedSizeBinary(_) => {
1549 let array = column.as_any().downcast_ref().unwrap();
1550 fixed::encode_fixed_size_binary(data, offsets, array, opts)
1551 }
1552 _ => unimplemented!("unsupported data type: {}", column.data_type()),
1553 }
1554 }
1555 Encoder::Dictionary(values, nulls) => {
1556 downcast_dictionary_array! {
1557 column => encode_dictionary_values(data, offsets, column, values, nulls),
1558 _ => unreachable!()
1559 }
1560 }
1561 Encoder::Struct(rows, null) => {
1562 let array = as_struct_array(column);
1563 let null_sentinel = null_sentinel(opts);
1564 offsets
1565 .iter_mut()
1566 .skip(1)
1567 .enumerate()
1568 .for_each(|(idx, offset)| {
1569 let (row, sentinel) = match array.is_valid(idx) {
1570 true => (rows.row(idx), 0x01),
1571 false => (*null, null_sentinel),
1572 };
1573 let end_offset = *offset + 1 + row.as_ref().len();
1574 data[*offset] = sentinel;
1575 data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1576 *offset = end_offset;
1577 })
1578 }
1579 Encoder::List(rows) => match column.data_type() {
1580 DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1581 DataType::LargeList(_) => {
1582 list::encode(data, offsets, rows, opts, as_large_list_array(column))
1583 }
1584 DataType::FixedSizeList(_, _) => {
1585 encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
1586 }
1587 _ => unreachable!(),
1588 },
1589 Encoder::RunEndEncoded(rows) => match column.data_type() {
1590 DataType::RunEndEncoded(r, _) => match r.data_type() {
1591 DataType::Int16 => {
1592 run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
1593 }
1594 DataType::Int32 => {
1595 run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
1596 }
1597 DataType::Int64 => {
1598 run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
1599 }
1600 _ => unreachable!("Unsupported run end index type: {r:?}"),
1601 },
1602 _ => unreachable!(),
1603 },
1604 }
1605}
1606
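/// Encodes a dictionary array by writing the encoded form of each key's value
/// (or the provided `null` row), so the output does not depend on how values
/// were assigned to dictionary keys.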
1607pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1609 data: &mut [u8],
1610 offsets: &mut [usize],
1611 column: &DictionaryArray<K>,
1612 values: &Rows,
1613 null: &Row<'_>,
1614) {
1615 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1616 let row = match k {
1617 Some(k) => values.row(k.as_usize()).data,
1618 None => null.data,
1619 };
1620 let end_offset = *offset + row.len();
1621 data[*offset..end_offset].copy_from_slice(row);
1622 *offset = end_offset;
1623 }
1624}
1625
1626macro_rules! decode_primitive_helper {
1627 ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
1628 Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
1629 };
1630}
1631
1632unsafe fn decode_column(
1638 field: &SortField,
1639 rows: &mut [&[u8]],
1640 codec: &Codec,
1641 validate_utf8: bool,
1642) -> Result<ArrayRef, ArrowError> {
1643 let options = field.options;
1644
1645 let array: ArrayRef = match codec {
1646 Codec::Stateless => {
1647 let data_type = field.data_type.clone();
1648 downcast_primitive! {
1649 data_type => (decode_primitive_helper, rows, data_type, options),
1650 DataType::Null => Arc::new(NullArray::new(rows.len())),
1651 DataType::Boolean => Arc::new(decode_bool(rows, options)),
1652 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1653 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1654 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1655 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1656 DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
1657 DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
1658 DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
1659 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
1660 }
1661 }
1662 Codec::Dictionary(converter, _) => {
1663 let cols = converter.convert_raw(rows, validate_utf8)?;
1664 cols.into_iter().next().unwrap()
1665 }
1666 Codec::Struct(converter, _) => {
1667 let (null_count, nulls) = fixed::decode_nulls(rows);
1668 rows.iter_mut().for_each(|row| *row = &row[1..]);
1669 let children = converter.convert_raw(rows, validate_utf8)?;
1670
1671 let child_data = children.iter().map(|c| c.to_data()).collect();
1672 let builder = ArrayDataBuilder::new(field.data_type.clone())
1673 .len(rows.len())
1674 .null_count(null_count)
1675 .null_bit_buffer(Some(nulls))
1676 .child_data(child_data);
1677
1678 Arc::new(StructArray::from(builder.build_unchecked()))
1679 }
1680 Codec::List(converter) => match &field.data_type {
1681 DataType::List(_) => {
1682 Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
1683 }
1684 DataType::LargeList(_) => {
1685 Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
1686 }
1687 DataType::FixedSizeList(_, value_length) => Arc::new(list::decode_fixed_size_list(
1688 converter,
1689 rows,
1690 field,
1691 validate_utf8,
1692 value_length.as_usize(),
1693 )?),
1694 _ => unreachable!(),
1695 },
1696 Codec::RunEndEncoded(converter) => match &field.data_type {
1697 DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
1698 DataType::Int16 => Arc::new(run::decode::<Int16Type>(
1699 converter,
1700 rows,
1701 field,
1702 validate_utf8,
1703 )?),
1704 DataType::Int32 => Arc::new(run::decode::<Int32Type>(
1705 converter,
1706 rows,
1707 field,
1708 validate_utf8,
1709 )?),
1710 DataType::Int64 => Arc::new(run::decode::<Int64Type>(
1711 converter,
1712 rows,
1713 field,
1714 validate_utf8,
1715 )?),
1716 _ => unreachable!(),
1717 },
1718 _ => unreachable!(),
1719 },
1720 };
1721 Ok(array)
1722}
1723
1724#[cfg(test)]
1725mod tests {
1726 use rand::distr::uniform::SampleUniform;
1727 use rand::distr::{Distribution, StandardUniform};
1728 use rand::{rng, Rng};
1729
1730 use arrow_array::builder::*;
1731 use arrow_array::types::*;
1732 use arrow_array::*;
1733 use arrow_buffer::{i256, NullBuffer};
1734 use arrow_buffer::{Buffer, OffsetBuffer};
1735 use arrow_cast::display::{ArrayFormatter, FormatOptions};
1736 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1737
1738 use super::*;
1739
1740 #[test]
1741 fn test_fixed_width() {
1742 let cols = [
1743 Arc::new(Int16Array::from_iter([
1744 Some(1),
1745 Some(2),
1746 None,
1747 Some(-5),
1748 Some(2),
1749 Some(2),
1750 Some(0),
1751 ])) as ArrayRef,
1752 Arc::new(Float32Array::from_iter([
1753 Some(1.3),
1754 Some(2.5),
1755 None,
1756 Some(4.),
1757 Some(0.1),
1758 Some(-4.),
1759 Some(-0.),
1760 ])) as ArrayRef,
1761 ];
1762
1763 let converter = RowConverter::new(vec![
1764 SortField::new(DataType::Int16),
1765 SortField::new(DataType::Float32),
1766 ])
1767 .unwrap();
1768 let rows = converter.convert_columns(&cols).unwrap();
1769
1770 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
1771 assert_eq!(
1772 rows.buffer,
1773 &[
1774 1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
1789 );
1790
1791 assert!(rows.row(3) < rows.row(6));
1792 assert!(rows.row(0) < rows.row(1));
1793 assert!(rows.row(3) < rows.row(0));
1794 assert!(rows.row(4) < rows.row(1));
1795 assert!(rows.row(5) < rows.row(4));
1796
1797 let back = converter.convert_rows(&rows).unwrap();
1798 for (expected, actual) in cols.iter().zip(&back) {
1799 assert_eq!(expected, actual);
1800 }
1801 }
1802
1803 #[test]
1804 fn test_decimal128() {
1805 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1806 DECIMAL128_MAX_PRECISION,
1807 7,
1808 ))])
1809 .unwrap();
1810 let col = Arc::new(
1811 Decimal128Array::from_iter([
1812 None,
1813 Some(i128::MIN),
1814 Some(-13),
1815 Some(46_i128),
1816 Some(5456_i128),
1817 Some(i128::MAX),
1818 ])
1819 .with_precision_and_scale(38, 7)
1820 .unwrap(),
1821 ) as ArrayRef;
1822
1823 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1824 for i in 0..rows.num_rows() - 1 {
1825 assert!(rows.row(i) < rows.row(i + 1));
1826 }
1827
1828 let back = converter.convert_rows(&rows).unwrap();
1829 assert_eq!(back.len(), 1);
1830 assert_eq!(col.as_ref(), back[0].as_ref())
1831 }
1832
1833 #[test]
1834 fn test_decimal256() {
1835 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1836 DECIMAL256_MAX_PRECISION,
1837 7,
1838 ))])
1839 .unwrap();
1840 let col = Arc::new(
1841 Decimal256Array::from_iter([
1842 None,
1843 Some(i256::MIN),
1844 Some(i256::from_parts(0, -1)),
1845 Some(i256::from_parts(u128::MAX, -1)),
1846 Some(i256::from_parts(u128::MAX, 0)),
1847 Some(i256::from_parts(0, 46_i128)),
1848 Some(i256::from_parts(5, 46_i128)),
1849 Some(i256::MAX),
1850 ])
1851 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1852 .unwrap(),
1853 ) as ArrayRef;
1854
1855 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1856 for i in 0..rows.num_rows() - 1 {
1857 assert!(rows.row(i) < rows.row(i + 1));
1858 }
1859
1860 let back = converter.convert_rows(&rows).unwrap();
1861 assert_eq!(back.len(), 1);
1862 assert_eq!(col.as_ref(), back[0].as_ref())
1863 }
1864
1865 #[test]
1866 fn test_bool() {
1867 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1868
1869 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1870
1871 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1872 assert!(rows.row(2) > rows.row(1));
1873 assert!(rows.row(2) > rows.row(0));
1874 assert!(rows.row(1) > rows.row(0));
1875
1876 let cols = converter.convert_rows(&rows).unwrap();
1877 assert_eq!(&cols[0], &col);
1878
1879 let converter = RowConverter::new(vec![SortField::new_with_options(
1880 DataType::Boolean,
1881 SortOptions::default().desc().with_nulls_first(false),
1882 )])
1883 .unwrap();
1884
1885 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1886 assert!(rows.row(2) < rows.row(1));
1887 assert!(rows.row(2) < rows.row(0));
1888 assert!(rows.row(1) < rows.row(0));
1889 let cols = converter.convert_rows(&rows).unwrap();
1890 assert_eq!(&cols[0], &col);
1891 }
1892
1893 #[test]
1894 fn test_timezone() {
1895 let a =
1896 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
1897 let d = a.data_type().clone();
1898
1899 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
1900 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
1901 let back = converter.convert_rows(&rows).unwrap();
1902 assert_eq!(back.len(), 1);
1903 assert_eq!(back[0].data_type(), &d);
1904
1905 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
1907 a.append(34).unwrap();
1908 a.append_null();
1909 a.append(345).unwrap();
1910
1911 let dict = a.finish();
1913 let values = TimestampNanosecondArray::from(dict.values().to_data());
1914 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
1915 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
1916 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
1917
1918 assert_eq!(dict_with_tz.data_type(), &d);
1919 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
1920 let rows = converter
1921 .convert_columns(&[Arc::new(dict_with_tz) as _])
1922 .unwrap();
1923 let back = converter.convert_rows(&rows).unwrap();
1924 assert_eq!(back.len(), 1);
1925 assert_eq!(back[0].data_type(), &v);
1926 }
1927
1928 #[test]
1929 fn test_null_encoding() {
1930 let col = Arc::new(NullArray::new(10));
1931 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1932 let rows = converter.convert_columns(&[col]).unwrap();
1933 assert_eq!(rows.num_rows(), 10);
1934 assert_eq!(rows.row(1).data.len(), 0);
1935 }
1936
1937 #[test]
1938 fn test_variable_width() {
1939 let col = Arc::new(StringArray::from_iter([
1940 Some("hello"),
1941 Some("he"),
1942 None,
1943 Some("foo"),
1944 Some(""),
1945 ])) as ArrayRef;
1946
1947 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1948 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1949
1950 assert!(rows.row(1) < rows.row(0));
1951 assert!(rows.row(2) < rows.row(4));
1952 assert!(rows.row(3) < rows.row(0));
1953 assert!(rows.row(3) < rows.row(1));
1954
1955 let cols = converter.convert_rows(&rows).unwrap();
1956 assert_eq!(&cols[0], &col);
1957
1958 let col = Arc::new(BinaryArray::from_iter([
1959 None,
1960 Some(vec![0_u8; 0]),
1961 Some(vec![0_u8; 6]),
1962 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
1963 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
1964 Some(vec![0_u8; variable::BLOCK_SIZE]),
1965 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
1966 Some(vec![1_u8; 6]),
1967 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
1968 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
1969 Some(vec![1_u8; variable::BLOCK_SIZE]),
1970 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
1971 Some(vec![0xFF_u8; 6]),
1972 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
1973 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
1974 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
1975 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
1976 ])) as ArrayRef;
1977
1978 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1979 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1980
1981 for i in 0..rows.num_rows() {
1982 for j in i + 1..rows.num_rows() {
1983 assert!(
1984 rows.row(i) < rows.row(j),
1985 "{} < {} - {:?} < {:?}",
1986 i,
1987 j,
1988 rows.row(i),
1989 rows.row(j)
1990 );
1991 }
1992 }
1993
1994 let cols = converter.convert_rows(&rows).unwrap();
1995 assert_eq!(&cols[0], &col);
1996
1997 let converter = RowConverter::new(vec![SortField::new_with_options(
1998 DataType::Binary,
1999 SortOptions::default().desc().with_nulls_first(false),
2000 )])
2001 .unwrap();
2002 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2003
2004 for i in 0..rows.num_rows() {
2005 for j in i + 1..rows.num_rows() {
2006 assert!(
2007 rows.row(i) > rows.row(j),
2008 "{} > {} - {:?} > {:?}",
2009 i,
2010 j,
2011 rows.row(i),
2012 rows.row(j)
2013 );
2014 }
2015 }
2016
2017 let cols = converter.convert_rows(&rows).unwrap();
2018 assert_eq!(&cols[0], &col);
2019 }
2020
2021 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2023 match b.data_type() {
2024 DataType::Dictionary(_, v) => {
2025 assert_eq!(a.data_type(), v.as_ref());
2026 let b = arrow_cast::cast(b, v).unwrap();
2027 assert_eq!(a, b.as_ref())
2028 }
2029 _ => assert_eq!(a, b),
2030 }
2031 }
2032
2033 #[test]
2034 fn test_string_dictionary() {
2035 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2036 Some("foo"),
2037 Some("hello"),
2038 Some("he"),
2039 None,
2040 Some("hello"),
2041 Some(""),
2042 Some("hello"),
2043 Some("hello"),
2044 ])) as ArrayRef;
2045
2046 let field = SortField::new(a.data_type().clone());
2047 let converter = RowConverter::new(vec![field]).unwrap();
2048 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2049
2050 assert!(rows_a.row(3) < rows_a.row(5));
2051 assert!(rows_a.row(2) < rows_a.row(1));
2052 assert!(rows_a.row(0) < rows_a.row(1));
2053 assert!(rows_a.row(3) < rows_a.row(0));
2054
2055 assert_eq!(rows_a.row(1), rows_a.row(4));
2056 assert_eq!(rows_a.row(1), rows_a.row(6));
2057 assert_eq!(rows_a.row(1), rows_a.row(7));
2058
2059 let cols = converter.convert_rows(&rows_a).unwrap();
2060 dictionary_eq(&cols[0], &a);
2061
2062 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2063 Some("hello"),
2064 None,
2065 Some("cupcakes"),
2066 ])) as ArrayRef;
2067
2068 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
2069 assert_eq!(rows_a.row(1), rows_b.row(0));
2070 assert_eq!(rows_a.row(3), rows_b.row(1));
2071 assert!(rows_b.row(2) < rows_a.row(0));
2072
2073 let cols = converter.convert_rows(&rows_b).unwrap();
2074 dictionary_eq(&cols[0], &b);
2075
2076 let converter = RowConverter::new(vec![SortField::new_with_options(
2077 a.data_type().clone(),
2078 SortOptions::default().desc().with_nulls_first(false),
2079 )])
2080 .unwrap();
2081
2082 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2083 assert!(rows_c.row(3) > rows_c.row(5));
2084 assert!(rows_c.row(2) > rows_c.row(1));
2085 assert!(rows_c.row(0) > rows_c.row(1));
2086 assert!(rows_c.row(3) > rows_c.row(0));
2087
2088 let cols = converter.convert_rows(&rows_c).unwrap();
2089 dictionary_eq(&cols[0], &a);
2090
2091 let converter = RowConverter::new(vec![SortField::new_with_options(
2092 a.data_type().clone(),
2093 SortOptions::default().desc().with_nulls_first(true),
2094 )])
2095 .unwrap();
2096
2097 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2098 assert!(rows_c.row(3) < rows_c.row(5));
2099 assert!(rows_c.row(2) > rows_c.row(1));
2100 assert!(rows_c.row(0) > rows_c.row(1));
2101 assert!(rows_c.row(3) < rows_c.row(0));
2102
2103 let cols = converter.convert_rows(&rows_c).unwrap();
2104 dictionary_eq(&cols[0], &a);
2105 }
2106
2107 #[test]
2108 fn test_struct() {
2109 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2111 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2112 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2113 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2114 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2115
2116 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2117 let converter = RowConverter::new(sort_fields).unwrap();
2118 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2119
2120 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2121 assert!(a < b);
2122 }
2123
2124 let back = converter.convert_rows(&r1).unwrap();
2125 assert_eq!(back.len(), 1);
2126 assert_eq!(&back[0], &s1);
2127
2128 let data = s1
2130 .to_data()
2131 .into_builder()
2132 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2133 .null_count(2)
2134 .build()
2135 .unwrap();
2136
2137 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2138 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        // rows 0 and 2 are null structs and therefore encode identically
        assert_eq!(r2.row(0), r2.row(2));
        assert!(r2.row(0) < r2.row(1));
        // nulling out row 0 changes its encoding relative to r1
        assert_ne!(r1.row(0), r2.row(0));
        // but row 1 is still valid and encodes identically
        assert_eq!(r1.row(1), r2.row(1));

        let back = converter.convert_rows(&r2).unwrap();
2145 assert_eq!(back.len(), 1);
2146 assert_eq!(&back[0], &s2);
2147
2148 back[0].to_data().validate_full().unwrap();
2149 }
2150
2151 #[test]
2152 fn test_primitive_dictionary() {
2153 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2154 builder.append(2).unwrap();
2155 builder.append(3).unwrap();
2156 builder.append(0).unwrap();
2157 builder.append_null();
2158 builder.append(5).unwrap();
2159 builder.append(3).unwrap();
2160 builder.append(-1).unwrap();
2161
2162 let a = builder.finish();
2163 let data_type = a.data_type().clone();
2164 let columns = [Arc::new(a) as ArrayRef];
2165
2166 let field = SortField::new(data_type.clone());
2167 let converter = RowConverter::new(vec![field]).unwrap();
2168 let rows = converter.convert_columns(&columns).unwrap();
2169 assert!(rows.row(0) < rows.row(1));
2170 assert!(rows.row(2) < rows.row(0));
2171 assert!(rows.row(3) < rows.row(2));
2172 assert!(rows.row(6) < rows.row(2));
2173 assert!(rows.row(3) < rows.row(6));
2174 }
2175
2176 #[test]
2177 fn test_dictionary_nulls() {
2178 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2179 let keys =
2180 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2181
2182 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2183 let data = keys
2184 .into_builder()
2185 .data_type(data_type.clone())
2186 .child_data(vec![values])
2187 .build()
2188 .unwrap();
2189
2190 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2191 let field = SortField::new(data_type.clone());
2192 let converter = RowConverter::new(vec![field]).unwrap();
2193 let rows = converter.convert_columns(&columns).unwrap();
2194
2195 assert_eq!(rows.row(0), rows.row(1));
2196 assert_eq!(rows.row(3), rows.row(4));
2197 assert_eq!(rows.row(4), rows.row(5));
2198 assert!(rows.row(3) < rows.row(0));
2199 }
2200
2201 #[test]
2202 #[should_panic(expected = "Encountered non UTF-8 data")]
2203 fn test_invalid_utf8() {
2204 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2205 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2206 let rows = converter.convert_columns(&[array]).unwrap();
2207 let binary_row = rows.row(0);
2208
2209 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2210 let parser = converter.parser();
2211 let utf8_row = parser.parse(binary_row.as_ref());
2212
2213 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2214 }
2215
2216 #[test]
2217 #[should_panic(expected = "Encountered non UTF-8 data")]
2218 fn test_invalid_utf8_array() {
2219 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2220 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2221 let rows = converter.convert_columns(&[array]).unwrap();
2222 let binary_rows = rows.try_into_binary().expect("known-small rows");
2223
2224 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2225 let parsed = converter.from_binary(binary_rows);
2226
2227 converter.convert_rows(parsed.iter()).unwrap();
2228 }
2229
2230 #[test]
2231 #[should_panic(expected = "index out of bounds")]
2232 fn test_invalid_empty() {
2233 let binary_row: &[u8] = &[];
2234
2235 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2236 let parser = converter.parser();
2237 let utf8_row = parser.parse(binary_row.as_ref());
2238
2239 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2240 }
2241
2242 #[test]
2243 #[should_panic(expected = "index out of bounds")]
2244 fn test_invalid_empty_array() {
2245 let row: &[u8] = &[];
2246 let binary_rows = BinaryArray::from(vec![row]);
2247
2248 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2249 let parsed = converter.from_binary(binary_rows);
2250
2251 converter.convert_rows(parsed.iter()).unwrap();
2252 }
2253
2254 #[test]
2255 #[should_panic(expected = "index out of bounds")]
2256 fn test_invalid_truncated() {
2257 let binary_row: &[u8] = &[0x02];
2258
2259 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2260 let parser = converter.parser();
2261 let utf8_row = parser.parse(binary_row.as_ref());
2262
2263 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2264 }
2265
2266 #[test]
2267 #[should_panic(expected = "index out of bounds")]
2268 fn test_invalid_truncated_array() {
2269 let row: &[u8] = &[0x02];
2270 let binary_rows = BinaryArray::from(vec![row]);
2271
2272 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2273 let parsed = converter.from_binary(binary_rows);
2274
2275 converter.convert_rows(parsed.iter()).unwrap();
2276 }
2277
2278 #[test]
2279 #[should_panic(expected = "rows were not produced by this RowConverter")]
2280 fn test_different_converter() {
2281 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2282 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2283 let rows = converter.convert_columns(&[values]).unwrap();
2284
2285 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2286 let _ = converter.convert_rows(&rows);
2287 }
2288
2289 fn test_single_list<O: OffsetSizeTrait>() {
2290 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2291 builder.values().append_value(32);
2292 builder.values().append_value(52);
2293 builder.values().append_value(32);
2294 builder.append(true);
2295 builder.values().append_value(32);
2296 builder.values().append_value(52);
2297 builder.values().append_value(12);
2298 builder.append(true);
2299 builder.values().append_value(32);
2300 builder.values().append_value(52);
2301 builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
2305 builder.values().append_value(32);
2306 builder.values().append_null();
2307 builder.append(true);
2308 builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.append(false);
2312
2313 let list = Arc::new(builder.finish()) as ArrayRef;
2314 let d = list.data_type().clone();
2315
2316 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2317
2318 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2328 assert_eq!(back.len(), 1);
2329 back[0].to_data().validate_full().unwrap();
2330 assert_eq!(&back[0], &list);
2331
2332 let options = SortOptions::default().asc().with_nulls_first(false);
2333 let field = SortField::new_with_options(d.clone(), options);
2334 let converter = RowConverter::new(vec![field]).unwrap();
2335 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2336
        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) < rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2346 assert_eq!(back.len(), 1);
2347 back[0].to_data().validate_full().unwrap();
2348 assert_eq!(&back[0], &list);
2349
2350 let options = SortOptions::default().desc().with_nulls_first(false);
2351 let field = SortField::new_with_options(d.clone(), options);
2352 let converter = RowConverter::new(vec![field]).unwrap();
2353 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2354
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) > rows.row(2));
        assert!(rows.row(4) > rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) > rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2364 assert_eq!(back.len(), 1);
2365 back[0].to_data().validate_full().unwrap();
2366 assert_eq!(&back[0], &list);
2367
2368 let options = SortOptions::default().desc().with_nulls_first(true);
2369 let field = SortField::new_with_options(d, options);
2370 let converter = RowConverter::new(vec![field]).unwrap();
2371 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2372
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(3) < rows.row(2));
        assert!(rows.row(4) < rows.row(2));
        assert!(rows.row(5) > rows.row(2));
        assert!(rows.row(3) < rows.row(5));
        assert_eq!(rows.row(3), rows.row(6));

        let back = converter.convert_rows(&rows).unwrap();
2382 assert_eq!(back.len(), 1);
2383 back[0].to_data().validate_full().unwrap();
2384 assert_eq!(&back[0], &list);
2385
2386 let sliced_list = list.slice(1, 5);
2387 let rows_on_sliced_list = converter
2388 .convert_columns(&[Arc::clone(&sliced_list)])
2389 .unwrap();
2390
        assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
        assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));

        let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2398 assert_eq!(back.len(), 1);
2399 back[0].to_data().validate_full().unwrap();
2400 assert_eq!(&back[0], &sliced_list);
2401 }
2402
2403 fn test_nested_list<O: OffsetSizeTrait>() {
2404 let mut builder =
2405 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2406
2407 builder.values().values().append_value(1);
2408 builder.values().values().append_value(2);
2409 builder.values().append(true);
2410 builder.values().values().append_value(1);
2411 builder.values().values().append_null();
2412 builder.values().append(true);
2413 builder.append(true);
2414
2415 builder.values().values().append_value(1);
2416 builder.values().values().append_null();
2417 builder.values().append(true);
2418 builder.values().values().append_value(1);
2419 builder.values().values().append_null();
2420 builder.values().append(true);
2421 builder.append(true);
2422
2423 builder.values().values().append_value(1);
2424 builder.values().values().append_null();
2425 builder.values().append(true);
2426 builder.values().append(false);
2427 builder.append(true);
2428 builder.append(false);
2429
2430 builder.values().values().append_value(1);
2431 builder.values().values().append_value(2);
2432 builder.values().append(true);
2433 builder.append(true);
2434
2435 let list = Arc::new(builder.finish()) as ArrayRef;
2436 let d = list.data_type().clone();
2437
2438 let options = SortOptions::default().asc().with_nulls_first(true);
2446 let field = SortField::new_with_options(d.clone(), options);
2447 let converter = RowConverter::new(vec![field]).unwrap();
2448 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2449
2450 assert!(rows.row(0) > rows.row(1));
2451 assert!(rows.row(1) > rows.row(2));
2452 assert!(rows.row(2) > rows.row(3));
2453 assert!(rows.row(4) < rows.row(0));
2454 assert!(rows.row(4) > rows.row(1));
2455
2456 let back = converter.convert_rows(&rows).unwrap();
2457 assert_eq!(back.len(), 1);
2458 back[0].to_data().validate_full().unwrap();
2459 assert_eq!(&back[0], &list);
2460
2461 let options = SortOptions::default().desc().with_nulls_first(true);
2462 let field = SortField::new_with_options(d.clone(), options);
2463 let converter = RowConverter::new(vec![field]).unwrap();
2464 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2465
2466 assert!(rows.row(0) > rows.row(1));
2467 assert!(rows.row(1) > rows.row(2));
2468 assert!(rows.row(2) > rows.row(3));
2469 assert!(rows.row(4) > rows.row(0));
2470 assert!(rows.row(4) > rows.row(1));
2471
2472 let back = converter.convert_rows(&rows).unwrap();
2473 assert_eq!(back.len(), 1);
2474 back[0].to_data().validate_full().unwrap();
2475 assert_eq!(&back[0], &list);
2476
2477 let options = SortOptions::default().desc().with_nulls_first(false);
2478 let field = SortField::new_with_options(d, options);
2479 let converter = RowConverter::new(vec![field]).unwrap();
2480 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2481
2482 assert!(rows.row(0) < rows.row(1));
2483 assert!(rows.row(1) < rows.row(2));
2484 assert!(rows.row(2) < rows.row(3));
2485 assert!(rows.row(4) > rows.row(0));
2486 assert!(rows.row(4) < rows.row(1));
2487
2488 let back = converter.convert_rows(&rows).unwrap();
2489 assert_eq!(back.len(), 1);
2490 back[0].to_data().validate_full().unwrap();
2491 assert_eq!(&back[0], &list);
2492
2493 let sliced_list = list.slice(1, 3);
2494 let rows = converter
2495 .convert_columns(&[Arc::clone(&sliced_list)])
2496 .unwrap();
2497
2498 assert!(rows.row(0) < rows.row(1));
2499 assert!(rows.row(1) < rows.row(2));
2500
2501 let back = converter.convert_rows(&rows).unwrap();
2502 assert_eq!(back.len(), 1);
2503 back[0].to_data().validate_full().unwrap();
2504 assert_eq!(&back[0], &sliced_list);
2505 }
2506
2507 #[test]
2508 fn test_list() {
2509 test_single_list::<i32>();
2510 test_nested_list::<i32>();
2511 }
2512
2513 #[test]
2514 fn test_large_list() {
2515 test_single_list::<i64>();
2516 test_nested_list::<i64>();
2517 }
2518
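     // FixedSizeList<Int32, 3> with null elements and null lists: check the row ordering for
     // each SortOptions combination, lossless round-trips, and correct handling of slices.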
2519 #[test]
2520 fn test_fixed_size_list() {
2521 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
2522 builder.values().append_value(32);
2523 builder.values().append_value(52);
2524 builder.values().append_value(32);
2525 builder.append(true);
2526 builder.values().append_value(32);
2527 builder.values().append_value(52);
2528 builder.values().append_value(12);
2529 builder.append(true);
2530 builder.values().append_value(32);
2531 builder.values().append_value(52);
2532 builder.values().append_null();
2533 builder.append(true);
2534 builder.values().append_value(32);
2535 builder.values().append_value(52);
2536 builder.values().append_value(13);
2537 builder.append(false);
2538 builder.values().append_value(32);
2539 builder.values().append_null();
2540 builder.values().append_null();
2541 builder.append(true);
2542 builder.values().append_null();
2543 builder.values().append_null();
2544 builder.values().append_null();
2545 builder.append(true);
2546 builder.values().append_value(17);
2547 builder.values().append_null();
2548 builder.values().append_value(77);
2549 builder.append(false);
2550
2551 let list = Arc::new(builder.finish()) as ArrayRef;
2552 let d = list.data_type().clone();
2553
2554 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2556
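     // default options: ascending, nulls first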
2557 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2558 assert!(rows.row(0) > rows.row(1));
2559 assert!(rows.row(2) < rows.row(1));
2560 assert!(rows.row(3) < rows.row(2));
2561 assert!(rows.row(4) < rows.row(2));
2562 assert!(rows.row(5) < rows.row(2));
2563 assert!(rows.row(3) < rows.row(5));
2564 assert_eq!(rows.row(3), rows.row(6));
2565
2566 let back = converter.convert_rows(&rows).unwrap();
2567 assert_eq!(back.len(), 1);
2568 back[0].to_data().validate_full().unwrap();
2569 assert_eq!(&back[0], &list);
2570
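     // ascending, nulls last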
2571 let options = SortOptions::default().asc().with_nulls_first(false);
2573 let field = SortField::new_with_options(d.clone(), options);
2574 let converter = RowConverter::new(vec![field]).unwrap();
2575 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2576 assert!(rows.row(0) > rows.row(1));
2577 assert!(rows.row(2) > rows.row(1));
2578 assert!(rows.row(3) > rows.row(2));
2579 assert!(rows.row(4) > rows.row(2));
2580 assert!(rows.row(5) > rows.row(2));
2581 assert!(rows.row(3) > rows.row(5));
2582 assert_eq!(rows.row(3), rows.row(6));
2583
2584 let back = converter.convert_rows(&rows).unwrap();
2585 assert_eq!(back.len(), 1);
2586 back[0].to_data().validate_full().unwrap();
2587 assert_eq!(&back[0], &list);
2588
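     // descending, nulls last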
2589 let options = SortOptions::default().desc().with_nulls_first(false);
2591 let field = SortField::new_with_options(d.clone(), options);
2592 let converter = RowConverter::new(vec![field]).unwrap();
2593 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2594 assert!(rows.row(0) < rows.row(1));
2595 assert!(rows.row(2) > rows.row(1));
2596 assert!(rows.row(3) > rows.row(2));
2597 assert!(rows.row(4) > rows.row(2));
2598 assert!(rows.row(5) > rows.row(2));
2599 assert!(rows.row(3) > rows.row(5));
2600 assert_eq!(rows.row(3), rows.row(6));
2601
2602 let back = converter.convert_rows(&rows).unwrap();
2603 assert_eq!(back.len(), 1);
2604 back[0].to_data().validate_full().unwrap();
2605 assert_eq!(&back[0], &list);
2606
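     // descending, nulls first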
2607 let options = SortOptions::default().desc().with_nulls_first(true);
2609 let field = SortField::new_with_options(d, options);
2610 let converter = RowConverter::new(vec![field]).unwrap();
2611 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2612
2613 assert!(rows.row(0) < rows.row(1));
2614 assert!(rows.row(2) < rows.row(1));
2615 assert!(rows.row(3) < rows.row(2));
2616 assert!(rows.row(4) < rows.row(2));
2617 assert!(rows.row(5) < rows.row(2));
2618 assert!(rows.row(3) < rows.row(5));
2619 assert_eq!(rows.row(3), rows.row(6));
2620
2621 let back = converter.convert_rows(&rows).unwrap();
2622 assert_eq!(back.len(), 1);
2623 back[0].to_data().validate_full().unwrap();
2624 assert_eq!(&back[0], &list);
2625
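     // The same ordering and round-trip guarantees must hold for a sliced input.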
2626 let sliced_list = list.slice(1, 5);
2627 let rows_on_sliced_list = converter
2628 .convert_columns(&[Arc::clone(&sliced_list)])
2629 .unwrap();
2630
2631 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
2632 assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
2633 assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1));
2634 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));
2635
2636 let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2637 assert_eq!(back.len(), 1);
2638 back[0].to_data().validate_full().unwrap();
2639 assert_eq!(&back[0], &sliced_list);
2640 }
2641
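     // Two FixedSizeList<UInt8, 1> columns converted together must round-trip back to the
     // original arrays, including null elements and null lists.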
2642 #[test]
2643 fn test_two_fixed_size_lists() {
2644 let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2645 first.values().append_value(100);
2647 first.append(true);
2648 first.values().append_value(101);
2650 first.append(true);
2651 first.values().append_value(102);
2653 first.append(true);
2654 first.values().append_null();
2656 first.append(true);
2657 first.values().append_null();
2659 first.append(false);
2660 let first = Arc::new(first.finish()) as ArrayRef;
2661 let first_type = first.data_type().clone();
2662
2663 let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2664 second.values().append_value(200);
2666 second.append(true);
2667 second.values().append_value(201);
2669 second.append(true);
2670 second.values().append_value(202);
2672 second.append(true);
2673 second.values().append_null();
2675 second.append(true);
2676 second.values().append_null();
2678 second.append(false);
2679 let second = Arc::new(second.finish()) as ArrayRef;
2680 let second_type = second.data_type().clone();
2681
2682 let converter = RowConverter::new(vec![
2683 SortField::new(first_type.clone()),
2684 SortField::new(second_type.clone()),
2685 ])
2686 .unwrap();
2687
2688 let rows = converter
2689 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
2690 .unwrap();
2691
2692 let back = converter.convert_rows(&rows).unwrap();
2693 assert_eq!(back.len(), 2);
2694 back[0].to_data().validate_full().unwrap();
2695 assert_eq!(&back[0], &first);
2696 back[1].to_data().validate_full().unwrap();
2697 assert_eq!(&back[1], &second);
2698 }
2699
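     // A FixedSizeList of structs containing a variable-width Utf8 field, converted alongside
     // a plain Utf8 column: nested variable-width content must round-trip losslessly.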
2700 #[test]
2701 fn test_fixed_size_list_with_variable_width_content() {
2702 let mut first = FixedSizeListBuilder::new(
2703 StructBuilder::from_fields(
2704 vec![
2705 Field::new(
2706 "timestamp",
2707 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
2708 false,
2709 ),
2710 Field::new("offset_minutes", DataType::Int16, false),
2711 Field::new("time_zone", DataType::Utf8, false),
2712 ],
2713 1,
2714 ),
2715 1,
2716 );
2717 first
2719 .values()
2720 .field_builder::<TimestampMicrosecondBuilder>(0)
2721 .unwrap()
2722 .append_null();
2723 first
2724 .values()
2725 .field_builder::<Int16Builder>(1)
2726 .unwrap()
2727 .append_null();
2728 first
2729 .values()
2730 .field_builder::<StringBuilder>(2)
2731 .unwrap()
2732 .append_null();
2733 first.values().append(false);
2734 first.append(false);
2735 first
2737 .values()
2738 .field_builder::<TimestampMicrosecondBuilder>(0)
2739 .unwrap()
2740 .append_null();
2741 first
2742 .values()
2743 .field_builder::<Int16Builder>(1)
2744 .unwrap()
2745 .append_null();
2746 first
2747 .values()
2748 .field_builder::<StringBuilder>(2)
2749 .unwrap()
2750 .append_null();
2751 first.values().append(false);
2752 first.append(true);
2753 first
2755 .values()
2756 .field_builder::<TimestampMicrosecondBuilder>(0)
2757 .unwrap()
2758 .append_value(0);
2759 first
2760 .values()
2761 .field_builder::<Int16Builder>(1)
2762 .unwrap()
2763 .append_value(0);
2764 first
2765 .values()
2766 .field_builder::<StringBuilder>(2)
2767 .unwrap()
2768 .append_value("UTC");
2769 first.values().append(true);
2770 first.append(true);
2771 first
2773 .values()
2774 .field_builder::<TimestampMicrosecondBuilder>(0)
2775 .unwrap()
2776 .append_value(1126351800123456);
2777 first
2778 .values()
2779 .field_builder::<Int16Builder>(1)
2780 .unwrap()
2781 .append_value(120);
2782 first
2783 .values()
2784 .field_builder::<StringBuilder>(2)
2785 .unwrap()
2786 .append_value("Europe/Warsaw");
2787 first.values().append(true);
2788 first.append(true);
2789 let first = Arc::new(first.finish()) as ArrayRef;
2790 let first_type = first.data_type().clone();
2791
2792 let mut second = StringBuilder::new();
2793 second.append_value("somewhere near");
2794 second.append_null();
2795 second.append_value("Greenwich");
2796 second.append_value("Warsaw");
2797 let second = Arc::new(second.finish()) as ArrayRef;
2798 let second_type = second.data_type().clone();
2799
2800 let converter = RowConverter::new(vec![
2801 SortField::new(first_type.clone()),
2802 SortField::new(second_type.clone()),
2803 ])
2804 .unwrap();
2805
2806 let rows = converter
2807 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
2808 .unwrap();
2809
2810 let back = converter.convert_rows(&rows).unwrap();
2811 assert_eq!(back.len(), 2);
2812 back[0].to_data().validate_full().unwrap();
2813 assert_eq!(&back[0], &first);
2814 back[1].to_data().validate_full().unwrap();
2815 assert_eq!(&back[1], &second);
2816 }
2817
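     // Random data generators for fuzz_test below; valid_percent is the probability that a
     // generated slot is non-null.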
2818 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2819 where
2820 K: ArrowPrimitiveType,
2821 StandardUniform: Distribution<K::Native>,
2822 {
2823 let mut rng = rng();
2824 (0..len)
2825 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2826 .collect()
2827 }
2828
2829 fn generate_strings<O: OffsetSizeTrait>(
2830 len: usize,
2831 valid_percent: f64,
2832 ) -> GenericStringArray<O> {
2833 let mut rng = rng();
2834 (0..len)
2835 .map(|_| {
2836 rng.random_bool(valid_percent).then(|| {
2837 let len = rng.random_range(0..100);
2838 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2839 String::from_utf8(bytes).unwrap()
2840 })
2841 })
2842 .collect()
2843 }
2844
2845 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2846 let mut rng = rng();
2847 (0..len)
2848 .map(|_| {
2849 rng.random_bool(valid_percent).then(|| {
2850 let len = rng.random_range(0..100);
2851 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2852 String::from_utf8(bytes).unwrap()
2853 })
2854 })
2855 .collect()
2856 }
2857
2858 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2859 let mut rng = rng();
2860 (0..len)
2861 .map(|_| {
2862 rng.random_bool(valid_percent).then(|| {
2863 let len = rng.random_range(0..100);
2864 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2865 bytes
2866 })
2867 })
2868 .collect()
2869 }
2870
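     // Deterministic string-view edge cases: values sharing a 12-byte prefix, values with
     // embedded NUL bytes, and strings too long for the inline view representation.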
2871 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
2872 let edge_cases = vec![
2873 Some("bar".to_string()),
2874 Some("bar\0".to_string()),
2875 Some("LongerThan12Bytes".to_string()),
2876 Some("LongerThan12Bytez".to_string()),
2877 Some("LongerThan12Bytes\0".to_string()),
2878 Some("LongerThan12Byt".to_string()),
2879 Some("backend one".to_string()),
2880 Some("backend two".to_string()),
2881 Some("a".repeat(257)),
2882 Some("a".repeat(300)),
2883 ];
2884
2885 let mut values = Vec::with_capacity(len);
2887 for i in 0..len {
2888 values.push(
2889 edge_cases
2890 .get(i % edge_cases.len())
2891 .cloned()
2892 .unwrap_or(None),
2893 );
2894 }
2895
2896 StringViewArray::from(values)
2897 }
2898
2899 fn generate_dictionary<K>(
2900 values: ArrayRef,
2901 len: usize,
2902 valid_percent: f64,
2903 ) -> DictionaryArray<K>
2904 where
2905 K: ArrowDictionaryKeyType,
2906 K::Native: SampleUniform,
2907 {
2908 let mut rng = rng();
2909 let min_key = K::Native::from_usize(0).unwrap();
2910 let max_key = K::Native::from_usize(values.len()).unwrap();
2911 let keys: PrimitiveArray<K> = (0..len)
2912 .map(|_| {
2913 rng.random_bool(valid_percent)
2914 .then(|| rng.random_range(min_key..max_key))
2915 })
2916 .collect();
2917
2918 let data_type =
2919 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2920
2921 let data = keys
2922 .into_data()
2923 .into_builder()
2924 .data_type(data_type)
2925 .add_child_data(values.to_data())
2926 .build()
2927 .unwrap();
2928
2929 DictionaryArray::from(data)
2930 }
2931
2932 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2933 let mut rng = rng();
2934 let width = rng.random_range(0..20);
2935 let mut builder = FixedSizeBinaryBuilder::new(width);
2936
2937 let mut b = vec![0; width as usize];
2938 for _ in 0..len {
2939 match rng.random_bool(valid_percent) {
2940 true => {
2941 b.iter_mut().for_each(|x| *x = rng.random());
2942 builder.append_value(&b).unwrap();
2943 }
2944 false => builder.append_null(),
2945 }
2946 }
2947
2948 builder.finish()
2949 }
2950
2951 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2952 let mut rng = rng();
2953 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2954 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2955 let b = generate_strings::<i32>(len, valid_percent);
2956 let fields = Fields::from(vec![
2957 Field::new("a", DataType::Int32, true),
2958 Field::new("b", DataType::Utf8, true),
2959 ]);
2960 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2961 StructArray::new(fields, values, Some(nulls))
2962 }
2963
2964 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2965 where
2966 F: FnOnce(usize) -> ArrayRef,
2967 {
2968 let mut rng = rng();
2969 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2970 let values_len = offsets.last().unwrap().to_usize().unwrap();
2971 let values = values(values_len);
2972 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2973 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2974 ListArray::new(field, offsets, values, Some(nulls))
2975 }
2976
2977 fn generate_column(len: usize) -> ArrayRef {
2978 let mut rng = rng();
2979 match rng.random_range(0..18) {
2980 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
2981 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
2982 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
2983 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
2984 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
2985 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
2986 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
2987 7 => Arc::new(generate_dictionary::<Int64Type>(
2988 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
2990 len,
2991 0.8,
2992 )),
2993 8 => Arc::new(generate_dictionary::<Int64Type>(
2994 Arc::new(generate_primitive_array::<Int64Type>(
2996 rng.random_range(1..len),
2997 1.0,
2998 )),
2999 len,
3000 0.8,
3001 )),
3002 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
3003 10 => Arc::new(generate_struct(len, 0.8)),
3004 11 => Arc::new(generate_list(len, 0.8, |values_len| {
3005 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3006 })),
3007 12 => Arc::new(generate_list(len, 0.8, |values_len| {
3008 Arc::new(generate_strings::<i32>(values_len, 0.8))
3009 })),
3010 13 => Arc::new(generate_list(len, 0.8, |values_len| {
3011 Arc::new(generate_struct(values_len, 0.8))
3012 })),
3013 14 => Arc::new(generate_string_view(len, 0.8)),
3014 15 => Arc::new(generate_byte_view(len, 0.8)),
3015 16 => Arc::new(generate_fixed_stringview_column(len)),
3016 17 => Arc::new(
3017 generate_list(len + 1000, 0.8, |values_len| {
3018 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3019 })
3020 .slice(500, len),
3021 ),
3022 _ => unreachable!(),
3023 }
3024 }
3025
3026 fn print_row(cols: &[SortColumn], row: usize) -> String {
3027 let t: Vec<_> = cols
3028 .iter()
3029 .map(|x| match x.values.is_valid(row) {
3030 true => {
3031 let opts = FormatOptions::default().with_null("NULL");
3032 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3033 formatter.value(row).to_string()
3034 }
3035 false => "NULL".to_string(),
3036 })
3037 .collect();
3038 t.join(",")
3039 }
3040
3041 fn print_col_types(cols: &[SortColumn]) -> String {
3042 let t: Vec<_> = cols
3043 .iter()
3044 .map(|x| x.values.data_type().to_string())
3045 .collect();
3046 t.join(",")
3047 }
3048
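     // For random columns and sort options, the ordering of the encoded rows must agree with
     // LexicographicalComparator for every pair of indices, and the rows must convert back to
     // the original arrays, both directly and after a round-trip through the binary form.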
3049 #[test]
3050 #[cfg_attr(miri, ignore)]
3051 fn fuzz_test() {
3052 for _ in 0..100 {
3053 let mut rng = rng();
3054 let num_columns = rng.random_range(1..5);
3055 let len = rng.random_range(5..100);
3056 let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
3057
3058 let options: Vec<_> = (0..num_columns)
3059 .map(|_| SortOptions {
3060 descending: rng.random_bool(0.5),
3061 nulls_first: rng.random_bool(0.5),
3062 })
3063 .collect();
3064
3065 let sort_columns: Vec<_> = options
3066 .iter()
3067 .zip(&arrays)
3068 .map(|(o, c)| SortColumn {
3069 values: Arc::clone(c),
3070 options: Some(*o),
3071 })
3072 .collect();
3073
3074 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
3075
3076 let columns: Vec<SortField> = options
3077 .into_iter()
3078 .zip(&arrays)
3079 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
3080 .collect();
3081
3082 let converter = RowConverter::new(columns).unwrap();
3083 let rows = converter.convert_columns(&arrays).unwrap();
3084
3085 for i in 0..len {
3086 for j in 0..len {
3087 let row_i = rows.row(i);
3088 let row_j = rows.row(j);
3089 let row_cmp = row_i.cmp(&row_j);
3090 let lex_cmp = comparator.compare(i, j);
3091 assert_eq!(
3092 row_cmp,
3093 lex_cmp,
3094 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
3095 print_row(&sort_columns, i),
3096 print_row(&sort_columns, j),
3097 row_i,
3098 row_j,
3099 print_col_types(&sort_columns)
3100 );
3101 }
3102 }
3103
3104 let back = converter.convert_rows(&rows).unwrap();
3107 for (actual, expected) in back.iter().zip(&arrays) {
3108 actual.to_data().validate_full().unwrap();
3109 dictionary_eq(actual, expected)
3110 }
3111
3112 let rows = rows.try_into_binary().expect("reasonable size");
3115 let parser = converter.parser();
3116 let back = converter
3117 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3118 .unwrap();
3119 for (actual, expected) in back.iter().zip(&arrays) {
3120 actual.to_data().validate_full().unwrap();
3121 dictionary_eq(actual, expected)
3122 }
3123
3124 let rows = converter.from_binary(rows);
3125 let back = converter.convert_rows(&rows).unwrap();
3126 for (actual, expected) in back.iter().zip(&arrays) {
3127 actual.to_data().validate_full().unwrap();
3128 dictionary_eq(actual, expected)
3129 }
3130 }
3131 }
3132
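     // Rows::clear resets the buffer for reuse; after the loop the buffer holds only the rows
     // of the last appended column, which must match a freshly built set of rows.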
3133 #[test]
3134 fn test_clear() {
3135 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3136 let mut rows = converter.empty_rows(3, 128);
3137
3138 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3139 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3140 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3141
3142 for array in arrays.iter() {
3143 rows.clear();
3144 converter.append(&mut rows, &[array.clone()]).unwrap();
3145 let back = converter.convert_rows(&rows).unwrap();
3146 assert_eq!(&back[0], array);
3147 }
3148
3149 let mut rows_expected = converter.empty_rows(3, 128);
3150 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3151
3152 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3153 assert_eq!(
3154 actual, expected,
3155 "For row {i}: expected {expected:?}, actual: {actual:?}",
3156 );
3157 }
3158 }
3159
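     // Append a dictionary-encoded binary column and convert back; dictionary_eq is used
     // because it compares logical values independent of dictionary encoding.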
3160 #[test]
3161 fn test_append_codec_dictionary_binary() {
3162 use DataType::*;
3163 let converter = RowConverter::new(vec![SortField::new(Dictionary(
3165 Box::new(Int32),
3166 Box::new(Binary),
3167 ))])
3168 .unwrap();
3169 let mut rows = converter.empty_rows(4, 128);
3170
3171 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3172 let values = BinaryArray::from(vec![
3173 Some("a".as_bytes()),
3174 Some(b"b"),
3175 Some(b"c"),
3176 Some(b"d"),
3177 ]);
3178 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3179
3180 rows.clear();
3181 let array = Arc::new(dict_array) as ArrayRef;
3182 converter.append(&mut rows, &[array.clone()]).unwrap();
3183 let back = converter.convert_rows(&rows).unwrap();
3184
3185 dictionary_eq(&back[0], &array);
3186 }
3187
3188 #[test]
3189 fn test_list_prefix() {
3190 let mut a = ListBuilder::new(Int8Builder::new());
3191 a.append_value([None]);
3192 a.append_value([None, None]);
3193 let a = a.finish();
3194
3195 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3196 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3197 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3198 }
3199
3200 #[test]
3201 fn map_should_be_marked_as_unsupported() {
3202 let map_data_type = Field::new_map(
3203 "map",
3204 "entries",
3205 Field::new("key", DataType::Utf8, false),
3206 Field::new("value", DataType::Utf8, true),
3207 false,
3208 true,
3209 )
3210 .data_type()
3211 .clone();
3212
3213 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3214
3215 assert!(!is_supported, "Map should not be supported");
3216 }
3217
3218 #[test]
3219 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3220 let map_data_type = Field::new_map(
3221 "map",
3222 "entries",
3223 Field::new("key", DataType::Utf8, false),
3224 Field::new("value", DataType::Utf8, true),
3225 false,
3226 true,
3227 )
3228 .data_type()
3229 .clone();
3230
3231 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3232
3233 match converter {
3234 Err(ArrowError::NotYetImplemented(message)) => {
3235 assert!(
3236 message.contains("Row format support not yet implemented for"),
3237 "Expected NotYetImplemented error for map data type, got: {message}",
3238 );
3239 }
3240 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3241 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3242 }
3243 }
3244
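     // Decoding Utf8View from trusted Rows can skip UTF-8 validation, so strings of at most
     // 12 bytes stay inlined in the views and the values buffer can remain empty; decoding
     // rows parsed from raw binary re-validates and copies data into the values buffer, as
     // the assertions below demonstrate.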
3245 #[test]
3246 fn test_values_buffer_smaller_when_utf8_validation_disabled() {
3247 fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
3248 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
3250
3251 let rows = converter.convert_columns(&[col]).unwrap();
3253 let converted = converter.convert_rows(&rows).unwrap();
3254 let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3255
3256 let rows = rows.try_into_binary().expect("reasonable size");
3258 let parser = converter.parser();
3259 let converted = converter
3260 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3261 .unwrap();
3262 let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3263 (unchecked_values_len, checked_values_len)
3264 }
3265
3266 let col = Arc::new(StringViewArray::from_iter([
3268 Some("hello"), None, Some("short"), Some("tiny"), ])) as ArrayRef;
3273
3274 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3275 assert_eq!(unchecked_values_len, 0);
3277 assert_eq!(checked_values_len, 14);
3279
3280 let col = Arc::new(StringViewArray::from_iter([
3282 Some("this is a very long string over 12 bytes"),
3283 Some("another long string to test the buffer"),
3284 ])) as ArrayRef;
3285
3286 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3287 assert!(unchecked_values_len > 0);
3289 assert_eq!(unchecked_values_len, checked_values_len);
3290
3291 let col = Arc::new(StringViewArray::from_iter([
3293 Some("tiny"), Some("thisisexact13"), None,
3296 Some("short"), ])) as ArrayRef;
3298
3299 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3300 assert_eq!(unchecked_values_len, 13);
3302 assert!(checked_values_len > unchecked_values_len);
3303 }
3304}