#![doc(
    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
)]
#![cfg_attr(docsrs, feature(doc_cfg))]
#![warn(missing_docs)]
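//! A comparable row-oriented representation of a collection of Arrow arrays.
//!
//! [`RowConverter`] converts columns into [`Rows`], a variable-length byte encoding whose
//! lexicographic byte order matches the configured sort order of the original columns, so
//! rows can be compared with a simple byte comparison. [`RowConverter::convert_rows`]
//! converts the rows back into arrays.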
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
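/// Converts columns of Arrow arrays into a row-oriented format, and back again
///
/// Rows produced by this converter compare with a simple byte comparison in the same order
/// as a lexicographic sort of the original columns by the configured [`SortField`]s.
///
/// A minimal usage sketch (this assumes the crate is built as `arrow_row`; the types below
/// are the ones defined in this module):
///
/// ```
/// # use std::sync::Arc;
/// # use arrow_array::{ArrayRef, Int32Array};
/// # use arrow_schema::DataType;
/// # use arrow_row::{RowConverter, SortField};
/// let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
/// let col = Arc::new(Int32Array::from(vec![3, 1, 2])) as ArrayRef;
/// let rows = converter.convert_columns(&[col]).unwrap();
/// // Byte-wise row comparison follows the value order of the encoded column
/// assert!(rows.row(1) < rows.row(2));
/// assert!(rows.row(2) < rows.row(0));
/// // And the rows can be converted back into the original column
/// let back = converter.convert_rows(&rows).unwrap();
/// assert_eq!(back.len(), 1);
/// ```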
184#[derive(Debug)]
441pub struct RowConverter {
442 fields: Arc<[SortField]>,
443 codecs: Vec<Codec>,
445}
446
447#[derive(Debug)]
enum Codec {
    /// No additional state is necessary to encode or decode the column
    Stateless,
    /// A row converter for the dictionary values, and the encoding of a null row
    Dictionary(RowConverter, OwnedRow),
    /// A row converter for the child fields, and the encoding of a null row
    Struct(RowConverter, OwnedRow),
    /// A row converter for the child list values
    List(RowConverter),
    /// A row converter for the run values
    RunEndEncoded(RowConverter),
}
462
463impl Codec {
464 fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
465 match &sort_field.data_type {
466 DataType::Dictionary(_, values) => {
467 let sort_field =
468 SortField::new_with_options(values.as_ref().clone(), sort_field.options);
469
470 let converter = RowConverter::new(vec![sort_field])?;
471 let null_array = new_null_array(values.as_ref(), 1);
472 let nulls = converter.convert_columns(&[null_array])?;
473
474 let owned = OwnedRow {
475 data: nulls.buffer.into(),
476 config: nulls.config,
477 };
478 Ok(Self::Dictionary(converter, owned))
479 }
480 DataType::RunEndEncoded(_, values) => {
481 let options = SortOptions {
483 descending: false,
484 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
485 };
486
487 let field = SortField::new_with_options(values.data_type().clone(), options);
488 let converter = RowConverter::new(vec![field])?;
489 Ok(Self::RunEndEncoded(converter))
490 }
491 d if !d.is_nested() => Ok(Self::Stateless),
492 DataType::List(f) | DataType::LargeList(f) => {
493 let options = SortOptions {
497 descending: false,
498 nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
499 };
500
501 let field = SortField::new_with_options(f.data_type().clone(), options);
502 let converter = RowConverter::new(vec![field])?;
503 Ok(Self::List(converter))
504 }
505 DataType::FixedSizeList(f, _) => {
506 let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
507 let converter = RowConverter::new(vec![field])?;
508 Ok(Self::List(converter))
509 }
510 DataType::Struct(f) => {
511 let sort_fields = f
512 .iter()
513 .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
514 .collect();
515
516 let converter = RowConverter::new(sort_fields)?;
517 let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();
518
519 let nulls = converter.convert_columns(&nulls)?;
520 let owned = OwnedRow {
521 data: nulls.buffer.into(),
522 config: nulls.config,
523 };
524
525 Ok(Self::Struct(converter, owned))
526 }
527 _ => Err(ArrowError::NotYetImplemented(format!(
528 "not yet implemented: {:?}",
529 sort_field.data_type
530 ))),
531 }
532 }
533
534 fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
535 match self {
536 Codec::Stateless => Ok(Encoder::Stateless),
537 Codec::Dictionary(converter, nulls) => {
538 let values = array.as_any_dictionary().values().clone();
539 let rows = converter.convert_columns(&[values])?;
540 Ok(Encoder::Dictionary(rows, nulls.row()))
541 }
542 Codec::Struct(converter, null) => {
543 let v = as_struct_array(array);
544 let rows = converter.convert_columns(v.columns())?;
545 Ok(Encoder::Struct(rows, null.row()))
546 }
547 Codec::List(converter) => {
548 let values = match array.data_type() {
549 DataType::List(_) => {
550 let list_array = as_list_array(array);
551 let first_offset = list_array.offsets()[0] as usize;
552 let last_offset =
553 list_array.offsets()[list_array.offsets().len() - 1] as usize;
554
555 list_array
558 .values()
559 .slice(first_offset, last_offset - first_offset)
560 }
561 DataType::LargeList(_) => {
562 let list_array = as_large_list_array(array);
563
564 let first_offset = list_array.offsets()[0] as usize;
565 let last_offset =
566 list_array.offsets()[list_array.offsets().len() - 1] as usize;
567
568 list_array
571 .values()
572 .slice(first_offset, last_offset - first_offset)
573 }
574 DataType::FixedSizeList(_, _) => {
575 as_fixed_size_list_array(array).values().clone()
576 }
577 _ => unreachable!(),
578 };
579 let rows = converter.convert_columns(&[values])?;
580 Ok(Encoder::List(rows))
581 }
582 Codec::RunEndEncoded(converter) => {
583 let values = match array.data_type() {
584 DataType::RunEndEncoded(r, _) => match r.data_type() {
585 DataType::Int16 => array.as_run::<Int16Type>().values(),
586 DataType::Int32 => array.as_run::<Int32Type>().values(),
587 DataType::Int64 => array.as_run::<Int64Type>().values(),
588 _ => unreachable!("Unsupported run end index type: {r:?}"),
589 },
590 _ => unreachable!(),
591 };
592 let rows = converter.convert_columns(std::slice::from_ref(values))?;
593 Ok(Encoder::RunEndEncoded(rows))
594 }
595 }
596 }
597
598 fn size(&self) -> usize {
599 match self {
600 Codec::Stateless => 0,
601 Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
602 Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
603 Codec::List(converter) => converter.size(),
604 Codec::RunEndEncoded(converter) => converter.size(),
605 }
606 }
607}
608
609#[derive(Debug)]
enum Encoder<'a> {
    /// No additional encoder state is necessary
    Stateless,
    /// The encoded rows of the dictionary values and the encoding of a null row
    Dictionary(Rows, Row<'a>),
    /// The row encoding of the child arrays and the encoding of a null row
    Struct(Rows, Row<'a>),
    /// The row encoding of the flattened child values
    List(Rows),
    /// The row encoding of the run values
    RunEndEncoded(Rows),
}
626
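/// Configuration for a column in the row format: its data type together with the
/// [`SortOptions`] controlling how it is ordered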
627#[derive(Debug, Clone, PartialEq, Eq)]
629pub struct SortField {
630 options: SortOptions,
632 data_type: DataType,
634}
635
636impl SortField {
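    /// Create a new `SortField` for `data_type` using the default [`SortOptions`]
    /// (ascending, nulls first)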
637 pub fn new(data_type: DataType) -> Self {
639 Self::new_with_options(data_type, Default::default())
640 }
641
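    /// Create a new `SortField` for `data_type` with the given [`SortOptions`]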
642 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
644 Self { options, data_type }
645 }
646
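    /// Return the size of this `SortField` in bytes, including the heap memory
    /// retained by its data type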
647 pub fn size(&self) -> usize {
651 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
652 }
653}
654
655impl RowConverter {
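    /// Create a new [`RowConverter`] for the given fields, returning an error if any
    /// field has a data type not supported by the row format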
656 pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
658 if !Self::supports_fields(&fields) {
659 return Err(ArrowError::NotYetImplemented(format!(
660 "Row format support not yet implemented for: {fields:?}"
661 )));
662 }
663
664 let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
665 Ok(Self {
666 fields: fields.into(),
667 codecs,
668 })
669 }
670
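    /// Returns whether the row format supports all of the given fields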
671 pub fn supports_fields(fields: &[SortField]) -> bool {
673 fields.iter().all(|x| Self::supports_datatype(&x.data_type))
674 }
675
676 fn supports_datatype(d: &DataType) -> bool {
677 match d {
678 _ if !d.is_nested() => true,
679 DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
680 Self::supports_datatype(f.data_type())
681 }
682 DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
683 DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
684 _ => false,
685 }
686 }
687
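    /// Convert `columns` into [`Rows`]
    ///
    /// The columns must correspond to the [`SortField`]s provided to
    /// [`RowConverter::new`] and must all have the same length, otherwise an error
    /// is returned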
688 pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
698 let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
699 let mut rows = self.empty_rows(num_rows, 0);
700 self.append(&mut rows, columns)?;
701 Ok(rows)
702 }
703
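    /// Convert `columns` and append the encoded rows to `rows`
    ///
    /// Returns an error if the columns do not match this converter's fields or have
    /// unequal lengths
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this converter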
704 pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
735 assert!(
736 Arc::ptr_eq(&rows.config.fields, &self.fields),
737 "rows were not produced by this RowConverter"
738 );
739
740 if columns.len() != self.fields.len() {
741 return Err(ArrowError::InvalidArgumentError(format!(
742 "Incorrect number of arrays provided to RowConverter, expected {} got {}",
743 self.fields.len(),
744 columns.len()
745 )));
746 }
        for column in columns.iter().skip(1) {
            if column.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    column.len()
                )));
            }
        }
756
757 let encoders = columns
758 .iter()
759 .zip(&self.codecs)
760 .zip(self.fields.iter())
761 .map(|((column, codec), field)| {
762 if !column.data_type().equals_datatype(&field.data_type) {
763 return Err(ArrowError::InvalidArgumentError(format!(
764 "RowConverter column schema mismatch, expected {} got {}",
765 field.data_type,
766 column.data_type()
767 )));
768 }
769 codec.encoder(column.as_ref())
770 })
771 .collect::<Result<Vec<_>, _>>()?;
772
773 let write_offset = rows.num_rows();
774 let lengths = row_lengths(columns, &encoders);
775 let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
776 rows.buffer.resize(total, 0);
777
778 for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
779 encode_column(
781 &mut rows.buffer,
782 &mut rows.offsets[write_offset..],
783 column.as_ref(),
784 field.options,
785 &encoder,
786 )
787 }
788
789 if cfg!(debug_assertions) {
790 assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
791 rows.offsets
792 .windows(2)
793 .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
794 }
795
796 Ok(())
797 }
798
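    /// Convert [`Row`]s back into their original column representation
    ///
    /// # Panics
    ///
    /// Panics if any row was not produced by this converter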
799 pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
807 where
808 I: IntoIterator<Item = Row<'a>>,
809 {
810 let mut validate_utf8 = false;
811 let mut rows: Vec<_> = rows
812 .into_iter()
813 .map(|row| {
814 assert!(
815 Arc::ptr_eq(&row.config.fields, &self.fields),
816 "rows were not produced by this RowConverter"
817 );
818 validate_utf8 |= row.config.validate_utf8;
819 row.data
820 })
821 .collect();
822
823 let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;
827
828 if cfg!(test) {
829 for (i, row) in rows.iter().enumerate() {
830 if !row.is_empty() {
831 return Err(ArrowError::InvalidArgumentError(format!(
832 "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
833 codecs = &self.codecs
834 )));
835 }
836 }
837 }
838
839 Ok(result)
840 }
841
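    /// Create an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of row data, for use with [`RowConverter::append`]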
842 pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
871 let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
872 offsets.push(0);
873
874 Rows {
875 offsets,
876 buffer: Vec::with_capacity(data_capacity),
877 config: RowConfig {
878 fields: self.fields.clone(),
879 validate_utf8: false,
880 },
881 }
882 }
883
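    /// Create a [`Rows`] from a [`BinaryArray`], intended to round-trip the output of
    /// [`Rows::try_into_binary`] for a converter with the same fields
    ///
    /// UTF-8 validation is performed when the rows are converted back into arrays
    ///
    /// # Panics
    ///
    /// Panics if the array contains any nulls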
884 pub fn from_binary(&self, array: BinaryArray) -> Rows {
911 assert_eq!(
912 array.null_count(),
913 0,
914 "can't construct Rows instance from array with nulls"
915 );
916 Rows {
917 buffer: array.values().to_vec(),
918 offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
919 config: RowConfig {
920 fields: Arc::clone(&self.fields),
921 validate_utf8: true,
922 },
923 }
924 }
925
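    /// Convert raw row bytes into [`ArrayRef`]s, consuming the decoded bytes from the
    /// front of each row
    ///
    /// # Safety
    ///
    /// `rows` must contain valid data produced by this converter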
926 unsafe fn convert_raw(
932 &self,
933 rows: &mut [&[u8]],
934 validate_utf8: bool,
935 ) -> Result<Vec<ArrayRef>, ArrowError> {
936 self.fields
937 .iter()
938 .zip(&self.codecs)
939 .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
940 .collect()
941 }
942
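    /// Create a [`RowParser`] that can reinterpret raw bytes, e.g. from [`Row::data`],
    /// as rows for this converter's fields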
943 pub fn parser(&self) -> RowParser {
945 RowParser::new(Arc::clone(&self.fields))
946 }
947
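    /// Return the size of this converter in bytes, including the state retained by its
    /// nested codecs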
948 pub fn size(&self) -> usize {
952 std::mem::size_of::<Self>()
953 + self.fields.iter().map(|x| x.size()).sum::<usize>()
954 + self.codecs.capacity() * std::mem::size_of::<Codec>()
955 + self.codecs.iter().map(Codec::size).sum::<usize>()
956 }
957}
958
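/// Reinterprets raw bytes as a [`Row`], created by [`RowConverter::parser`]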
959#[derive(Debug)]
961pub struct RowParser {
962 config: RowConfig,
963}
964
965impl RowParser {
966 fn new(fields: Arc<[SortField]>) -> Self {
967 Self {
968 config: RowConfig {
969 fields,
970 validate_utf8: true,
971 },
972 }
973 }
974
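    /// Create a [`Row`] from the given `bytes`
    ///
    /// The bytes must have been produced by a [`RowConverter`] with the same fields,
    /// otherwise subsequent operations may panic or produce incorrect results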
975 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
980 Row {
981 data: bytes,
982 config: &self.config,
983 }
984 }
985}
986
987#[derive(Debug, Clone)]
989struct RowConfig {
990 fields: Arc<[SortField]>,
992 validate_utf8: bool,
994}
995
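/// A row-oriented representation of a collection of arrays, produced by a
/// [`RowConverter`], storing the encoded bytes and the offset of each row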
996#[derive(Debug)]
1000pub struct Rows {
1001 buffer: Vec<u8>,
1003 offsets: Vec<usize>,
1005 config: RowConfig,
1007}
1008
1009impl Rows {
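    /// Append an encoded `row` to this collection
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by a converter with the same fields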
1010 pub fn push(&mut self, row: Row<'_>) {
1012 assert!(
1013 Arc::ptr_eq(&row.config.fields, &self.config.fields),
1014 "row was not produced by this RowConverter"
1015 );
1016 self.config.validate_utf8 |= row.config.validate_utf8;
1017 self.buffer.extend_from_slice(row.data);
1018 self.offsets.push(self.buffer.len())
1019 }
1020
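    /// Returns the row at index `row`, panicking if it is out of bounds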
1021 pub fn row(&self, row: usize) -> Row<'_> {
1023 assert!(row + 1 < self.offsets.len());
1024 unsafe { self.row_unchecked(row) }
1025 }
1026
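    /// Returns the row at `index` without bounds checking
    ///
    /// # Safety
    ///
    /// Callers must ensure `index` is less than the number of rows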
1027 pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
1032 let end = unsafe { self.offsets.get_unchecked(index + 1) };
1033 let start = unsafe { self.offsets.get_unchecked(index) };
1034 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
1035 Row {
1036 data,
1037 config: &self.config,
1038 }
1039 }
1040
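    /// Removes all rows, retaining the allocated capacity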
1041 pub fn clear(&mut self) {
1043 self.offsets.truncate(1);
1044 self.buffer.clear();
1045 }
1046
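    /// Returns the number of rows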
1047 pub fn num_rows(&self) -> usize {
1049 self.offsets.len() - 1
1050 }
1051
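    /// Returns an iterator over the rows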
1052 pub fn iter(&self) -> RowsIter<'_> {
1054 self.into_iter()
1055 }
1056
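    /// Returns the size of this instance in bytes, including the buffered row data and
    /// offsets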
1057 pub fn size(&self) -> usize {
1061 std::mem::size_of::<Self>()
1063 + self.buffer.len()
1064 + self.offsets.len() * std::mem::size_of::<usize>()
1065 }
1066
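    /// Convert this [`Rows`] into a [`BinaryArray`] with one value per row, suitable for
    /// round-tripping through [`RowConverter::from_binary`]
    ///
    /// Returns an error if the row data exceeds `i32::MAX` bytes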
1067 pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
1097 if self.buffer.len() > i32::MAX as usize {
1098 return Err(ArrowError::InvalidArgumentError(format!(
1099 "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
1100 self.buffer.len()
1101 )));
1102 }
1103 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
1105 let array = unsafe {
1107 BinaryArray::new_unchecked(
1108 OffsetBuffer::new_unchecked(offsets_scalar),
1109 Buffer::from_vec(self.buffer),
1110 None,
1111 )
1112 };
1113 Ok(array)
1114 }
1115}
1116
1117impl<'a> IntoIterator for &'a Rows {
1118 type Item = Row<'a>;
1119 type IntoIter = RowsIter<'a>;
1120
1121 fn into_iter(self) -> Self::IntoIter {
1122 RowsIter {
1123 rows: self,
1124 start: 0,
1125 end: self.num_rows(),
1126 }
1127 }
1128}
1129
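/// An iterator over the [`Row`]s in a [`Rows`] collection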
1130#[derive(Debug)]
1132pub struct RowsIter<'a> {
1133 rows: &'a Rows,
1134 start: usize,
1135 end: usize,
1136}
1137
1138impl<'a> Iterator for RowsIter<'a> {
1139 type Item = Row<'a>;
1140
1141 fn next(&mut self) -> Option<Self::Item> {
1142 if self.end == self.start {
1143 return None;
1144 }
1145
1146 let row = unsafe { self.rows.row_unchecked(self.start) };
1148 self.start += 1;
1149 Some(row)
1150 }
1151
1152 fn size_hint(&self) -> (usize, Option<usize>) {
1153 let len = self.len();
1154 (len, Some(len))
1155 }
1156}
1157
1158impl ExactSizeIterator for RowsIter<'_> {
1159 fn len(&self) -> usize {
1160 self.end - self.start
1161 }
1162}
1163
1164impl DoubleEndedIterator for RowsIter<'_> {
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.end == self.start {
            return None;
        }
        // `end` is one past the last unread row, so step it back before reading
        self.end -= 1;
        let row = unsafe { self.rows.row_unchecked(self.end) };
        Some(row)
    }
1174}
1175
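/// A comparable representation of a single row, borrowed from [`Rows`]
///
/// Rows compare byte-wise and sort in the same order as the columns they were encoded
/// from; two rows are comparable when they were produced with the same fields and sort
/// options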
1176#[derive(Debug, Copy, Clone)]
1185pub struct Row<'a> {
1186 data: &'a [u8],
1187 config: &'a RowConfig,
1188}
1189
1190impl<'a> Row<'a> {
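    /// Create an owned copy of this row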
1191 pub fn owned(&self) -> OwnedRow {
1193 OwnedRow {
1194 data: self.data.into(),
1195 config: self.config.clone(),
1196 }
1197 }
1198
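    /// Returns the encoded bytes of this row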
1199 pub fn data(&self) -> &'a [u8] {
1201 self.data
1202 }
1203}
1204
1205impl PartialEq for Row<'_> {
1208 #[inline]
1209 fn eq(&self, other: &Self) -> bool {
1210 self.data.eq(other.data)
1211 }
1212}
1213
1214impl Eq for Row<'_> {}
1215
1216impl PartialOrd for Row<'_> {
1217 #[inline]
1218 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1219 Some(self.cmp(other))
1220 }
1221}
1222
1223impl Ord for Row<'_> {
1224 #[inline]
1225 fn cmp(&self, other: &Self) -> Ordering {
1226 self.data.cmp(other.data)
1227 }
1228}
1229
1230impl Hash for Row<'_> {
1231 #[inline]
1232 fn hash<H: Hasher>(&self, state: &mut H) {
1233 self.data.hash(state)
1234 }
1235}
1236
1237impl AsRef<[u8]> for Row<'_> {
1238 #[inline]
1239 fn as_ref(&self) -> &[u8] {
1240 self.data
1241 }
1242}
1243
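/// Owned variant of [`Row`], storing its own copy of the encoded row bytes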
1244#[derive(Debug, Clone)]
1248pub struct OwnedRow {
1249 data: Box<[u8]>,
1250 config: RowConfig,
1251}
1252
1253impl OwnedRow {
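    /// Borrow this owned row as a [`Row`]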
1254 pub fn row(&self) -> Row<'_> {
1258 Row {
1259 data: &self.data,
1260 config: &self.config,
1261 }
1262 }
1263}
1264
1265impl PartialEq for OwnedRow {
1268 #[inline]
1269 fn eq(&self, other: &Self) -> bool {
1270 self.row().eq(&other.row())
1271 }
1272}
1273
1274impl Eq for OwnedRow {}
1275
1276impl PartialOrd for OwnedRow {
1277 #[inline]
1278 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1279 Some(self.cmp(other))
1280 }
1281}
1282
1283impl Ord for OwnedRow {
1284 #[inline]
1285 fn cmp(&self, other: &Self) -> Ordering {
1286 self.row().cmp(&other.row())
1287 }
1288}
1289
1290impl Hash for OwnedRow {
1291 #[inline]
1292 fn hash<H: Hasher>(&self, state: &mut H) {
1293 self.row().hash(state)
1294 }
1295}
1296
1297impl AsRef<[u8]> for OwnedRow {
1298 #[inline]
1299 fn as_ref(&self) -> &[u8] {
1300 &self.data
1301 }
1302}
1303
1304#[inline]
1306fn null_sentinel(options: SortOptions) -> u8 {
1307 match options.nulls_first {
1308 true => 0,
1309 false => 0xFF,
1310 }
1311}
1312
/// Tracks the accumulated encoded length of each row as columns are processed
enum LengthTracker {
    /// Every row processed so far has the same encoded length
    Fixed { length: usize, num_rows: usize },
    /// At least one variable-length column has been processed, so lengths are tracked per row
    Variable {
        /// The accumulated length of the fixed-length columns
        fixed_length: usize,
        /// The per-row length of the variable-length columns
        lengths: Vec<usize>,
    },
}
1323
1324impl LengthTracker {
1325 fn new(num_rows: usize) -> Self {
1326 Self::Fixed {
1327 length: 0,
1328 num_rows,
1329 }
1330 }
1331
1332 fn push_fixed(&mut self, new_length: usize) {
1334 match self {
1335 LengthTracker::Fixed { length, .. } => *length += new_length,
1336 LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1337 }
1338 }
1339
1340 fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1342 match self {
1343 LengthTracker::Fixed { length, .. } => {
1344 *self = LengthTracker::Variable {
1345 fixed_length: *length,
1346 lengths: new_lengths.collect(),
1347 }
1348 }
1349 LengthTracker::Variable { lengths, .. } => {
1350 assert_eq!(lengths.len(), new_lengths.len());
1351 lengths
1352 .iter_mut()
1353 .zip(new_lengths)
1354 .for_each(|(length, new_length)| *length += new_length);
1355 }
1356 }
1357 }
1358
1359 fn materialized(&mut self) -> &mut [usize] {
1361 if let LengthTracker::Fixed { length, num_rows } = *self {
1362 *self = LengthTracker::Variable {
1363 fixed_length: length,
1364 lengths: vec![0; num_rows],
1365 };
1366 }
1367
1368 match self {
1369 LengthTracker::Variable { lengths, .. } => lengths,
1370 LengthTracker::Fixed { .. } => unreachable!(),
1371 }
1372 }
1373
1374 fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1392 match self {
1393 LengthTracker::Fixed { length, num_rows } => {
1394 offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1395
1396 initial_offset + num_rows * length
1397 }
1398 LengthTracker::Variable {
1399 fixed_length,
1400 lengths,
1401 } => {
1402 let mut acc = initial_offset;
1403
1404 offsets.extend(lengths.iter().map(|length| {
1405 let current = acc;
1406 acc += length + fixed_length;
1407 current
1408 }));
1409
1410 acc
1411 }
1412 }
1413 }
1414}
1415
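/// Computes the encoded length of each row for the given columns and encoders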
1416fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
1418 use fixed::FixedLengthEncoding;
1419
1420 let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
1421 let mut tracker = LengthTracker::new(num_rows);
1422
1423 for (array, encoder) in cols.iter().zip(encoders) {
1424 match encoder {
1425 Encoder::Stateless => {
1426 downcast_primitive_array! {
1427 array => tracker.push_fixed(fixed::encoded_len(array)),
1428 DataType::Null => {},
1429 DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
1430 DataType::Binary => tracker.push_variable(
1431 as_generic_binary_array::<i32>(array)
1432 .iter()
1433 .map(|slice| variable::encoded_len(slice))
1434 ),
1435 DataType::LargeBinary => tracker.push_variable(
1436 as_generic_binary_array::<i64>(array)
1437 .iter()
1438 .map(|slice| variable::encoded_len(slice))
1439 ),
1440 DataType::BinaryView => tracker.push_variable(
1441 array.as_binary_view()
1442 .iter()
1443 .map(|slice| variable::encoded_len(slice))
1444 ),
1445 DataType::Utf8 => tracker.push_variable(
1446 array.as_string::<i32>()
1447 .iter()
1448 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1449 ),
1450 DataType::LargeUtf8 => tracker.push_variable(
1451 array.as_string::<i64>()
1452 .iter()
1453 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1454 ),
1455 DataType::Utf8View => tracker.push_variable(
1456 array.as_string_view()
1457 .iter()
1458 .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
1459 ),
1460 DataType::FixedSizeBinary(len) => {
1461 let len = len.to_usize().unwrap();
1462 tracker.push_fixed(1 + len)
1463 }
1464 _ => unimplemented!("unsupported data type: {}", array.data_type()),
1465 }
1466 }
1467 Encoder::Dictionary(values, null) => {
1468 downcast_dictionary_array! {
1469 array => {
1470 tracker.push_variable(
1471 array.keys().iter().map(|v| match v {
1472 Some(k) => values.row(k.as_usize()).data.len(),
1473 None => null.data.len(),
1474 })
1475 )
1476 }
1477 _ => unreachable!(),
1478 }
1479 }
1480 Encoder::Struct(rows, null) => {
1481 let array = as_struct_array(array);
1482 tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
1483 true => 1 + rows.row(idx).as_ref().len(),
1484 false => 1 + null.data.len(),
1485 }));
1486 }
1487 Encoder::List(rows) => match array.data_type() {
1488 DataType::List(_) => {
1489 list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
1490 }
1491 DataType::LargeList(_) => {
1492 list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
1493 }
1494 DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
1495 &mut tracker,
1496 rows,
1497 as_fixed_size_list_array(array),
1498 ),
1499 _ => unreachable!(),
1500 },
1501 Encoder::RunEndEncoded(rows) => match array.data_type() {
1502 DataType::RunEndEncoded(r, _) => match r.data_type() {
1503 DataType::Int16 => run::compute_lengths(
1504 tracker.materialized(),
1505 rows,
1506 array.as_run::<Int16Type>(),
1507 ),
1508 DataType::Int32 => run::compute_lengths(
1509 tracker.materialized(),
1510 rows,
1511 array.as_run::<Int32Type>(),
1512 ),
1513 DataType::Int64 => run::compute_lengths(
1514 tracker.materialized(),
1515 rows,
1516 array.as_run::<Int64Type>(),
1517 ),
1518 _ => unreachable!("Unsupported run end index type: {r:?}"),
1519 },
1520 _ => unreachable!(),
1521 },
1522 }
1523 }
1524
1525 tracker
1526}
1527
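/// Encodes a single column into the row format, writing each value at its row's current
/// offset and advancing that offset past the written bytes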
1528fn encode_column(
1530 data: &mut [u8],
1531 offsets: &mut [usize],
1532 column: &dyn Array,
1533 opts: SortOptions,
1534 encoder: &Encoder<'_>,
1535) {
1536 match encoder {
1537 Encoder::Stateless => {
1538 downcast_primitive_array! {
1539 column => {
1540 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1541 fixed::encode(data, offsets, column.values(), nulls, opts)
1542 } else {
1543 fixed::encode_not_null(data, offsets, column.values(), opts)
1544 }
1545 }
1546 DataType::Null => {}
1547 DataType::Boolean => {
1548 if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
1549 fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
1550 } else {
1551 fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
1552 }
1553 }
1554 DataType::Binary => {
1555 variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
1556 }
1557 DataType::BinaryView => {
1558 variable::encode(data, offsets, column.as_binary_view().iter(), opts)
1559 }
1560 DataType::LargeBinary => {
1561 variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
1562 }
1563 DataType::Utf8 => variable::encode(
1564 data, offsets,
1565 column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
1566 opts,
1567 ),
1568 DataType::LargeUtf8 => variable::encode(
1569 data, offsets,
1570 column.as_string::<i64>()
1571 .iter()
1572 .map(|x| x.map(|x| x.as_bytes())),
1573 opts,
1574 ),
1575 DataType::Utf8View => variable::encode(
1576 data, offsets,
1577 column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
1578 opts,
1579 ),
1580 DataType::FixedSizeBinary(_) => {
1581 let array = column.as_any().downcast_ref().unwrap();
1582 fixed::encode_fixed_size_binary(data, offsets, array, opts)
1583 }
1584 _ => unimplemented!("unsupported data type: {}", column.data_type()),
1585 }
1586 }
1587 Encoder::Dictionary(values, nulls) => {
1588 downcast_dictionary_array! {
1589 column => encode_dictionary_values(data, offsets, column, values, nulls),
1590 _ => unreachable!()
1591 }
1592 }
1593 Encoder::Struct(rows, null) => {
1594 let array = as_struct_array(column);
1595 let null_sentinel = null_sentinel(opts);
1596 offsets
1597 .iter_mut()
1598 .skip(1)
1599 .enumerate()
1600 .for_each(|(idx, offset)| {
1601 let (row, sentinel) = match array.is_valid(idx) {
1602 true => (rows.row(idx), 0x01),
1603 false => (*null, null_sentinel),
1604 };
1605 let end_offset = *offset + 1 + row.as_ref().len();
1606 data[*offset] = sentinel;
1607 data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
1608 *offset = end_offset;
1609 })
1610 }
1611 Encoder::List(rows) => match column.data_type() {
1612 DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
1613 DataType::LargeList(_) => {
1614 list::encode(data, offsets, rows, opts, as_large_list_array(column))
1615 }
1616 DataType::FixedSizeList(_, _) => {
1617 encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
1618 }
1619 _ => unreachable!(),
1620 },
1621 Encoder::RunEndEncoded(rows) => match column.data_type() {
1622 DataType::RunEndEncoded(r, _) => match r.data_type() {
1623 DataType::Int16 => {
1624 run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
1625 }
1626 DataType::Int32 => {
1627 run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
1628 }
1629 DataType::Int64 => {
1630 run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
1631 }
1632 _ => unreachable!("Unsupported run end index type: {r:?}"),
1633 },
1634 _ => unreachable!(),
1635 },
1636 }
1637}
1638
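/// Encode a [`DictionaryArray`] by writing, for each key, the pre-encoded row of the
/// referenced value, or the encoded null row for null keys, flattening away the
/// dictionary encoding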
1639pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1641 data: &mut [u8],
1642 offsets: &mut [usize],
1643 column: &DictionaryArray<K>,
1644 values: &Rows,
1645 null: &Row<'_>,
1646) {
1647 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1648 let row = match k {
1649 Some(k) => values.row(k.as_usize()).data,
1650 None => null.data,
1651 };
1652 let end_offset = *offset + row.len();
1653 data[*offset..end_offset].copy_from_slice(row);
1654 *offset = end_offset;
1655 }
1656}
1657
1658macro_rules! decode_primitive_helper {
1659 ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
1660 Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
1661 };
1662}
1663
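/// Decodes a single column from `rows`, consuming the decoded bytes from the front of
/// each row
///
/// # Safety
///
/// `rows` must contain valid data for `field`, produced by a converter with the same
/// configuration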
1664unsafe fn decode_column(
1670 field: &SortField,
1671 rows: &mut [&[u8]],
1672 codec: &Codec,
1673 validate_utf8: bool,
1674) -> Result<ArrayRef, ArrowError> {
1675 let options = field.options;
1676
1677 let array: ArrayRef = match codec {
1678 Codec::Stateless => {
1679 let data_type = field.data_type.clone();
1680 downcast_primitive! {
1681 data_type => (decode_primitive_helper, rows, data_type, options),
1682 DataType::Null => Arc::new(NullArray::new(rows.len())),
1683 DataType::Boolean => Arc::new(decode_bool(rows, options)),
1684 DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
1685 DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
1686 DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
1687 DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
1688 DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
1689 DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
1690 DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
1691 _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
1692 }
1693 }
1694 Codec::Dictionary(converter, _) => {
1695 let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
1696 cols.into_iter().next().unwrap()
1697 }
1698 Codec::Struct(converter, _) => {
1699 let (null_count, nulls) = fixed::decode_nulls(rows);
1700 rows.iter_mut().for_each(|row| *row = &row[1..]);
1701 let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;
1702
1703 let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
1704 let corrected_fields: Vec<Field> = match &field.data_type {
1707 DataType::Struct(struct_fields) => struct_fields
1708 .iter()
1709 .zip(child_data.iter())
1710 .map(|(orig_field, child_array)| {
1711 orig_field
1712 .as_ref()
1713 .clone()
1714 .with_data_type(child_array.data_type().clone())
1715 })
1716 .collect(),
1717 _ => unreachable!("Only Struct types should be corrected here"),
1718 };
1719 let corrected_struct_type = DataType::Struct(corrected_fields.into());
1720 let builder = ArrayDataBuilder::new(corrected_struct_type)
1721 .len(rows.len())
1722 .null_count(null_count)
1723 .null_bit_buffer(Some(nulls))
1724 .child_data(child_data);
1725
1726 Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
1727 }
1728 Codec::List(converter) => match &field.data_type {
1729 DataType::List(_) => {
1730 Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
1731 }
1732 DataType::LargeList(_) => {
1733 Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
1734 }
1735 DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
1736 list::decode_fixed_size_list(
1737 converter,
1738 rows,
1739 field,
1740 validate_utf8,
1741 value_length.as_usize(),
1742 )
1743 }?),
1744 _ => unreachable!(),
1745 },
1746 Codec::RunEndEncoded(converter) => match &field.data_type {
1747 DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
1748 DataType::Int16 => Arc::new(unsafe {
1749 run::decode::<Int16Type>(converter, rows, field, validate_utf8)
1750 }?),
1751 DataType::Int32 => Arc::new(unsafe {
1752 run::decode::<Int32Type>(converter, rows, field, validate_utf8)
1753 }?),
1754 DataType::Int64 => Arc::new(unsafe {
1755 run::decode::<Int64Type>(converter, rows, field, validate_utf8)
1756 }?),
1757 _ => unreachable!(),
1758 },
1759 _ => unreachable!(),
1760 },
1761 };
1762 Ok(array)
1763}
1764
1765#[cfg(test)]
1766mod tests {
1767 use rand::distr::uniform::SampleUniform;
1768 use rand::distr::{Distribution, StandardUniform};
1769 use rand::{Rng, rng};
1770
1771 use arrow_array::builder::*;
1772 use arrow_array::types::*;
1773 use arrow_array::*;
1774 use arrow_buffer::{Buffer, OffsetBuffer};
1775 use arrow_buffer::{NullBuffer, i256};
1776 use arrow_cast::display::{ArrayFormatter, FormatOptions};
1777 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1778
1779 use super::*;
1780
1781 #[test]
1782 fn test_fixed_width() {
1783 let cols = [
1784 Arc::new(Int16Array::from_iter([
1785 Some(1),
1786 Some(2),
1787 None,
1788 Some(-5),
1789 Some(2),
1790 Some(2),
1791 Some(0),
1792 ])) as ArrayRef,
1793 Arc::new(Float32Array::from_iter([
1794 Some(1.3),
1795 Some(2.5),
1796 None,
1797 Some(4.),
1798 Some(0.1),
1799 Some(-4.),
1800 Some(-0.),
1801 ])) as ArrayRef,
1802 ];
1803
1804 let converter = RowConverter::new(vec![
1805 SortField::new(DataType::Int16),
1806 SortField::new(DataType::Float32),
1807 ])
1808 .unwrap();
1809 let rows = converter.convert_columns(&cols).unwrap();
1810
1811 assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
1812 assert_eq!(
1813 rows.buffer,
1814 &[
                1, 128, 1, 1, 191, 166, 102, 102, // row 0
                1, 128, 2, 1, 192, 32, 0, 0, // row 1
                0, 0, 0, 0, 0, 0, 0, 0, // row 2
                1, 127, 251, 1, 192, 128, 0, 0, // row 3
                1, 128, 2, 1, 189, 204, 204, 205, // row 4
                1, 128, 2, 1, 63, 127, 255, 255, // row 5
                1, 128, 0, 1, 127, 255, 255, 255, // row 6
            ]
1830 );
1831
1832 assert!(rows.row(3) < rows.row(6));
1833 assert!(rows.row(0) < rows.row(1));
1834 assert!(rows.row(3) < rows.row(0));
1835 assert!(rows.row(4) < rows.row(1));
1836 assert!(rows.row(5) < rows.row(4));
1837
1838 let back = converter.convert_rows(&rows).unwrap();
1839 for (expected, actual) in cols.iter().zip(&back) {
1840 assert_eq!(expected, actual);
1841 }
1842 }
1843
1844 #[test]
1845 fn test_decimal32() {
1846 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
1847 DECIMAL32_MAX_PRECISION,
1848 7,
1849 ))])
1850 .unwrap();
1851 let col = Arc::new(
1852 Decimal32Array::from_iter([
1853 None,
1854 Some(i32::MIN),
1855 Some(-13),
1856 Some(46_i32),
1857 Some(5456_i32),
1858 Some(i32::MAX),
1859 ])
1860 .with_precision_and_scale(9, 7)
1861 .unwrap(),
1862 ) as ArrayRef;
1863
1864 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1865 for i in 0..rows.num_rows() - 1 {
1866 assert!(rows.row(i) < rows.row(i + 1));
1867 }
1868
1869 let back = converter.convert_rows(&rows).unwrap();
1870 assert_eq!(back.len(), 1);
1871 assert_eq!(col.as_ref(), back[0].as_ref())
1872 }
1873
1874 #[test]
1875 fn test_decimal64() {
1876 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
1877 DECIMAL64_MAX_PRECISION,
1878 7,
1879 ))])
1880 .unwrap();
1881 let col = Arc::new(
1882 Decimal64Array::from_iter([
1883 None,
1884 Some(i64::MIN),
1885 Some(-13),
1886 Some(46_i64),
1887 Some(5456_i64),
1888 Some(i64::MAX),
1889 ])
1890 .with_precision_and_scale(18, 7)
1891 .unwrap(),
1892 ) as ArrayRef;
1893
1894 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1895 for i in 0..rows.num_rows() - 1 {
1896 assert!(rows.row(i) < rows.row(i + 1));
1897 }
1898
1899 let back = converter.convert_rows(&rows).unwrap();
1900 assert_eq!(back.len(), 1);
1901 assert_eq!(col.as_ref(), back[0].as_ref())
1902 }
1903
1904 #[test]
1905 fn test_decimal128() {
1906 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1907 DECIMAL128_MAX_PRECISION,
1908 7,
1909 ))])
1910 .unwrap();
1911 let col = Arc::new(
1912 Decimal128Array::from_iter([
1913 None,
1914 Some(i128::MIN),
1915 Some(-13),
1916 Some(46_i128),
1917 Some(5456_i128),
1918 Some(i128::MAX),
1919 ])
1920 .with_precision_and_scale(38, 7)
1921 .unwrap(),
1922 ) as ArrayRef;
1923
1924 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1925 for i in 0..rows.num_rows() - 1 {
1926 assert!(rows.row(i) < rows.row(i + 1));
1927 }
1928
1929 let back = converter.convert_rows(&rows).unwrap();
1930 assert_eq!(back.len(), 1);
1931 assert_eq!(col.as_ref(), back[0].as_ref())
1932 }
1933
1934 #[test]
1935 fn test_decimal256() {
1936 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1937 DECIMAL256_MAX_PRECISION,
1938 7,
1939 ))])
1940 .unwrap();
1941 let col = Arc::new(
1942 Decimal256Array::from_iter([
1943 None,
1944 Some(i256::MIN),
1945 Some(i256::from_parts(0, -1)),
1946 Some(i256::from_parts(u128::MAX, -1)),
1947 Some(i256::from_parts(u128::MAX, 0)),
1948 Some(i256::from_parts(0, 46_i128)),
1949 Some(i256::from_parts(5, 46_i128)),
1950 Some(i256::MAX),
1951 ])
1952 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1953 .unwrap(),
1954 ) as ArrayRef;
1955
1956 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1957 for i in 0..rows.num_rows() - 1 {
1958 assert!(rows.row(i) < rows.row(i + 1));
1959 }
1960
1961 let back = converter.convert_rows(&rows).unwrap();
1962 assert_eq!(back.len(), 1);
1963 assert_eq!(col.as_ref(), back[0].as_ref())
1964 }
1965
1966 #[test]
1967 fn test_bool() {
1968 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1969
1970 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1971
1972 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1973 assert!(rows.row(2) > rows.row(1));
1974 assert!(rows.row(2) > rows.row(0));
1975 assert!(rows.row(1) > rows.row(0));
1976
1977 let cols = converter.convert_rows(&rows).unwrap();
1978 assert_eq!(&cols[0], &col);
1979
1980 let converter = RowConverter::new(vec![SortField::new_with_options(
1981 DataType::Boolean,
1982 SortOptions::default().desc().with_nulls_first(false),
1983 )])
1984 .unwrap();
1985
1986 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1987 assert!(rows.row(2) < rows.row(1));
1988 assert!(rows.row(2) < rows.row(0));
1989 assert!(rows.row(1) < rows.row(0));
1990 let cols = converter.convert_rows(&rows).unwrap();
1991 assert_eq!(&cols[0], &col);
1992 }
1993
1994 #[test]
1995 fn test_timezone() {
1996 let a =
1997 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
1998 let d = a.data_type().clone();
1999
2000 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2001 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2002 let back = converter.convert_rows(&rows).unwrap();
2003 assert_eq!(back.len(), 1);
2004 assert_eq!(back[0].data_type(), &d);
2005
2006 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
2008 a.append(34).unwrap();
2009 a.append_null();
2010 a.append(345).unwrap();
2011
2012 let dict = a.finish();
2014 let values = TimestampNanosecondArray::from(dict.values().to_data());
2015 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
2016 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
2017 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
2018
2019 assert_eq!(dict_with_tz.data_type(), &d);
2020 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2021 let rows = converter
2022 .convert_columns(&[Arc::new(dict_with_tz) as _])
2023 .unwrap();
2024 let back = converter.convert_rows(&rows).unwrap();
2025 assert_eq!(back.len(), 1);
2026 assert_eq!(back[0].data_type(), &v);
2027 }
2028
2029 #[test]
2030 fn test_null_encoding() {
2031 let col = Arc::new(NullArray::new(10));
2032 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2033 let rows = converter.convert_columns(&[col]).unwrap();
2034 assert_eq!(rows.num_rows(), 10);
2035 assert_eq!(rows.row(1).data.len(), 0);
2036 }
2037
2038 #[test]
2039 fn test_variable_width() {
2040 let col = Arc::new(StringArray::from_iter([
2041 Some("hello"),
2042 Some("he"),
2043 None,
2044 Some("foo"),
2045 Some(""),
2046 ])) as ArrayRef;
2047
2048 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2049 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2050
2051 assert!(rows.row(1) < rows.row(0));
2052 assert!(rows.row(2) < rows.row(4));
2053 assert!(rows.row(3) < rows.row(0));
2054 assert!(rows.row(3) < rows.row(1));
2055
2056 let cols = converter.convert_rows(&rows).unwrap();
2057 assert_eq!(&cols[0], &col);
2058
2059 let col = Arc::new(BinaryArray::from_iter([
2060 None,
2061 Some(vec![0_u8; 0]),
2062 Some(vec![0_u8; 6]),
2063 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
2064 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
2065 Some(vec![0_u8; variable::BLOCK_SIZE]),
2066 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
2067 Some(vec![1_u8; 6]),
2068 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
2069 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
2070 Some(vec![1_u8; variable::BLOCK_SIZE]),
2071 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
2072 Some(vec![0xFF_u8; 6]),
2073 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
2074 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
2075 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
2076 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
2077 ])) as ArrayRef;
2078
2079 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2080 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2081
2082 for i in 0..rows.num_rows() {
2083 for j in i + 1..rows.num_rows() {
2084 assert!(
2085 rows.row(i) < rows.row(j),
2086 "{} < {} - {:?} < {:?}",
2087 i,
2088 j,
2089 rows.row(i),
2090 rows.row(j)
2091 );
2092 }
2093 }
2094
2095 let cols = converter.convert_rows(&rows).unwrap();
2096 assert_eq!(&cols[0], &col);
2097
2098 let converter = RowConverter::new(vec![SortField::new_with_options(
2099 DataType::Binary,
2100 SortOptions::default().desc().with_nulls_first(false),
2101 )])
2102 .unwrap();
2103 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2104
2105 for i in 0..rows.num_rows() {
2106 for j in i + 1..rows.num_rows() {
2107 assert!(
2108 rows.row(i) > rows.row(j),
2109 "{} > {} - {:?} > {:?}",
2110 i,
2111 j,
2112 rows.row(i),
2113 rows.row(j)
2114 );
2115 }
2116 }
2117
2118 let cols = converter.convert_rows(&rows).unwrap();
2119 assert_eq!(&cols[0], &col);
2120 }
2121
2122 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2124 match b.data_type() {
2125 DataType::Dictionary(_, v) => {
2126 assert_eq!(a.data_type(), v.as_ref());
2127 let b = arrow_cast::cast(b, v).unwrap();
2128 assert_eq!(a, b.as_ref())
2129 }
2130 _ => assert_eq!(a, b),
2131 }
2132 }
2133
2134 #[test]
2135 fn test_string_dictionary() {
2136 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2137 Some("foo"),
2138 Some("hello"),
2139 Some("he"),
2140 None,
2141 Some("hello"),
2142 Some(""),
2143 Some("hello"),
2144 Some("hello"),
2145 ])) as ArrayRef;
2146
2147 let field = SortField::new(a.data_type().clone());
2148 let converter = RowConverter::new(vec![field]).unwrap();
2149 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2150
2151 assert!(rows_a.row(3) < rows_a.row(5));
2152 assert!(rows_a.row(2) < rows_a.row(1));
2153 assert!(rows_a.row(0) < rows_a.row(1));
2154 assert!(rows_a.row(3) < rows_a.row(0));
2155
2156 assert_eq!(rows_a.row(1), rows_a.row(4));
2157 assert_eq!(rows_a.row(1), rows_a.row(6));
2158 assert_eq!(rows_a.row(1), rows_a.row(7));
2159
2160 let cols = converter.convert_rows(&rows_a).unwrap();
2161 dictionary_eq(&cols[0], &a);
2162
2163 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2164 Some("hello"),
2165 None,
2166 Some("cupcakes"),
2167 ])) as ArrayRef;
2168
2169 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
2170 assert_eq!(rows_a.row(1), rows_b.row(0));
2171 assert_eq!(rows_a.row(3), rows_b.row(1));
2172 assert!(rows_b.row(2) < rows_a.row(0));
2173
2174 let cols = converter.convert_rows(&rows_b).unwrap();
2175 dictionary_eq(&cols[0], &b);
2176
2177 let converter = RowConverter::new(vec![SortField::new_with_options(
2178 a.data_type().clone(),
2179 SortOptions::default().desc().with_nulls_first(false),
2180 )])
2181 .unwrap();
2182
2183 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2184 assert!(rows_c.row(3) > rows_c.row(5));
2185 assert!(rows_c.row(2) > rows_c.row(1));
2186 assert!(rows_c.row(0) > rows_c.row(1));
2187 assert!(rows_c.row(3) > rows_c.row(0));
2188
2189 let cols = converter.convert_rows(&rows_c).unwrap();
2190 dictionary_eq(&cols[0], &a);
2191
2192 let converter = RowConverter::new(vec![SortField::new_with_options(
2193 a.data_type().clone(),
2194 SortOptions::default().desc().with_nulls_first(true),
2195 )])
2196 .unwrap();
2197
2198 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2199 assert!(rows_c.row(3) < rows_c.row(5));
2200 assert!(rows_c.row(2) > rows_c.row(1));
2201 assert!(rows_c.row(0) > rows_c.row(1));
2202 assert!(rows_c.row(3) < rows_c.row(0));
2203
2204 let cols = converter.convert_rows(&rows_c).unwrap();
2205 dictionary_eq(&cols[0], &a);
2206 }
2207
2208 #[test]
2209 fn test_struct() {
2210 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2212 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2213 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2214 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2215 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2216
2217 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2218 let converter = RowConverter::new(sort_fields).unwrap();
2219 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2220
2221 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2222 assert!(a < b);
2223 }
2224
2225 let back = converter.convert_rows(&r1).unwrap();
2226 assert_eq!(back.len(), 1);
2227 assert_eq!(&back[0], &s1);
2228
2229 let data = s1
2231 .to_data()
2232 .into_builder()
2233 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2234 .null_count(2)
2235 .build()
2236 .unwrap();
2237
2238 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2239 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
        assert_eq!(r2.row(0), r2.row(2)); // rows 0 and 2 are both null
        assert!(r2.row(0) < r2.row(1));
        assert_ne!(r1.row(0), r2.row(0)); // row 0 is null in s2 but valid in s1
        assert_eq!(r1.row(1), r2.row(1));

        let back = converter.convert_rows(&r2).unwrap();
2246 assert_eq!(back.len(), 1);
2247 assert_eq!(&back[0], &s2);
2248
2249 back[0].to_data().validate_full().unwrap();
2250 }
2251
2252 #[test]
2253 fn test_dictionary_in_struct() {
2254 let builder = StringDictionaryBuilder::<Int32Type>::new();
2255 let mut struct_builder = StructBuilder::new(
2256 vec![Field::new_dictionary(
2257 "foo",
2258 DataType::Int32,
2259 DataType::Utf8,
2260 true,
2261 )],
2262 vec![Box::new(builder)],
2263 );
2264
2265 let dict_builder = struct_builder
2266 .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
2267 .unwrap();
2268
2269 dict_builder.append_value("a");
2271 dict_builder.append_null();
2272 dict_builder.append_value("a");
2273 dict_builder.append_value("b");
2274
2275 for _ in 0..4 {
2276 struct_builder.append(true);
2277 }
2278
2279 let s = Arc::new(struct_builder.finish()) as ArrayRef;
2280 let sort_fields = vec![SortField::new(s.data_type().clone())];
2281 let converter = RowConverter::new(sort_fields).unwrap();
2282 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2283
2284 let back = converter.convert_rows(&r).unwrap();
2285 let [s2] = back.try_into().unwrap();
2286
2287 assert_ne!(&s.data_type(), &s2.data_type());
2290 s2.to_data().validate_full().unwrap();
2291
2292 let s1_struct = s.as_struct();
2296 let s1_0 = s1_struct.column(0);
2297 let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
2298 let keys = s1_idx_0.keys();
2299 let values = s1_idx_0.values().as_string::<i32>();
2300 let s2_struct = s2.as_struct();
2302 let s2_0 = s2_struct.column(0);
2303 let s2_idx_0 = s2_0.as_string::<i32>();
2304
2305 for i in 0..keys.len() {
2306 if keys.is_null(i) {
2307 assert!(s2_idx_0.is_null(i));
2308 } else {
2309 let dict_index = keys.value(i) as usize;
2310 assert_eq!(values.value(dict_index), s2_idx_0.value(i));
2311 }
2312 }
2313 }
2314
2315 #[test]
2316 fn test_dictionary_in_struct_empty() {
2317 let ty = DataType::Struct(
2318 vec![Field::new_dictionary(
2319 "foo",
2320 DataType::Int32,
2321 DataType::Int32,
2322 false,
2323 )]
2324 .into(),
2325 );
2326 let s = arrow_array::new_empty_array(&ty);
2327
2328 let sort_fields = vec![SortField::new(s.data_type().clone())];
2329 let converter = RowConverter::new(sort_fields).unwrap();
2330 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2331
2332 let back = converter.convert_rows(&r).unwrap();
2333 let [s2] = back.try_into().unwrap();
2334
2335 assert_ne!(&s.data_type(), &s2.data_type());
2338 s2.to_data().validate_full().unwrap();
2339 assert_eq!(s.len(), 0);
2340 assert_eq!(s2.len(), 0);
2341 }
2342
2343 #[test]
2344 fn test_list_of_string_dictionary() {
2345 let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2346 builder.values().append("a").unwrap();
2348 builder.values().append("b").unwrap();
2349 builder.values().append("zero").unwrap();
2350 builder.values().append_null();
2351 builder.values().append("c").unwrap();
2352 builder.values().append("b").unwrap();
2353 builder.values().append("d").unwrap();
2354 builder.append(true);
2355 builder.append(false);
2357 builder.values().append("e").unwrap();
2359 builder.values().append("zero").unwrap();
2360 builder.values().append("a").unwrap();
2361 builder.append(true);
2362
2363 let a = Arc::new(builder.finish()) as ArrayRef;
2364 let data_type = a.data_type().clone();
2365
2366 let field = SortField::new(data_type.clone());
2367 let converter = RowConverter::new(vec![field]).unwrap();
2368 let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2369
2370 let back = converter.convert_rows(&rows).unwrap();
2371 assert_eq!(back.len(), 1);
2372 let [a2] = back.try_into().unwrap();
2373
2374 assert_ne!(&a.data_type(), &a2.data_type());
2377
2378 a2.to_data().validate_full().unwrap();
2379
2380 let a2_list = a2.as_list::<i32>();
2381 let a1_list = a.as_list::<i32>();
2382
2383 let a1_0 = a1_list.value(0);
2386 let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2387 let keys = a1_idx_0.keys();
2388 let values = a1_idx_0.values().as_string::<i32>();
2389 let a2_0 = a2_list.value(0);
2390 let a2_idx_0 = a2_0.as_string::<i32>();
2391
2392 for i in 0..keys.len() {
2393 if keys.is_null(i) {
2394 assert!(a2_idx_0.is_null(i));
2395 } else {
2396 let dict_index = keys.value(i) as usize;
2397 assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2398 }
2399 }
2400
2401 assert!(a1_list.is_null(1));
2403 assert!(a2_list.is_null(1));
2404
2405 let a1_2 = a1_list.value(2);
2407 let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2408 let keys = a1_idx_2.keys();
2409 let values = a1_idx_2.values().as_string::<i32>();
2410 let a2_2 = a2_list.value(2);
2411 let a2_idx_2 = a2_2.as_string::<i32>();
2412
2413 for i in 0..keys.len() {
2414 if keys.is_null(i) {
2415 assert!(a2_idx_2.is_null(i));
2416 } else {
2417 let dict_index = keys.value(i) as usize;
2418 assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2419 }
2420 }
2421 }
2422
2423 #[test]
2424 fn test_primitive_dictionary() {
2425 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2426 builder.append(2).unwrap();
2427 builder.append(3).unwrap();
2428 builder.append(0).unwrap();
2429 builder.append_null();
2430 builder.append(5).unwrap();
2431 builder.append(3).unwrap();
2432 builder.append(-1).unwrap();
2433
2434 let a = builder.finish();
2435 let data_type = a.data_type().clone();
2436 let columns = [Arc::new(a) as ArrayRef];
2437
2438 let field = SortField::new(data_type.clone());
2439 let converter = RowConverter::new(vec![field]).unwrap();
2440 let rows = converter.convert_columns(&columns).unwrap();
2441 assert!(rows.row(0) < rows.row(1));
2442 assert!(rows.row(2) < rows.row(0));
2443 assert!(rows.row(3) < rows.row(2));
2444 assert!(rows.row(6) < rows.row(2));
2445 assert!(rows.row(3) < rows.row(6));
2446
2447 let back = converter.convert_rows(&rows).unwrap();
2448 assert_eq!(back.len(), 1);
2449 back[0].to_data().validate_full().unwrap();
2450 }
2451
2452 #[test]
2453 fn test_dictionary_nulls() {
2454 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2455 let keys =
2456 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2457
2458 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2459 let data = keys
2460 .into_builder()
2461 .data_type(data_type.clone())
2462 .child_data(vec![values])
2463 .build()
2464 .unwrap();
2465
2466 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2467 let field = SortField::new(data_type.clone());
2468 let converter = RowConverter::new(vec![field]).unwrap();
2469 let rows = converter.convert_columns(&columns).unwrap();
2470
2471 assert_eq!(rows.row(0), rows.row(1));
2472 assert_eq!(rows.row(3), rows.row(4));
2473 assert_eq!(rows.row(4), rows.row(5));
2474 assert!(rows.row(3) < rows.row(0));
2475 }
2476
2477 #[test]
2478 #[should_panic(expected = "Encountered non UTF-8 data")]
2479 fn test_invalid_utf8() {
2480 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2481 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2482 let rows = converter.convert_columns(&[array]).unwrap();
2483 let binary_row = rows.row(0);
2484
2485 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2486 let parser = converter.parser();
2487 let utf8_row = parser.parse(binary_row.as_ref());
2488
2489 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2490 }
2491
2492 #[test]
2493 #[should_panic(expected = "Encountered non UTF-8 data")]
2494 fn test_invalid_utf8_array() {
2495 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2496 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2497 let rows = converter.convert_columns(&[array]).unwrap();
2498 let binary_rows = rows.try_into_binary().expect("known-small rows");
2499
2500 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2501 let parsed = converter.from_binary(binary_rows);
2502
2503 converter.convert_rows(parsed.iter()).unwrap();
2504 }
2505
2506 #[test]
2507 #[should_panic(expected = "index out of bounds")]
2508 fn test_invalid_empty() {
2509 let binary_row: &[u8] = &[];
2510
2511 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2512 let parser = converter.parser();
2513 let utf8_row = parser.parse(binary_row.as_ref());
2514
2515 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2516 }
2517
2518 #[test]
2519 #[should_panic(expected = "index out of bounds")]
2520 fn test_invalid_empty_array() {
2521 let row: &[u8] = &[];
2522 let binary_rows = BinaryArray::from(vec![row]);
2523
2524 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2525 let parsed = converter.from_binary(binary_rows);
2526
2527 converter.convert_rows(parsed.iter()).unwrap();
2528 }
2529
2530 #[test]
2531 #[should_panic(expected = "index out of bounds")]
2532 fn test_invalid_truncated() {
2533 let binary_row: &[u8] = &[0x02];
2534
2535 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2536 let parser = converter.parser();
2537 let utf8_row = parser.parse(binary_row.as_ref());
2538
2539 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2540 }
2541
2542 #[test]
2543 #[should_panic(expected = "index out of bounds")]
2544 fn test_invalid_truncated_array() {
2545 let row: &[u8] = &[0x02];
2546 let binary_rows = BinaryArray::from(vec![row]);
2547
2548 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2549 let parsed = converter.from_binary(binary_rows);
2550
2551 converter.convert_rows(parsed.iter()).unwrap();
2552 }
2553
2554 #[test]
2555 #[should_panic(expected = "rows were not produced by this RowConverter")]
2556 fn test_different_converter() {
2557 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2558 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2559 let rows = converter.convert_columns(&[values]).unwrap();
2560
2561 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2562 let _ = converter.convert_rows(&rows);
2563 }
2564
2565 fn test_single_list<O: OffsetSizeTrait>() {
2566 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2567 builder.values().append_value(32);
2568 builder.values().append_value(52);
2569 builder.values().append_value(32);
2570 builder.append(true);
2571 builder.values().append_value(32);
2572 builder.values().append_value(52);
2573 builder.values().append_value(12);
2574 builder.append(true);
2575 builder.values().append_value(32);
2576 builder.values().append_value(52);
2577 builder.append(true);
        builder.values().append_value(32);
        builder.values().append_value(52);
        builder.append(false);
        builder.values().append_value(32);
        builder.values().append_null();
        builder.append(true);
        builder.append(true);
        builder.values().append_value(17);
        builder.values().append_null();
        builder.append(false);
2588
2589 let list = Arc::new(builder.finish()) as ArrayRef;
2590 let d = list.data_type().clone();
2591
2592 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2593
2594 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2595 assert!(rows.row(0) > rows.row(1));
     assert!(rows.row(2) < rows.row(1));
     assert!(rows.row(3) < rows.row(2));
     assert!(rows.row(4) < rows.row(2));
     assert!(rows.row(5) < rows.row(2));
     assert!(rows.row(3) < rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2604 assert_eq!(back.len(), 1);
2605 back[0].to_data().validate_full().unwrap();
2606 assert_eq!(&back[0], &list);
2607
2608 let options = SortOptions::default().asc().with_nulls_first(false);
2609 let field = SortField::new_with_options(d.clone(), options);
2610 let converter = RowConverter::new(vec![field]).unwrap();
2611 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2612
2613 assert!(rows.row(0) > rows.row(1));
     assert!(rows.row(2) < rows.row(1));
     assert!(rows.row(3) > rows.row(2));
     assert!(rows.row(4) > rows.row(2));
     assert!(rows.row(5) < rows.row(2));
     assert!(rows.row(3) > rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2622 assert_eq!(back.len(), 1);
2623 back[0].to_data().validate_full().unwrap();
2624 assert_eq!(&back[0], &list);
2625
2626 let options = SortOptions::default().desc().with_nulls_first(false);
2627 let field = SortField::new_with_options(d.clone(), options);
2628 let converter = RowConverter::new(vec![field]).unwrap();
2629 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2630
2631 assert!(rows.row(0) < rows.row(1));
     assert!(rows.row(2) > rows.row(1));
     assert!(rows.row(3) > rows.row(2));
     assert!(rows.row(4) > rows.row(2));
     assert!(rows.row(5) > rows.row(2));
     assert!(rows.row(3) > rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2640 assert_eq!(back.len(), 1);
2641 back[0].to_data().validate_full().unwrap();
2642 assert_eq!(&back[0], &list);
2643
2644 let options = SortOptions::default().desc().with_nulls_first(true);
2645 let field = SortField::new_with_options(d, options);
2646 let converter = RowConverter::new(vec![field]).unwrap();
2647 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2648
2649 assert!(rows.row(0) < rows.row(1));
     assert!(rows.row(2) > rows.row(1));
     assert!(rows.row(3) < rows.row(2));
     assert!(rows.row(4) < rows.row(2));
     assert!(rows.row(5) > rows.row(2));
     assert!(rows.row(3) < rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2658 assert_eq!(back.len(), 1);
2659 back[0].to_data().validate_full().unwrap();
2660 assert_eq!(&back[0], &list);
2661
2662 let sliced_list = list.slice(1, 5);
2663 let rows_on_sliced_list = converter
2664 .convert_columns(&[Arc::clone(&sliced_list)])
2665 .unwrap();
2666
2667 assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0));
     assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
     assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
     assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1));
     assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));

     let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2674 assert_eq!(back.len(), 1);
2675 back[0].to_data().validate_full().unwrap();
2676 assert_eq!(&back[0], &sliced_list);
2677 }
2678
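    // Same idea as `test_single_list`, but for a list-of-lists column with
    // nested nulls.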
2679 fn test_nested_list<O: OffsetSizeTrait>() {
2680 let mut builder =
2681 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2682
2683 builder.values().values().append_value(1);
2684 builder.values().values().append_value(2);
2685 builder.values().append(true);
2686 builder.values().values().append_value(1);
2687 builder.values().values().append_null();
2688 builder.values().append(true);
2689 builder.append(true);
2690
2691 builder.values().values().append_value(1);
2692 builder.values().values().append_null();
2693 builder.values().append(true);
2694 builder.values().values().append_value(1);
2695 builder.values().values().append_null();
2696 builder.values().append(true);
2697 builder.append(true);
2698
2699 builder.values().values().append_value(1);
2700 builder.values().values().append_null();
2701 builder.values().append(true);
2702 builder.values().append(false);
2703 builder.append(true);
2704 builder.append(false);
2705
2706 builder.values().values().append_value(1);
2707 builder.values().values().append_value(2);
2708 builder.values().append(true);
2709 builder.append(true);
2710
2711 let list = Arc::new(builder.finish()) as ArrayRef;
2712 let d = list.data_type().clone();
2713
2714 let options = SortOptions::default().asc().with_nulls_first(true);
2722 let field = SortField::new_with_options(d.clone(), options);
2723 let converter = RowConverter::new(vec![field]).unwrap();
2724 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2725
2726 assert!(rows.row(0) > rows.row(1));
2727 assert!(rows.row(1) > rows.row(2));
2728 assert!(rows.row(2) > rows.row(3));
2729 assert!(rows.row(4) < rows.row(0));
2730 assert!(rows.row(4) > rows.row(1));
2731
2732 let back = converter.convert_rows(&rows).unwrap();
2733 assert_eq!(back.len(), 1);
2734 back[0].to_data().validate_full().unwrap();
2735 assert_eq!(&back[0], &list);
2736
2737 let options = SortOptions::default().desc().with_nulls_first(true);
2738 let field = SortField::new_with_options(d.clone(), options);
2739 let converter = RowConverter::new(vec![field]).unwrap();
2740 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2741
2742 assert!(rows.row(0) > rows.row(1));
2743 assert!(rows.row(1) > rows.row(2));
2744 assert!(rows.row(2) > rows.row(3));
2745 assert!(rows.row(4) > rows.row(0));
2746 assert!(rows.row(4) > rows.row(1));
2747
2748 let back = converter.convert_rows(&rows).unwrap();
2749 assert_eq!(back.len(), 1);
2750 back[0].to_data().validate_full().unwrap();
2751 assert_eq!(&back[0], &list);
2752
2753 let options = SortOptions::default().desc().with_nulls_first(false);
2754 let field = SortField::new_with_options(d, options);
2755 let converter = RowConverter::new(vec![field]).unwrap();
2756 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2757
2758 assert!(rows.row(0) < rows.row(1));
2759 assert!(rows.row(1) < rows.row(2));
2760 assert!(rows.row(2) < rows.row(3));
2761 assert!(rows.row(4) > rows.row(0));
2762 assert!(rows.row(4) < rows.row(1));
2763
2764 let back = converter.convert_rows(&rows).unwrap();
2765 assert_eq!(back.len(), 1);
2766 back[0].to_data().validate_full().unwrap();
2767 assert_eq!(&back[0], &list);
2768
2769 let sliced_list = list.slice(1, 3);
2770 let rows = converter
2771 .convert_columns(&[Arc::clone(&sliced_list)])
2772 .unwrap();
2773
2774 assert!(rows.row(0) < rows.row(1));
2775 assert!(rows.row(1) < rows.row(2));
2776
2777 let back = converter.convert_rows(&rows).unwrap();
2778 assert_eq!(back.len(), 1);
2779 back[0].to_data().validate_full().unwrap();
2780 assert_eq!(&back[0], &sliced_list);
2781 }
2782
2783 #[test]
2784 fn test_list() {
2785 test_single_list::<i32>();
2786 test_nested_list::<i32>();
2787 }
2788
2789 #[test]
2790 fn test_large_list() {
2791 test_single_list::<i64>();
2792 test_nested_list::<i64>();
2793 }
2794
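    // Mirrors `test_single_list` for FixedSizeList(3): ordering under each
    // sort option combination, round-tripping, and slicing.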
2795 #[test]
2796 fn test_fixed_size_list() {
2797 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
2798 builder.values().append_value(32);
2799 builder.values().append_value(52);
2800 builder.values().append_value(32);
2801 builder.append(true);
2802 builder.values().append_value(32);
2803 builder.values().append_value(52);
2804 builder.values().append_value(12);
2805 builder.append(true);
2806 builder.values().append_value(32);
2807 builder.values().append_value(52);
2808 builder.values().append_null();
2809 builder.append(true);
2810 builder.values().append_value(32);
     builder.values().append_value(52);
     builder.values().append_value(13);
     builder.append(false); // null list entry
2814 builder.values().append_value(32);
2815 builder.values().append_null();
2816 builder.values().append_null();
2817 builder.append(true);
2818 builder.values().append_null();
2819 builder.values().append_null();
2820 builder.values().append_null();
2821 builder.append(true);
2822 builder.values().append_value(17);
     builder.values().append_null();
     builder.values().append_value(77);
     builder.append(false); // null list entry
2826
2827 let list = Arc::new(builder.finish()) as ArrayRef;
2828 let d = list.data_type().clone();
2829
2830 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2832
2833 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2834 assert!(rows.row(0) > rows.row(1));
     assert!(rows.row(2) < rows.row(1));
     assert!(rows.row(3) < rows.row(2));
     assert!(rows.row(4) < rows.row(2));
     assert!(rows.row(5) < rows.row(2));
     assert!(rows.row(3) < rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2843 assert_eq!(back.len(), 1);
2844 back[0].to_data().validate_full().unwrap();
2845 assert_eq!(&back[0], &list);
2846
2847 let options = SortOptions::default().asc().with_nulls_first(false);
2849 let field = SortField::new_with_options(d.clone(), options);
2850 let converter = RowConverter::new(vec![field]).unwrap();
2851 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2852 assert!(rows.row(0) > rows.row(1));
     assert!(rows.row(2) > rows.row(1));
     assert!(rows.row(3) > rows.row(2));
     assert!(rows.row(4) > rows.row(2));
     assert!(rows.row(5) > rows.row(2));
     assert!(rows.row(3) > rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2861 assert_eq!(back.len(), 1);
2862 back[0].to_data().validate_full().unwrap();
2863 assert_eq!(&back[0], &list);
2864
2865 let options = SortOptions::default().desc().with_nulls_first(false);
2867 let field = SortField::new_with_options(d.clone(), options);
2868 let converter = RowConverter::new(vec![field]).unwrap();
2869 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2870 assert!(rows.row(0) < rows.row(1));
     assert!(rows.row(2) > rows.row(1));
     assert!(rows.row(3) > rows.row(2));
     assert!(rows.row(4) > rows.row(2));
     assert!(rows.row(5) > rows.row(2));
     assert!(rows.row(3) > rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2879 assert_eq!(back.len(), 1);
2880 back[0].to_data().validate_full().unwrap();
2881 assert_eq!(&back[0], &list);
2882
2883 let options = SortOptions::default().desc().with_nulls_first(true);
2885 let field = SortField::new_with_options(d, options);
2886 let converter = RowConverter::new(vec![field]).unwrap();
2887 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2888
2889 assert!(rows.row(0) < rows.row(1));
     assert!(rows.row(2) < rows.row(1));
     assert!(rows.row(3) < rows.row(2));
     assert!(rows.row(4) < rows.row(2));
     assert!(rows.row(5) < rows.row(2));
     assert!(rows.row(3) < rows.row(5));
     assert_eq!(rows.row(3), rows.row(6));

     let back = converter.convert_rows(&rows).unwrap();
2898 assert_eq!(back.len(), 1);
2899 back[0].to_data().validate_full().unwrap();
2900 assert_eq!(&back[0], &list);
2901
2902 let sliced_list = list.slice(1, 5);
2903 let rows_on_sliced_list = converter
2904 .convert_columns(&[Arc::clone(&sliced_list)])
2905 .unwrap();
2906
2907 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1));
     assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1));
     assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1));
     assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4));

     let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2913 assert_eq!(back.len(), 1);
2914 back[0].to_data().validate_full().unwrap();
2915 assert_eq!(&back[0], &sliced_list);
2916 }
2917
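    // Two FixedSizeList(1) columns converted together must round-trip back to
    // the original arrays.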
2918 #[test]
2919 fn test_two_fixed_size_lists() {
2920 let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2921 first.values().append_value(100);
2923 first.append(true);
2924 first.values().append_value(101);
2926 first.append(true);
2927 first.values().append_value(102);
2929 first.append(true);
2930 first.values().append_null();
2932 first.append(true);
2933 first.values().append_null();
     first.append(false); // null list entry
2936 let first = Arc::new(first.finish()) as ArrayRef;
2937 let first_type = first.data_type().clone();
2938
2939 let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2940 second.values().append_value(200);
2942 second.append(true);
2943 second.values().append_value(201);
2945 second.append(true);
2946 second.values().append_value(202);
2948 second.append(true);
2949 second.values().append_null();
2951 second.append(true);
2952 second.values().append_null();
     second.append(false); // null list entry
2955 let second = Arc::new(second.finish()) as ArrayRef;
2956 let second_type = second.data_type().clone();
2957
2958 let converter = RowConverter::new(vec![
2959 SortField::new(first_type.clone()),
2960 SortField::new(second_type.clone()),
2961 ])
2962 .unwrap();
2963
2964 let rows = converter
2965 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
2966 .unwrap();
2967
2968 let back = converter.convert_rows(&rows).unwrap();
2969 assert_eq!(back.len(), 2);
2970 back[0].to_data().validate_full().unwrap();
2971 assert_eq!(&back[0], &first);
2972 back[1].to_data().validate_full().unwrap();
2973 assert_eq!(&back[1], &second);
2974 }
2975
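    // FixedSizeList of structs containing a variable-width (Utf8) field,
    // alongside a plain Utf8 column; both must round-trip exactly.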
2976 #[test]
2977 fn test_fixed_size_list_with_variable_width_content() {
2978 let mut first = FixedSizeListBuilder::new(
2979 StructBuilder::from_fields(
2980 vec![
2981 Field::new(
2982 "timestamp",
2983 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
2984 false,
2985 ),
2986 Field::new("offset_minutes", DataType::Int16, false),
2987 Field::new("time_zone", DataType::Utf8, false),
2988 ],
2989 1,
2990 ),
2991 1,
2992 );
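        // Row 0: null list entry wrapping a null struct.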
2993 first
2995 .values()
2996 .field_builder::<TimestampMicrosecondBuilder>(0)
2997 .unwrap()
2998 .append_null();
2999 first
3000 .values()
3001 .field_builder::<Int16Builder>(1)
3002 .unwrap()
3003 .append_null();
3004 first
3005 .values()
3006 .field_builder::<StringBuilder>(2)
3007 .unwrap()
3008 .append_null();
3009 first.values().append(false);
3010 first.append(false);
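        // Row 1: valid list entry containing a null struct.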
3011 first
3013 .values()
3014 .field_builder::<TimestampMicrosecondBuilder>(0)
3015 .unwrap()
3016 .append_null();
3017 first
3018 .values()
3019 .field_builder::<Int16Builder>(1)
3020 .unwrap()
3021 .append_null();
3022 first
3023 .values()
3024 .field_builder::<StringBuilder>(2)
3025 .unwrap()
3026 .append_null();
3027 first.values().append(false);
3028 first.append(true);
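        // Row 2: epoch timestamp in UTC.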
3029 first
3031 .values()
3032 .field_builder::<TimestampMicrosecondBuilder>(0)
3033 .unwrap()
3034 .append_value(0);
3035 first
3036 .values()
3037 .field_builder::<Int16Builder>(1)
3038 .unwrap()
3039 .append_value(0);
3040 first
3041 .values()
3042 .field_builder::<StringBuilder>(2)
3043 .unwrap()
3044 .append_value("UTC");
3045 first.values().append(true);
3046 first.append(true);
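        // Row 3: non-trivial timestamp with a named time zone.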
3047 first
3049 .values()
3050 .field_builder::<TimestampMicrosecondBuilder>(0)
3051 .unwrap()
3052 .append_value(1126351800123456);
3053 first
3054 .values()
3055 .field_builder::<Int16Builder>(1)
3056 .unwrap()
3057 .append_value(120);
3058 first
3059 .values()
3060 .field_builder::<StringBuilder>(2)
3061 .unwrap()
3062 .append_value("Europe/Warsaw");
3063 first.values().append(true);
3064 first.append(true);
3065 let first = Arc::new(first.finish()) as ArrayRef;
3066 let first_type = first.data_type().clone();
3067
3068 let mut second = StringBuilder::new();
3069 second.append_value("somewhere near");
3070 second.append_null();
3071 second.append_value("Greenwich");
3072 second.append_value("Warsaw");
3073 let second = Arc::new(second.finish()) as ArrayRef;
3074 let second_type = second.data_type().clone();
3075
3076 let converter = RowConverter::new(vec![
3077 SortField::new(first_type.clone()),
3078 SortField::new(second_type.clone()),
3079 ])
3080 .unwrap();
3081
3082 let rows = converter
3083 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3084 .unwrap();
3085
3086 let back = converter.convert_rows(&rows).unwrap();
3087 assert_eq!(back.len(), 2);
3088 back[0].to_data().validate_full().unwrap();
3089 assert_eq!(&back[0], &first);
3090 back[1].to_data().validate_full().unwrap();
3091 assert_eq!(&back[1], &second);
3092 }
3093
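    // Random column generators used by the fuzz test below; `valid_percent`
    // controls the probability that a given slot is non-null.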
3094 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3095 where
3096 K: ArrowPrimitiveType,
3097 StandardUniform: Distribution<K::Native>,
3098 {
3099 let mut rng = rng();
3100 (0..len)
3101 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3102 .collect()
3103 }
3104
3105 fn generate_strings<O: OffsetSizeTrait>(
3106 len: usize,
3107 valid_percent: f64,
3108 ) -> GenericStringArray<O> {
3109 let mut rng = rng();
3110 (0..len)
3111 .map(|_| {
3112 rng.random_bool(valid_percent).then(|| {
3113 let len = rng.random_range(0..100);
3114 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3115 String::from_utf8(bytes).unwrap()
3116 })
3117 })
3118 .collect()
3119 }
3120
3121 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3122 let mut rng = rng();
3123 (0..len)
3124 .map(|_| {
3125 rng.random_bool(valid_percent).then(|| {
3126 let len = rng.random_range(0..100);
3127 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3128 String::from_utf8(bytes).unwrap()
3129 })
3130 })
3131 .collect()
3132 }
3133
3134 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3135 let mut rng = rng();
3136 (0..len)
3137 .map(|_| {
3138 rng.random_bool(valid_percent).then(|| {
3139 let len = rng.random_range(0..100);
3140 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3141 bytes
3142 })
3143 })
3144 .collect()
3145 }
3146
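    // Deterministic StringViewArray cycling through edge cases around the
    // 12-byte inline-view boundary: shared prefixes, embedded NUL bytes, and
    // strings long enough to spill into the values buffer.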
3147 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3148 let edge_cases = vec![
3149 Some("bar".to_string()),
3150 Some("bar\0".to_string()),
3151 Some("LongerThan12Bytes".to_string()),
3152 Some("LongerThan12Bytez".to_string()),
3153 Some("LongerThan12Bytes\0".to_string()),
3154 Some("LongerThan12Byt".to_string()),
3155 Some("backend one".to_string()),
3156 Some("backend two".to_string()),
3157 Some("a".repeat(257)),
3158 Some("a".repeat(300)),
3159 ];
3160
3161 let mut values = Vec::with_capacity(len);
3163 for i in 0..len {
3164 values.push(
3165 edge_cases
3166 .get(i % edge_cases.len())
3167 .cloned()
3168 .unwrap_or(None),
3169 );
3170 }
3171
3172 StringViewArray::from(values)
3173 }
3174
3175 fn generate_dictionary<K>(
3176 values: ArrayRef,
3177 len: usize,
3178 valid_percent: f64,
3179 ) -> DictionaryArray<K>
3180 where
3181 K: ArrowDictionaryKeyType,
3182 K::Native: SampleUniform,
3183 {
3184 let mut rng = rng();
3185 let min_key = K::Native::from_usize(0).unwrap();
3186 let max_key = K::Native::from_usize(values.len()).unwrap();
3187 let keys: PrimitiveArray<K> = (0..len)
3188 .map(|_| {
3189 rng.random_bool(valid_percent)
3190 .then(|| rng.random_range(min_key..max_key))
3191 })
3192 .collect();
3193
3194 let data_type =
3195 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3196
3197 let data = keys
3198 .into_data()
3199 .into_builder()
3200 .data_type(data_type)
3201 .add_child_data(values.to_data())
3202 .build()
3203 .unwrap();
3204
3205 DictionaryArray::from(data)
3206 }
3207
3208 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3209 let mut rng = rng();
3210 let width = rng.random_range(0..20);
3211 let mut builder = FixedSizeBinaryBuilder::new(width);
3212
3213 let mut b = vec![0; width as usize];
3214 for _ in 0..len {
3215 match rng.random_bool(valid_percent) {
3216 true => {
3217 b.iter_mut().for_each(|x| *x = rng.random());
3218 builder.append_value(&b).unwrap();
3219 }
3220 false => builder.append_null(),
3221 }
3222 }
3223
3224 builder.finish()
3225 }
3226
3227 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3228 let mut rng = rng();
3229 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3230 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3231 let b = generate_strings::<i32>(len, valid_percent);
3232 let fields = Fields::from(vec![
3233 Field::new("a", DataType::Int32, true),
3234 Field::new("b", DataType::Utf8, true),
3235 ]);
3236 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3237 StructArray::new(fields, values, Some(nulls))
3238 }
3239
3240 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3241 where
3242 F: FnOnce(usize) -> ArrayRef,
3243 {
3244 let mut rng = rng();
3245 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3246 let values_len = offsets.last().unwrap().to_usize().unwrap();
3247 let values = values(values_len);
3248 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3249 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3250 ListArray::new(field, offsets, values, Some(nulls))
3251 }
3252
3253 fn generate_column(len: usize) -> ArrayRef {
3254 let mut rng = rng();
3255 match rng.random_range(0..18) {
3256 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
3257 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
3258 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
3259 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
3260 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
3261 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
3262 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
3263 7 => Arc::new(generate_dictionary::<Int64Type>(
3264 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
3266 len,
3267 0.8,
3268 )),
3269 8 => Arc::new(generate_dictionary::<Int64Type>(
3270 Arc::new(generate_primitive_array::<Int64Type>(
3272 rng.random_range(1..len),
3273 1.0,
3274 )),
3275 len,
3276 0.8,
3277 )),
3278 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
3279 10 => Arc::new(generate_struct(len, 0.8)),
3280 11 => Arc::new(generate_list(len, 0.8, |values_len| {
3281 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3282 })),
3283 12 => Arc::new(generate_list(len, 0.8, |values_len| {
3284 Arc::new(generate_strings::<i32>(values_len, 0.8))
3285 })),
3286 13 => Arc::new(generate_list(len, 0.8, |values_len| {
3287 Arc::new(generate_struct(values_len, 0.8))
3288 })),
3289 14 => Arc::new(generate_string_view(len, 0.8)),
3290 15 => Arc::new(generate_byte_view(len, 0.8)),
3291 16 => Arc::new(generate_fixed_stringview_column(len)),
3292 17 => Arc::new(
3293 generate_list(len + 1000, 0.8, |values_len| {
3294 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3295 })
3296 .slice(500, len),
3297 ),
3298 _ => unreachable!(),
3299 }
3300 }
3301
3302 fn print_row(cols: &[SortColumn], row: usize) -> String {
3303 let t: Vec<_> = cols
3304 .iter()
3305 .map(|x| match x.values.is_valid(row) {
3306 true => {
3307 let opts = FormatOptions::default().with_null("NULL");
3308 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3309 formatter.value(row).to_string()
3310 }
3311 false => "NULL".to_string(),
3312 })
3313 .collect();
3314 t.join(",")
3315 }
3316
3317 fn print_col_types(cols: &[SortColumn]) -> String {
3318 let t: Vec<_> = cols
3319 .iter()
3320 .map(|x| x.values.data_type().to_string())
3321 .collect();
3322 t.join(",")
3323 }
3324
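    // For random column combinations, the row encoding must order every pair of
    // rows exactly as LexicographicalComparator does, and converting the rows
    // back (directly, via binary, and via from_binary) must reproduce the inputs.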
3325 #[test]
3326 #[cfg_attr(miri, ignore)]
3327 fn fuzz_test() {
3328 for _ in 0..100 {
3329 let mut rng = rng();
3330 let num_columns = rng.random_range(1..5);
3331 let len = rng.random_range(5..100);
3332 let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
3333
3334 let options: Vec<_> = (0..num_columns)
3335 .map(|_| SortOptions {
3336 descending: rng.random_bool(0.5),
3337 nulls_first: rng.random_bool(0.5),
3338 })
3339 .collect();
3340
3341 let sort_columns: Vec<_> = options
3342 .iter()
3343 .zip(&arrays)
3344 .map(|(o, c)| SortColumn {
3345 values: Arc::clone(c),
3346 options: Some(*o),
3347 })
3348 .collect();
3349
3350 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
3351
3352 let columns: Vec<SortField> = options
3353 .into_iter()
3354 .zip(&arrays)
3355 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
3356 .collect();
3357
3358 let converter = RowConverter::new(columns).unwrap();
3359 let rows = converter.convert_columns(&arrays).unwrap();
3360
3361 for i in 0..len {
3362 for j in 0..len {
3363 let row_i = rows.row(i);
3364 let row_j = rows.row(j);
3365 let row_cmp = row_i.cmp(&row_j);
3366 let lex_cmp = comparator.compare(i, j);
3367 assert_eq!(
3368 row_cmp,
3369 lex_cmp,
3370 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
3371 print_row(&sort_columns, i),
3372 print_row(&sort_columns, j),
3373 row_i,
3374 row_j,
3375 print_col_types(&sort_columns)
3376 );
3377 }
3378 }
3379
3380 let back = converter.convert_rows(&rows).unwrap();
3383 for (actual, expected) in back.iter().zip(&arrays) {
3384 actual.to_data().validate_full().unwrap();
3385 dictionary_eq(actual, expected)
3386 }
3387
3388 let rows = rows.try_into_binary().expect("reasonable size");
3391 let parser = converter.parser();
3392 let back = converter
3393 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3394 .unwrap();
3395 for (actual, expected) in back.iter().zip(&arrays) {
3396 actual.to_data().validate_full().unwrap();
3397 dictionary_eq(actual, expected)
3398 }
3399
3400 let rows = converter.from_binary(rows);
3401 let back = converter.convert_rows(&rows).unwrap();
3402 for (actual, expected) in back.iter().zip(&arrays) {
3403 actual.to_data().validate_full().unwrap();
3404 dictionary_eq(actual, expected)
3405 }
3406 }
3407 }
3408
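    // `Rows::clear` must allow a buffer to be reused: after clearing and
    // appending a new column, the rows reflect only the most recent append.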
3409 #[test]
3410 fn test_clear() {
3411 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3412 let mut rows = converter.empty_rows(3, 128);
3413
3414 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3415 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3416 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3417
3418 for array in arrays.iter() {
3419 rows.clear();
3420 converter
3421 .append(&mut rows, std::slice::from_ref(array))
3422 .unwrap();
3423 let back = converter.convert_rows(&rows).unwrap();
3424 assert_eq!(&back[0], array);
3425 }
3426
3427 let mut rows_expected = converter.empty_rows(3, 128);
3428 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3429
3430 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3431 assert_eq!(
3432 actual, expected,
3433 "For row {i}: expected {expected:?}, actual: {actual:?}",
3434 );
3435 }
3436 }
3437
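    // Appending a dictionary-encoded binary column must round-trip its logical
    // values; `dictionary_eq` is used since the round-trip need not preserve
    // the dictionary encoding itself.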
3438 #[test]
3439 fn test_append_codec_dictionary_binary() {
3440 use DataType::*;
3441 let converter = RowConverter::new(vec![SortField::new(Dictionary(
3443 Box::new(Int32),
3444 Box::new(Binary),
3445 ))])
3446 .unwrap();
3447 let mut rows = converter.empty_rows(4, 128);
3448
3449 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3450 let values = BinaryArray::from(vec![
3451 Some("a".as_bytes()),
3452 Some(b"b"),
3453 Some(b"c"),
3454 Some(b"d"),
3455 ]);
3456 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3457
3458 rows.clear();
3459 let array = Arc::new(dict_array) as ArrayRef;
3460 converter
3461 .append(&mut rows, std::slice::from_ref(&array))
3462 .unwrap();
3463 let back = converter.convert_rows(&rows).unwrap();
3464
3465 dictionary_eq(&back[0], &array);
3466 }
3467
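    // A list that is a strict prefix of another ([None] vs [None, None]) must
    // compare as less.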
3468 #[test]
3469 fn test_list_prefix() {
3470 let mut a = ListBuilder::new(Int8Builder::new());
3471 a.append_value([None]);
3472 a.append_value([None, None]);
3473 let a = a.finish();
3474
3475 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3476 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3477 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3478 }
3479
3480 #[test]
3481 fn map_should_be_marked_as_unsupported() {
3482 let map_data_type = Field::new_map(
3483 "map",
3484 "entries",
3485 Field::new("key", DataType::Utf8, false),
3486 Field::new("value", DataType::Utf8, true),
3487 false,
3488 true,
3489 )
3490 .data_type()
3491 .clone();
3492
3493 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3494
3495 assert!(!is_supported, "Map should not be supported");
3496 }
3497
3498 #[test]
3499 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3500 let map_data_type = Field::new_map(
3501 "map",
3502 "entries",
3503 Field::new("key", DataType::Utf8, false),
3504 Field::new("value", DataType::Utf8, true),
3505 false,
3506 true,
3507 )
3508 .data_type()
3509 .clone();
3510
3511 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3512
3513 match converter {
3514 Err(ArrowError::NotYetImplemented(message)) => {
3515 assert!(
3516 message.contains("Row format support not yet implemented for"),
3517 "Expected NotYetImplemented error for map data type, got: {message}",
3518 );
3519 }
3520 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3521 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3522 }
3523 }
3524
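    // Rows produced by this converter can be decoded without UTF-8 validation,
    // which keeps short strings inline in the views; rows re-parsed from their
    // binary form go through the validating path and may copy more bytes into
    // the values buffer. The helper returns (unchecked, checked) buffer lengths.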
3525 #[test]
3526 fn test_values_buffer_smaller_when_utf8_validation_disabled() {
3527 fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
3528 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
3530
3531 let rows = converter.convert_columns(&[col]).unwrap();
3533 let converted = converter.convert_rows(&rows).unwrap();
3534 let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3535
3536 let rows = rows.try_into_binary().expect("reasonable size");
3538 let parser = converter.parser();
3539 let converted = converter
3540 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3541 .unwrap();
3542 let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3543 (unchecked_values_len, checked_values_len)
3544 }
3545
3546 let col = Arc::new(StringViewArray::from_iter([
3548 Some("hello"), None, Some("short"), Some("tiny"), ])) as ArrayRef;
3553
3554 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3555 assert_eq!(unchecked_values_len, 0); // all strings fit inline in the views
3557 assert_eq!(checked_values_len, 14); // "hello" (5) + "short" (5) + "tiny" (4) copied to the values buffer
3559
3560 let col = Arc::new(StringViewArray::from_iter([
3562 Some("this is a very long string over 12 bytes"),
3563 Some("another long string to test the buffer"),
3564 ])) as ArrayRef;
3565
3566 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3567 assert!(unchecked_values_len > 0);
3569 assert_eq!(unchecked_values_len, checked_values_len);
3570
3571 let col = Arc::new(StringViewArray::from_iter([
3573 Some("tiny"),
     Some("thisisexact13"),
     None,
3576 Some("short"),
     ])) as ArrayRef;
3578
3579 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3580 assert_eq!(unchecked_values_len, 13); // only "thisisexact13" (13 bytes) exceeds the 12-byte inline limit
3582 assert!(checked_values_len > unchecked_values_len);
3583 }
3584}