1#![doc(
157 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
158 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
159)]
160#![cfg_attr(docsrs, feature(doc_cfg))]
161#![warn(missing_docs)]
162use std::cmp::Ordering;
163use std::hash::{Hash, Hasher};
164use std::sync::Arc;
165
166use arrow_array::cast::*;
167use arrow_array::types::ArrowDictionaryKeyType;
168use arrow_array::*;
169use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
170use arrow_data::{ArrayData, ArrayDataBuilder};
171use arrow_schema::*;
172use variable::{decode_binary_view, decode_string_view};
173
174use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
175use crate::list::{compute_lengths_fixed_size_list, encode_fixed_size_list};
176use crate::variable::{decode_binary, decode_string};
177use arrow_array::types::{Int16Type, Int32Type, Int64Type};
178
179mod fixed;
180mod list;
181mod run;
182mod variable;
183
/// Converts [`ArrayRef`] columns into a row-oriented, byte-comparable format
/// ([`Rows`]) and back again.
///
/// Each instance is configured with one [`SortField`] per column and only accepts
/// [`Rows`] / [`Row`]s that it produced itself; this is checked by pointer equality
/// on `fields` in [`RowConverter::append`] and [`RowConverter::convert_rows`].
#[derive(Debug)]
pub struct RowConverter {
    /// The sort fields describing each column, shared (by `Arc`) with every [`Rows`]
    /// and [`RowParser`] this converter creates.
    fields: Arc<[SortField]>,
    /// One codec per entry in `fields`, holding any state needed to encode that column.
    codecs: Vec<Codec>,
}
446
/// Per-column encoding state, built once by [`Codec::new`] from a [`SortField`].
#[derive(Debug)]
enum Codec {
    /// No additional state is needed (non-nested types).
    Stateless,
    /// A converter for the dictionary values, plus the pre-encoded row of a single
    /// null value, which is emitted for null keys.
    Dictionary(RowConverter, OwnedRow),
    /// A converter for the struct's child columns, plus the pre-encoded row of all-null
    /// children, which is emitted for null struct slots.
    Struct(RowConverter, OwnedRow),
    /// A converter for the child values of a `List`, `LargeList` or `FixedSizeList`.
    List(RowConverter),
    /// A converter for the values of a `RunEndEncoded` array.
    RunEndEncoded(RowConverter),
}
462
impl Codec {
    /// Builds the codec for `sort_field`, creating any child [`RowConverter`]s needed
    /// to encode dictionary, run-end encoded, list or struct data.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] for nested types without a codec, and
    /// propagates errors from constructing or running child converters.
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionaries are encoded by their values: build a converter for the
                // value type and pre-encode one null value to emit for null keys.
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            DataType::RunEndEncoded(_, values) => {
                // The child converter always sorts ascending, with `nulls_first` flipped
                // when the column is descending. NOTE(review): presumably `run::encode`
                // applies the descending inversion itself, as for lists — confirm in run.rs.
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(values.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::RunEndEncoded(converter))
            }
            // Checked after the Dictionary / RunEndEncoded arms so those types always get
            // their dedicated codecs (NOTE(review): relies on `DataType::is_nested`
            // semantics for wrapper types).
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // As for run-end encoding: the child converter sorts ascending and
                // `nulls_first` is flipped for descending columns. NOTE(review): presumably
                // `list::encode` inverts the child bytes when descending — confirm.
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::FixedSizeList(f, _) => {
                // Unlike variable-size lists, the parent's options are passed through as-is.
                let field = SortField::new_with_options(f.data_type().clone(), sort_field.options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Every child inherits the struct's sort options.
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                // Pre-encode a row of all-null children to emit for null struct slots.
                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepares the per-column [`Encoder`] for `array`, pre-encoding its child data
    /// (dictionary values, struct children, list values or run values) with this codec's
    /// child converter.
    ///
    /// NOTE(review): `array` is assumed to have the data type this codec was built for;
    /// other types hit `unreachable!()` or a failing downcast. [`RowConverter::append`]
    /// checks the data type before calling this.
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => {
                        let list_array = as_list_array(array);
                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // The child array may hold data outside the range referenced by
                        // the offsets; only the referenced values are encoded.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::LargeList(_) => {
                        let list_array = as_large_list_array(array);

                        let first_offset = list_array.offsets()[0] as usize;
                        let last_offset =
                            list_array.offsets()[list_array.offsets().len() - 1] as usize;

                        // Only encode the values referenced by the offsets.
                        list_array
                            .values()
                            .slice(first_offset, last_offset - first_offset)
                    }
                    DataType::FixedSizeList(_, _) => {
                        as_fixed_size_list_array(array).values().clone()
                    }
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::List(rows))
            }
            Codec::RunEndEncoded(converter) => {
                // Dispatch on the run-end index type to reach the values array.
                let values = match array.data_type() {
                    DataType::RunEndEncoded(r, _) => match r.data_type() {
                        DataType::Int16 => array.as_run::<Int16Type>().values(),
                        DataType::Int32 => array.as_run::<Int32Type>().values(),
                        DataType::Int64 => array.as_run::<Int64Type>().values(),
                        _ => unreachable!("Unsupported run end index type: {r:?}"),
                    },
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(std::slice::from_ref(values))?;
                Ok(Encoder::RunEndEncoded(rows))
            }
        }
    }

    /// Returns the memory, in bytes, held by this codec's child converter and cached
    /// null row (zero for [`Codec::Stateless`]).
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
            Codec::RunEndEncoded(converter) => converter.size(),
        }
    }
}
608
/// Per-`append` encoding state, derived from a [`Codec`] and the column being encoded.
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional state is needed.
    Stateless,
    /// The encoded dictionary values, and the encoded null row borrowed from the codec.
    Dictionary(Rows, Row<'a>),
    /// The encoded struct children, and the encoded all-null row borrowed from the codec.
    Struct(Rows, Row<'a>),
    /// The encoded child values of a list column (for variable-size lists, only the
    /// range referenced by the offsets).
    List(Rows),
    /// The encoded values of a run-end encoded column.
    RunEndEncoded(Rows),
}
626
/// Describes the data type and sort options of a single column passed to a
/// [`RowConverter`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options for this column (`descending` and `nulls_first`).
    options: SortOptions,
    /// The data type of this column.
    data_type: DataType,
}
635
636impl SortField {
637 pub fn new(data_type: DataType) -> Self {
639 Self::new_with_options(data_type, Default::default())
640 }
641
642 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
644 Self { options, data_type }
645 }
646
647 pub fn size(&self) -> usize {
651 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
652 }
653}
654
impl RowConverter {
    /// Creates a new [`RowConverter`] for the provided `fields`.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] if any field's data type is not
    /// supported (see [`RowConverter::supports_fields`]), and propagates errors from
    /// building the per-field codecs.
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Returns `true` if every field's data type can be handled by a [`RowConverter`].
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    /// Recursively checks whether `d` is supported: any non-nested type, lists and
    /// structs of supported types, and run-end encoded arrays of supported values.
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::FixedSizeList(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            DataType::RunEndEncoded(_, values) => Self::supports_datatype(values.data_type()),
            _ => false,
        }
    }

    /// Converts `columns` into a new [`Rows`] instance.
    ///
    /// The row count is taken from the first column (zero if `columns` is empty).
    ///
    /// # Errors
    ///
    /// Same as [`RowConverter::append`].
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Encodes `columns` and appends the resulting rows to `rows`.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] if the number of columns differs
    /// from the number of fields, if the columns have differing lengths, or if a
    /// column's data type does not match its [`SortField`].
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not produced by this [`RowConverter`].
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }
        for colum in columns.iter().skip(1) {
            if colum.len() != columns[0].len() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "RowConverter columns must all have the same length, expected {} got {}",
                    columns[0].len(),
                    colum.len()
                )));
            }
        }

        // Validate each column's type and build its encoder (pre-encoding child data).
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        // Compute every new row's length, size the buffer, and push each new row's start
        // offset. `encode_column` then advances these offsets past the bytes it writes,
        // so after the last column they hold the row end offsets.
        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);
        let total = lengths.extend_offsets(rows.offsets[write_offset], &mut rows.offsets);
        rows.buffer.resize(total, 0);

        // Encode one column at a time across all new rows.
        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Decodes `rows` back into columns, returning one [`ArrayRef`] per field.
    ///
    /// UTF-8 validation is enabled if any input row's configuration requests it
    /// (rows from [`RowConverter::from_binary`] or a [`RowParser`] do).
    ///
    /// # Errors
    ///
    /// Propagates decoding errors. In test builds, also errors if a row was not fully
    /// consumed by the codecs.
    ///
    /// # Panics
    ///
    /// Panics if any row was not produced by this [`RowConverter`].
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY: NOTE(review): every row passed the provenance check above, and UTF-8
        // validation is enabled whenever any row came from an untrusted source.
        let result = unsafe { self.convert_raw(&mut rows, validate_utf8) }?;

        // In tests, verify that decoding consumed every byte of every row.
        if cfg!(test) {
            for (i, row) in rows.iter().enumerate() {
                if !row.is_empty() {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "Codecs {codecs:?} did not consume all bytes for row {i}, remaining bytes: {row:?}",
                        codecs = &self.codecs
                    )));
                }
            }
        }

        Ok(result)
    }

    /// Returns an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of encoded data.
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        // One extra slot for the leading zero offset.
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Creates [`Rows`] from a [`BinaryArray`] (for example one produced by
    /// [`Rows::try_into_binary`]), reusing the array's value buffer when it can be
    /// taken without copying.
    ///
    /// The bytes are not checked to be valid encodings for this converter; UTF-8
    /// validation is enabled on the resulting rows.
    ///
    /// # Panics
    ///
    /// Panics if `array` contains nulls.
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        let (offsets, values, _) = array.into_parts();
        let offsets = offsets.iter().map(|&i| i.as_usize()).collect();
        // Take the buffer without copying if possible, otherwise copy the values.
        let buffer = values.into_vec().unwrap_or_else(|values| values.to_vec());
        Rows {
            buffer,
            offsets,
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                validate_utf8: true,
            },
        }
    }

    /// Decodes every field from the front of each entry of `rows`, in field order.
    ///
    /// # Safety
    ///
    /// NOTE(review): forwards to the unsafe `decode_column`; presumably `rows` must hold
    /// valid encodings for this converter's fields (and valid UTF-8 string data when
    /// `validate_utf8` is `false`) — confirm the exact contract.
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| unsafe { decode_column(field, rows, codec, validate_utf8) })
            .collect()
    }

    /// Returns a [`RowParser`] whose parsed [`Row`]s share this converter's fields and
    /// are therefore accepted by [`RowConverter::convert_rows`].
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Returns the approximate memory used by this converter in bytes, including
    /// `size_of::<Self>()`, its fields and its codecs (with their child converters).
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
962
/// Wraps raw bytes as [`Row`]s belonging to a [`RowConverter`], created by
/// [`RowConverter::parser`].
#[derive(Debug)]
pub struct RowParser {
    /// Configuration shared with the converter; UTF-8 validation is always enabled.
    config: RowConfig,
}
968
969impl RowParser {
970 fn new(fields: Arc<[SortField]>) -> Self {
971 Self {
972 config: RowConfig {
973 fields,
974 validate_utf8: true,
975 },
976 }
977 }
978
979 pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
984 Row {
985 data: bytes,
986 config: &self.config,
987 }
988 }
989}
990
/// Configuration shared by the [`Rows`] and [`Row`]s associated with a [`RowConverter`].
#[derive(Debug, Clone)]
struct RowConfig {
    /// The owning converter's sort fields; compared by pointer to check provenance.
    fields: Arc<[SortField]>,
    /// Whether string data must be validated as UTF-8 when decoding.
    validate_utf8: bool,
}
999
/// A byte-comparable, row-oriented encoding of a set of columns, produced by a
/// [`RowConverter`].
#[derive(Debug)]
pub struct Rows {
    /// Concatenated encoded bytes of all rows.
    buffer: Vec<u8>,
    /// Row boundaries into `buffer`: row `i` spans `offsets[i]..offsets[i + 1]`.
    offsets: Vec<usize>,
    /// Configuration shared with the producing converter.
    config: RowConfig,
}
1012
1013impl Rows {
1014 pub fn push(&mut self, row: Row<'_>) {
1016 assert!(
1017 Arc::ptr_eq(&row.config.fields, &self.config.fields),
1018 "row was not produced by this RowConverter"
1019 );
1020 self.config.validate_utf8 |= row.config.validate_utf8;
1021 self.buffer.extend_from_slice(row.data);
1022 self.offsets.push(self.buffer.len())
1023 }
1024
1025 pub fn row(&self, row: usize) -> Row<'_> {
1027 assert!(row + 1 < self.offsets.len());
1028 unsafe { self.row_unchecked(row) }
1029 }
1030
1031 pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
1036 let end = unsafe { self.offsets.get_unchecked(index + 1) };
1037 let start = unsafe { self.offsets.get_unchecked(index) };
1038 let data = unsafe { self.buffer.get_unchecked(*start..*end) };
1039 Row {
1040 data,
1041 config: &self.config,
1042 }
1043 }
1044
1045 pub fn clear(&mut self) {
1047 self.offsets.truncate(1);
1048 self.buffer.clear();
1049 }
1050
1051 pub fn num_rows(&self) -> usize {
1053 self.offsets.len() - 1
1054 }
1055
1056 pub fn iter(&self) -> RowsIter<'_> {
1058 self.into_iter()
1059 }
1060
1061 pub fn size(&self) -> usize {
1065 std::mem::size_of::<Self>()
1067 + self.buffer.len()
1068 + self.offsets.len() * std::mem::size_of::<usize>()
1069 }
1070
1071 pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
1101 if self.buffer.len() > i32::MAX as usize {
1102 return Err(ArrowError::InvalidArgumentError(format!(
1103 "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
1104 self.buffer.len()
1105 )));
1106 }
1107 let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
1109 let array = unsafe {
1111 BinaryArray::new_unchecked(
1112 OffsetBuffer::new_unchecked(offsets_scalar),
1113 Buffer::from_vec(self.buffer),
1114 None,
1115 )
1116 };
1117 Ok(array)
1118 }
1119}
1120
1121impl<'a> IntoIterator for &'a Rows {
1122 type Item = Row<'a>;
1123 type IntoIter = RowsIter<'a>;
1124
1125 fn into_iter(self) -> Self::IntoIter {
1126 RowsIter {
1127 rows: self,
1128 start: 0,
1129 end: self.num_rows(),
1130 }
1131 }
1132}
1133
/// Double-ended iterator over the [`Row`]s of a [`Rows`], returned by [`Rows::iter`].
#[derive(Debug)]
pub struct RowsIter<'a> {
    /// The rows being iterated.
    rows: &'a Rows,
    /// Index of the next row to yield from the front (inclusive).
    start: usize,
    /// One past the index of the next row to yield from the back (exclusive).
    end: usize,
}
1141
1142impl<'a> Iterator for RowsIter<'a> {
1143 type Item = Row<'a>;
1144
1145 fn next(&mut self) -> Option<Self::Item> {
1146 if self.end == self.start {
1147 return None;
1148 }
1149
1150 let row = unsafe { self.rows.row_unchecked(self.start) };
1152 self.start += 1;
1153 Some(row)
1154 }
1155
1156 fn size_hint(&self) -> (usize, Option<usize>) {
1157 let len = self.len();
1158 (len, Some(len))
1159 }
1160}
1161
1162impl ExactSizeIterator for RowsIter<'_> {
1163 fn len(&self) -> usize {
1164 self.end - self.start
1165 }
1166}
1167
1168impl DoubleEndedIterator for RowsIter<'_> {
1169 fn next_back(&mut self) -> Option<Self::Item> {
1170 if self.end == self.start {
1171 return None;
1172 }
1173 let row = unsafe { self.rows.row_unchecked(self.end) };
1175 self.end -= 1;
1176 Some(row)
1177 }
1178}
1179
/// A borrowed view of a single encoded row, obtained from [`Rows`], [`OwnedRow::row`]
/// or [`RowParser::parse`].
///
/// Equality, ordering and hashing consider only the encoded bytes.
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row.
    data: &'a [u8],
    /// Configuration of the converter that produced this row.
    config: &'a RowConfig,
}
1193
1194impl<'a> Row<'a> {
1195 pub fn owned(&self) -> OwnedRow {
1197 OwnedRow {
1198 data: self.data.into(),
1199 config: self.config.clone(),
1200 }
1201 }
1202
1203 pub fn data(&self) -> &'a [u8] {
1205 self.data
1206 }
1207}
1208
1209impl PartialEq for Row<'_> {
1212 #[inline]
1213 fn eq(&self, other: &Self) -> bool {
1214 self.data.eq(other.data)
1215 }
1216}
1217
/// Row equality is byte-slice equality, which is a full equivalence relation.
impl Eq for Row<'_> {}
1219
1220impl PartialOrd for Row<'_> {
1221 #[inline]
1222 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1223 Some(self.cmp(other))
1224 }
1225}
1226
1227impl Ord for Row<'_> {
1228 #[inline]
1229 fn cmp(&self, other: &Self) -> Ordering {
1230 self.data.cmp(other.data)
1231 }
1232}
1233
1234impl Hash for Row<'_> {
1235 #[inline]
1236 fn hash<H: Hasher>(&self, state: &mut H) {
1237 self.data.hash(state)
1238 }
1239}
1240
1241impl AsRef<[u8]> for Row<'_> {
1242 #[inline]
1243 fn as_ref(&self) -> &[u8] {
1244 self.data
1245 }
1246}
1247
/// An owned copy of an encoded [`Row`], created by [`Row::owned`].
///
/// Equality, ordering and hashing consider only the encoded bytes.
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this row.
    data: Box<[u8]>,
    /// Configuration of the converter that produced this row.
    config: RowConfig,
}
1256
1257impl OwnedRow {
1258 pub fn row(&self) -> Row<'_> {
1262 Row {
1263 data: &self.data,
1264 config: &self.config,
1265 }
1266 }
1267}
1268
1269impl PartialEq for OwnedRow {
1272 #[inline]
1273 fn eq(&self, other: &Self) -> bool {
1274 self.row().eq(&other.row())
1275 }
1276}
1277
/// Equality delegates to [`Row`], whose byte equality is a full equivalence relation.
impl Eq for OwnedRow {}
1279
1280impl PartialOrd for OwnedRow {
1281 #[inline]
1282 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1283 Some(self.cmp(other))
1284 }
1285}
1286
1287impl Ord for OwnedRow {
1288 #[inline]
1289 fn cmp(&self, other: &Self) -> Ordering {
1290 self.row().cmp(&other.row())
1291 }
1292}
1293
1294impl Hash for OwnedRow {
1295 #[inline]
1296 fn hash<H: Hasher>(&self, state: &mut H) {
1297 self.row().hash(state)
1298 }
1299}
1300
1301impl AsRef<[u8]> for OwnedRow {
1302 #[inline]
1303 fn as_ref(&self) -> &[u8] {
1304 &self.data
1305 }
1306}
1307
1308#[inline]
1310fn null_sentinel(options: SortOptions) -> u8 {
1311 match options.nulls_first {
1312 true => 0,
1313 false => 0xFF,
1314 }
1315}
1316
/// Accumulates the encoded length of every row as columns are added.
enum LengthTracker {
    /// Every one of the `num_rows` rows has the same encoded `length`.
    Fixed { length: usize, num_rows: usize },
    /// Row lengths differ: row `i` is `fixed_length + lengths[i]` bytes long.
    Variable {
        /// Bytes common to every row.
        fixed_length: usize,
        /// Per-row additional bytes.
        lengths: Vec<usize>,
    },
}
1327
1328impl LengthTracker {
1329 fn new(num_rows: usize) -> Self {
1330 Self::Fixed {
1331 length: 0,
1332 num_rows,
1333 }
1334 }
1335
1336 fn push_fixed(&mut self, new_length: usize) {
1338 match self {
1339 LengthTracker::Fixed { length, .. } => *length += new_length,
1340 LengthTracker::Variable { fixed_length, .. } => *fixed_length += new_length,
1341 }
1342 }
1343
1344 fn push_variable(&mut self, new_lengths: impl ExactSizeIterator<Item = usize>) {
1346 match self {
1347 LengthTracker::Fixed { length, .. } => {
1348 *self = LengthTracker::Variable {
1349 fixed_length: *length,
1350 lengths: new_lengths.collect(),
1351 }
1352 }
1353 LengthTracker::Variable { lengths, .. } => {
1354 assert_eq!(lengths.len(), new_lengths.len());
1355 lengths
1356 .iter_mut()
1357 .zip(new_lengths)
1358 .for_each(|(length, new_length)| *length += new_length);
1359 }
1360 }
1361 }
1362
1363 fn materialized(&mut self) -> &mut [usize] {
1365 if let LengthTracker::Fixed { length, num_rows } = *self {
1366 *self = LengthTracker::Variable {
1367 fixed_length: length,
1368 lengths: vec![0; num_rows],
1369 };
1370 }
1371
1372 match self {
1373 LengthTracker::Variable { lengths, .. } => lengths,
1374 LengthTracker::Fixed { .. } => unreachable!(),
1375 }
1376 }
1377
1378 fn extend_offsets(&self, initial_offset: usize, offsets: &mut Vec<usize>) -> usize {
1396 match self {
1397 LengthTracker::Fixed { length, num_rows } => {
1398 offsets.extend((0..*num_rows).map(|i| initial_offset + i * length));
1399
1400 initial_offset + num_rows * length
1401 }
1402 LengthTracker::Variable {
1403 fixed_length,
1404 lengths,
1405 } => {
1406 let mut acc = initial_offset;
1407
1408 offsets.extend(lengths.iter().map(|length| {
1409 let current = acc;
1410 acc += length + fixed_length;
1411 current
1412 }));
1413
1414 acc
1415 }
1416 }
1417 }
1418}
1419
/// Computes the encoded length of every row for `cols`, using the prepared `encoders`
/// (one per column, in the same order).
///
/// The row count is taken from the first column (zero if `cols` is empty).
///
/// # Panics
///
/// Panics on a stateless column whose data type has no encoding.
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> LengthTracker {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut tracker = LengthTracker::new(num_rows);

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                downcast_primitive_array! {
                    array => tracker.push_fixed(fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => tracker.push_fixed(bool::ENCODED_LEN),
                    DataType::Binary => tracker.push_variable(
                        as_generic_binary_array::<i32>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::LargeBinary => tracker.push_variable(
                        as_generic_binary_array::<i64>(array)
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::BinaryView => tracker.push_variable(
                        array.as_binary_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice))
                    ),
                    DataType::Utf8 => tracker.push_variable(
                        array.as_string::<i32>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::LargeUtf8 => tracker.push_variable(
                        array.as_string::<i64>()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::Utf8View => tracker.push_variable(
                        array.as_string_view()
                            .iter()
                            .map(|slice| variable::encoded_len(slice.map(|x| x.as_bytes())))
                    ),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        tracker.push_fixed(1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            // Dictionary rows are the encoded value row (or the cached null row for null
            // keys), with no separate marker byte.
            Encoder::Dictionary(values, null) => {
                downcast_dictionary_array! {
                    array => {
                        tracker.push_variable(
                            array.keys().iter().map(|v| match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            })
                        )
                    }
                    _ => unreachable!(),
                }
            }
            // One marker byte followed by the child row (or the cached null row).
            Encoder::Struct(rows, null) => {
                let array = as_struct_array(array);
                tracker.push_variable((0..array.len()).map(|idx| match array.is_valid(idx) {
                    true => 1 + rows.row(idx).as_ref().len(),
                    false => 1 + null.data.len(),
                }));
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(tracker.materialized(), rows, as_large_list_array(array))
                }
                DataType::FixedSizeList(_, _) => compute_lengths_fixed_size_list(
                    &mut tracker,
                    rows,
                    as_fixed_size_list_array(array),
                ),
                _ => unreachable!(),
            },
            Encoder::RunEndEncoded(rows) => match array.data_type() {
                DataType::RunEndEncoded(r, _) => match r.data_type() {
                    DataType::Int16 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int16Type>(),
                    ),
                    DataType::Int32 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int32Type>(),
                    ),
                    DataType::Int64 => run::compute_lengths(
                        tracker.materialized(),
                        rows,
                        array.as_run::<Int64Type>(),
                    ),
                    _ => unreachable!("Unsupported run end index type: {r:?}"),
                },
                _ => unreachable!(),
            },
        }
    }

    tracker
}
1531
/// Writes the encoding of `column` into `data` for every row.
///
/// `offsets[0]` is the end of any preceding data. For `i >= 1`, `offsets[i]` is the
/// position at which row `i - 1`'s bytes for this column are written; each encoder
/// advances it past the bytes it writes.
///
/// # Panics
///
/// Panics on a stateless column whose data type has no encoding.
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // Each slot is a marker byte (0x01 for valid, the null sentinel otherwise)
            // followed by the child row, or the cached null row for null slots.
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            DataType::FixedSizeList(_, _) => {
                encode_fixed_size_list(data, offsets, rows, opts, as_fixed_size_list_array(column))
            }
            _ => unreachable!(),
        },
        Encoder::RunEndEncoded(rows) => match column.data_type() {
            DataType::RunEndEncoded(r, _) => match r.data_type() {
                DataType::Int16 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int16Type>())
                }
                DataType::Int32 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int32Type>())
                }
                DataType::Int64 => {
                    run::encode(data, offsets, rows, opts, column.as_run::<Int64Type>())
                }
                _ => unreachable!("Unsupported run end index type: {r:?}"),
            },
            _ => unreachable!(),
        },
    }
}
1642
1643pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1645 data: &mut [u8],
1646 offsets: &mut [usize],
1647 column: &DictionaryArray<K>,
1648 values: &Rows,
1649 null: &Row<'_>,
1650) {
1651 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1652 let row = match k {
1653 Some(k) => values.row(k.as_usize()).data,
1654 None => null.data,
1655 };
1656 let end_offset = *offset + row.len();
1657 data[*offset..end_offset].copy_from_slice(row);
1658 *offset = end_offset;
1659 }
1660}
1661
/// Expands to an [`ArrayRef`] holding the primitive array of type `$t` decoded from
/// `$rows`. Used as the callback of `downcast_primitive!` in `decode_column`.
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1667
/// Decodes the column described by `field` from the front of each entry in `rows`,
/// using `codec` for nested and dictionary types.
///
/// # Errors
///
/// Returns [`ArrowError::NotYetImplemented`] for unsupported stateless data types, and
/// propagates errors from child decoders.
///
/// # Safety
///
/// NOTE(review): calls unsafe decoders (`decode_string*`, `list::decode`, `run::decode`,
/// `convert_raw`) and builds struct arrays with `build_unchecked`. Presumably `rows`
/// must contain valid encodings for `field` (and valid UTF-8 when `validate_utf8` is
/// `false`) — confirm the exact contract.
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(unsafe{ decode_string::<i32>(rows, options, validate_utf8) }),
                DataType::LargeUtf8 => Arc::new(unsafe { decode_string::<i64>(rows, options, validate_utf8) }),
                DataType::Utf8View => Arc::new(unsafe { decode_string_view(rows, options, validate_utf8) }),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {data_type}" )))
            }
        }
        // Dictionary columns are decoded as their value type via the values converter.
        Codec::Dictionary(converter, _) => {
            let cols = unsafe { converter.convert_raw(rows, validate_utf8) }?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            let (null_count, nulls) = fixed::decode_nulls(rows);
            // Skip the leading marker byte before decoding the children.
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = unsafe { converter.convert_raw(rows, validate_utf8) }?;

            let child_data: Vec<ArrayData> = children.iter().map(|c| c.to_data()).collect();
            // Decoded children may have a different type from the declared child field
            // (e.g. a dictionary child decodes to its value type), so rebuild the struct's
            // fields from the decoded children's types.
            let corrected_fields: Vec<Field> = match &field.data_type {
                DataType::Struct(struct_fields) => struct_fields
                    .iter()
                    .zip(child_data.iter())
                    .map(|(orig_field, child_array)| {
                        orig_field
                            .as_ref()
                            .clone()
                            .with_data_type(child_array.data_type().clone())
                    })
                    .collect(),
                _ => unreachable!("Only Struct types should be corrected here"),
            };
            let corrected_struct_type = DataType::Struct(corrected_fields.into());
            let builder = ArrayDataBuilder::new(corrected_struct_type)
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            Arc::new(StructArray::from(unsafe { builder.build_unchecked() }))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(unsafe { list::decode::<i32>(converter, rows, field, validate_utf8) }?)
            }
            DataType::LargeList(_) => {
                Arc::new(unsafe { list::decode::<i64>(converter, rows, field, validate_utf8) }?)
            }
            DataType::FixedSizeList(_, value_length) => Arc::new(unsafe {
                list::decode_fixed_size_list(
                    converter,
                    rows,
                    field,
                    validate_utf8,
                    value_length.as_usize(),
                )
            }?),
            _ => unreachable!(),
        },
        Codec::RunEndEncoded(converter) => match &field.data_type {
            DataType::RunEndEncoded(run_ends, _) => match run_ends.data_type() {
                DataType::Int16 => Arc::new(unsafe {
                    run::decode::<Int16Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int32 => Arc::new(unsafe {
                    run::decode::<Int32Type>(converter, rows, field, validate_utf8)
                }?),
                DataType::Int64 => Arc::new(unsafe {
                    run::decode::<Int64Type>(converter, rows, field, validate_utf8)
                }?),
                _ => unreachable!(),
            },
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1768
1769#[cfg(test)]
1770mod tests {
1771 use rand::distr::uniform::SampleUniform;
1772 use rand::distr::{Distribution, StandardUniform};
1773 use rand::{Rng, rng};
1774
1775 use arrow_array::builder::*;
1776 use arrow_array::types::*;
1777 use arrow_array::*;
1778 use arrow_buffer::{Buffer, OffsetBuffer};
1779 use arrow_buffer::{NullBuffer, i256};
1780 use arrow_cast::display::{ArrayFormatter, FormatOptions};
1781 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1782
1783 use super::*;
1784
    /// Round-trips two nullable fixed-width columns (Int16 and Float32), checking the
    /// exact encoded bytes and the resulting row ordering.
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Every row encodes to 8 bytes (presumably a validity byte plus 2 value bytes for
        // the Int16, and a validity byte plus 4 value bytes for the Float32).
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
        );

        // Byte comparison of rows follows lexicographic column order.
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1847
1848 #[test]
1849 fn test_decimal32() {
1850 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal32(
1851 DECIMAL32_MAX_PRECISION,
1852 7,
1853 ))])
1854 .unwrap();
1855 let col = Arc::new(
1856 Decimal32Array::from_iter([
1857 None,
1858 Some(i32::MIN),
1859 Some(-13),
1860 Some(46_i32),
1861 Some(5456_i32),
1862 Some(i32::MAX),
1863 ])
1864 .with_precision_and_scale(9, 7)
1865 .unwrap(),
1866 ) as ArrayRef;
1867
1868 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1869 for i in 0..rows.num_rows() - 1 {
1870 assert!(rows.row(i) < rows.row(i + 1));
1871 }
1872
1873 let back = converter.convert_rows(&rows).unwrap();
1874 assert_eq!(back.len(), 1);
1875 assert_eq!(col.as_ref(), back[0].as_ref())
1876 }
1877
1878 #[test]
1879 fn test_decimal64() {
1880 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal64(
1881 DECIMAL64_MAX_PRECISION,
1882 7,
1883 ))])
1884 .unwrap();
1885 let col = Arc::new(
1886 Decimal64Array::from_iter([
1887 None,
1888 Some(i64::MIN),
1889 Some(-13),
1890 Some(46_i64),
1891 Some(5456_i64),
1892 Some(i64::MAX),
1893 ])
1894 .with_precision_and_scale(18, 7)
1895 .unwrap(),
1896 ) as ArrayRef;
1897
1898 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1899 for i in 0..rows.num_rows() - 1 {
1900 assert!(rows.row(i) < rows.row(i + 1));
1901 }
1902
1903 let back = converter.convert_rows(&rows).unwrap();
1904 assert_eq!(back.len(), 1);
1905 assert_eq!(col.as_ref(), back[0].as_ref())
1906 }
1907
1908 #[test]
1909 fn test_decimal128() {
1910 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
1911 DECIMAL128_MAX_PRECISION,
1912 7,
1913 ))])
1914 .unwrap();
1915 let col = Arc::new(
1916 Decimal128Array::from_iter([
1917 None,
1918 Some(i128::MIN),
1919 Some(-13),
1920 Some(46_i128),
1921 Some(5456_i128),
1922 Some(i128::MAX),
1923 ])
1924 .with_precision_and_scale(38, 7)
1925 .unwrap(),
1926 ) as ArrayRef;
1927
1928 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1929 for i in 0..rows.num_rows() - 1 {
1930 assert!(rows.row(i) < rows.row(i + 1));
1931 }
1932
1933 let back = converter.convert_rows(&rows).unwrap();
1934 assert_eq!(back.len(), 1);
1935 assert_eq!(col.as_ref(), back[0].as_ref())
1936 }
1937
1938 #[test]
1939 fn test_decimal256() {
1940 let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
1941 DECIMAL256_MAX_PRECISION,
1942 7,
1943 ))])
1944 .unwrap();
1945 let col = Arc::new(
1946 Decimal256Array::from_iter([
1947 None,
1948 Some(i256::MIN),
1949 Some(i256::from_parts(0, -1)),
1950 Some(i256::from_parts(u128::MAX, -1)),
1951 Some(i256::from_parts(u128::MAX, 0)),
1952 Some(i256::from_parts(0, 46_i128)),
1953 Some(i256::from_parts(5, 46_i128)),
1954 Some(i256::MAX),
1955 ])
1956 .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
1957 .unwrap(),
1958 ) as ArrayRef;
1959
1960 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1961 for i in 0..rows.num_rows() - 1 {
1962 assert!(rows.row(i) < rows.row(i + 1));
1963 }
1964
1965 let back = converter.convert_rows(&rows).unwrap();
1966 assert_eq!(back.len(), 1);
1967 assert_eq!(col.as_ref(), back[0].as_ref())
1968 }
1969
1970 #[test]
1971 fn test_bool() {
1972 let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();
1973
1974 let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;
1975
1976 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1977 assert!(rows.row(2) > rows.row(1));
1978 assert!(rows.row(2) > rows.row(0));
1979 assert!(rows.row(1) > rows.row(0));
1980
1981 let cols = converter.convert_rows(&rows).unwrap();
1982 assert_eq!(&cols[0], &col);
1983
1984 let converter = RowConverter::new(vec![SortField::new_with_options(
1985 DataType::Boolean,
1986 SortOptions::default().desc().with_nulls_first(false),
1987 )])
1988 .unwrap();
1989
1990 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
1991 assert!(rows.row(2) < rows.row(1));
1992 assert!(rows.row(2) < rows.row(0));
1993 assert!(rows.row(1) < rows.row(0));
1994 let cols = converter.convert_rows(&rows).unwrap();
1995 assert_eq!(&cols[0], &col);
1996 }
1997
1998 #[test]
1999 fn test_timezone() {
2000 let a =
2001 TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
2002 let d = a.data_type().clone();
2003
2004 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2005 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2006 let back = converter.convert_rows(&rows).unwrap();
2007 assert_eq!(back.len(), 1);
2008 assert_eq!(back[0].data_type(), &d);
2009
2010 let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
2012 a.append(34).unwrap();
2013 a.append_null();
2014 a.append(345).unwrap();
2015
2016 let dict = a.finish();
2018 let values = TimestampNanosecondArray::from(dict.values().to_data());
2019 let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
2020 let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
2021 let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));
2022
2023 assert_eq!(dict_with_tz.data_type(), &d);
2024 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2025 let rows = converter
2026 .convert_columns(&[Arc::new(dict_with_tz) as _])
2027 .unwrap();
2028 let back = converter.convert_rows(&rows).unwrap();
2029 assert_eq!(back.len(), 1);
2030 assert_eq!(back[0].data_type(), &v);
2031 }
2032
2033 #[test]
2034 fn test_null_encoding() {
2035 let col = Arc::new(NullArray::new(10));
2036 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
2037 let rows = converter.convert_columns(&[col]).unwrap();
2038 assert_eq!(rows.num_rows(), 10);
2039 assert_eq!(rows.row(1).data.len(), 0);
2040 }
2041
2042 #[test]
2043 fn test_variable_width() {
2044 let col = Arc::new(StringArray::from_iter([
2045 Some("hello"),
2046 Some("he"),
2047 None,
2048 Some("foo"),
2049 Some(""),
2050 ])) as ArrayRef;
2051
2052 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2053 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2054
2055 assert!(rows.row(1) < rows.row(0));
2056 assert!(rows.row(2) < rows.row(4));
2057 assert!(rows.row(3) < rows.row(0));
2058 assert!(rows.row(3) < rows.row(1));
2059
2060 let cols = converter.convert_rows(&rows).unwrap();
2061 assert_eq!(&cols[0], &col);
2062
2063 let col = Arc::new(BinaryArray::from_iter([
2064 None,
2065 Some(vec![0_u8; 0]),
2066 Some(vec![0_u8; 6]),
2067 Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
2068 Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
2069 Some(vec![0_u8; variable::BLOCK_SIZE]),
2070 Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
2071 Some(vec![1_u8; 6]),
2072 Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
2073 Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
2074 Some(vec![1_u8; variable::BLOCK_SIZE]),
2075 Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
2076 Some(vec![0xFF_u8; 6]),
2077 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
2078 Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
2079 Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
2080 Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
2081 ])) as ArrayRef;
2082
2083 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2084 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2085
2086 for i in 0..rows.num_rows() {
2087 for j in i + 1..rows.num_rows() {
2088 assert!(
2089 rows.row(i) < rows.row(j),
2090 "{} < {} - {:?} < {:?}",
2091 i,
2092 j,
2093 rows.row(i),
2094 rows.row(j)
2095 );
2096 }
2097 }
2098
2099 let cols = converter.convert_rows(&rows).unwrap();
2100 assert_eq!(&cols[0], &col);
2101
2102 let converter = RowConverter::new(vec![SortField::new_with_options(
2103 DataType::Binary,
2104 SortOptions::default().desc().with_nulls_first(false),
2105 )])
2106 .unwrap();
2107 let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
2108
2109 for i in 0..rows.num_rows() {
2110 for j in i + 1..rows.num_rows() {
2111 assert!(
2112 rows.row(i) > rows.row(j),
2113 "{} > {} - {:?} > {:?}",
2114 i,
2115 j,
2116 rows.row(i),
2117 rows.row(j)
2118 );
2119 }
2120 }
2121
2122 let cols = converter.convert_rows(&rows).unwrap();
2123 assert_eq!(&cols[0], &col);
2124 }
2125
2126 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
2128 match b.data_type() {
2129 DataType::Dictionary(_, v) => {
2130 assert_eq!(a.data_type(), v.as_ref());
2131 let b = arrow_cast::cast(b, v).unwrap();
2132 assert_eq!(a, b.as_ref())
2133 }
2134 _ => assert_eq!(a, b),
2135 }
2136 }
2137
2138 #[test]
2139 fn test_string_dictionary() {
2140 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2141 Some("foo"),
2142 Some("hello"),
2143 Some("he"),
2144 None,
2145 Some("hello"),
2146 Some(""),
2147 Some("hello"),
2148 Some("hello"),
2149 ])) as ArrayRef;
2150
2151 let field = SortField::new(a.data_type().clone());
2152 let converter = RowConverter::new(vec![field]).unwrap();
2153 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2154
2155 assert!(rows_a.row(3) < rows_a.row(5));
2156 assert!(rows_a.row(2) < rows_a.row(1));
2157 assert!(rows_a.row(0) < rows_a.row(1));
2158 assert!(rows_a.row(3) < rows_a.row(0));
2159
2160 assert_eq!(rows_a.row(1), rows_a.row(4));
2161 assert_eq!(rows_a.row(1), rows_a.row(6));
2162 assert_eq!(rows_a.row(1), rows_a.row(7));
2163
2164 let cols = converter.convert_rows(&rows_a).unwrap();
2165 dictionary_eq(&cols[0], &a);
2166
2167 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
2168 Some("hello"),
2169 None,
2170 Some("cupcakes"),
2171 ])) as ArrayRef;
2172
2173 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
2174 assert_eq!(rows_a.row(1), rows_b.row(0));
2175 assert_eq!(rows_a.row(3), rows_b.row(1));
2176 assert!(rows_b.row(2) < rows_a.row(0));
2177
2178 let cols = converter.convert_rows(&rows_b).unwrap();
2179 dictionary_eq(&cols[0], &b);
2180
2181 let converter = RowConverter::new(vec![SortField::new_with_options(
2182 a.data_type().clone(),
2183 SortOptions::default().desc().with_nulls_first(false),
2184 )])
2185 .unwrap();
2186
2187 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2188 assert!(rows_c.row(3) > rows_c.row(5));
2189 assert!(rows_c.row(2) > rows_c.row(1));
2190 assert!(rows_c.row(0) > rows_c.row(1));
2191 assert!(rows_c.row(3) > rows_c.row(0));
2192
2193 let cols = converter.convert_rows(&rows_c).unwrap();
2194 dictionary_eq(&cols[0], &a);
2195
2196 let converter = RowConverter::new(vec![SortField::new_with_options(
2197 a.data_type().clone(),
2198 SortOptions::default().desc().with_nulls_first(true),
2199 )])
2200 .unwrap();
2201
2202 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2203 assert!(rows_c.row(3) < rows_c.row(5));
2204 assert!(rows_c.row(2) > rows_c.row(1));
2205 assert!(rows_c.row(0) > rows_c.row(1));
2206 assert!(rows_c.row(3) < rows_c.row(0));
2207
2208 let cols = converter.convert_rows(&rows_c).unwrap();
2209 dictionary_eq(&cols[0], &a);
2210 }
2211
2212 #[test]
2213 fn test_struct() {
2214 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
2216 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
2217 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
2218 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
2219 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
2220
2221 let sort_fields = vec![SortField::new(s1.data_type().clone())];
2222 let converter = RowConverter::new(sort_fields).unwrap();
2223 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
2224
2225 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
2226 assert!(a < b);
2227 }
2228
2229 let back = converter.convert_rows(&r1).unwrap();
2230 assert_eq!(back.len(), 1);
2231 assert_eq!(&back[0], &s1);
2232
2233 let data = s1
2235 .to_data()
2236 .into_builder()
2237 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
2238 .null_count(2)
2239 .build()
2240 .unwrap();
2241
2242 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
2243 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
2244 assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1)); let back = converter.convert_rows(&r2).unwrap();
2250 assert_eq!(back.len(), 1);
2251 assert_eq!(&back[0], &s2);
2252
2253 back[0].to_data().validate_full().unwrap();
2254 }
2255
2256 #[test]
2257 fn test_dictionary_in_struct() {
2258 let builder = StringDictionaryBuilder::<Int32Type>::new();
2259 let mut struct_builder = StructBuilder::new(
2260 vec![Field::new_dictionary(
2261 "foo",
2262 DataType::Int32,
2263 DataType::Utf8,
2264 true,
2265 )],
2266 vec![Box::new(builder)],
2267 );
2268
2269 let dict_builder = struct_builder
2270 .field_builder::<StringDictionaryBuilder<Int32Type>>(0)
2271 .unwrap();
2272
2273 dict_builder.append_value("a");
2275 dict_builder.append_null();
2276 dict_builder.append_value("a");
2277 dict_builder.append_value("b");
2278
2279 for _ in 0..4 {
2280 struct_builder.append(true);
2281 }
2282
2283 let s = Arc::new(struct_builder.finish()) as ArrayRef;
2284 let sort_fields = vec![SortField::new(s.data_type().clone())];
2285 let converter = RowConverter::new(sort_fields).unwrap();
2286 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2287
2288 let back = converter.convert_rows(&r).unwrap();
2289 let [s2] = back.try_into().unwrap();
2290
2291 assert_ne!(&s.data_type(), &s2.data_type());
2294 s2.to_data().validate_full().unwrap();
2295
2296 let s1_struct = s.as_struct();
2300 let s1_0 = s1_struct.column(0);
2301 let s1_idx_0 = s1_0.as_dictionary::<Int32Type>();
2302 let keys = s1_idx_0.keys();
2303 let values = s1_idx_0.values().as_string::<i32>();
2304 let s2_struct = s2.as_struct();
2306 let s2_0 = s2_struct.column(0);
2307 let s2_idx_0 = s2_0.as_string::<i32>();
2308
2309 for i in 0..keys.len() {
2310 if keys.is_null(i) {
2311 assert!(s2_idx_0.is_null(i));
2312 } else {
2313 let dict_index = keys.value(i) as usize;
2314 assert_eq!(values.value(dict_index), s2_idx_0.value(i));
2315 }
2316 }
2317 }
2318
2319 #[test]
2320 fn test_dictionary_in_struct_empty() {
2321 let ty = DataType::Struct(
2322 vec![Field::new_dictionary(
2323 "foo",
2324 DataType::Int32,
2325 DataType::Int32,
2326 false,
2327 )]
2328 .into(),
2329 );
2330 let s = arrow_array::new_empty_array(&ty);
2331
2332 let sort_fields = vec![SortField::new(s.data_type().clone())];
2333 let converter = RowConverter::new(sort_fields).unwrap();
2334 let r = converter.convert_columns(&[Arc::clone(&s)]).unwrap();
2335
2336 let back = converter.convert_rows(&r).unwrap();
2337 let [s2] = back.try_into().unwrap();
2338
2339 assert_ne!(&s.data_type(), &s2.data_type());
2342 s2.to_data().validate_full().unwrap();
2343 assert_eq!(s.len(), 0);
2344 assert_eq!(s2.len(), 0);
2345 }
2346
2347 #[test]
2348 fn test_list_of_string_dictionary() {
2349 let mut builder = ListBuilder::<StringDictionaryBuilder<Int32Type>>::default();
2350 builder.values().append("a").unwrap();
2352 builder.values().append("b").unwrap();
2353 builder.values().append("zero").unwrap();
2354 builder.values().append_null();
2355 builder.values().append("c").unwrap();
2356 builder.values().append("b").unwrap();
2357 builder.values().append("d").unwrap();
2358 builder.append(true);
2359 builder.append(false);
2361 builder.values().append("e").unwrap();
2363 builder.values().append("zero").unwrap();
2364 builder.values().append("a").unwrap();
2365 builder.append(true);
2366
2367 let a = Arc::new(builder.finish()) as ArrayRef;
2368 let data_type = a.data_type().clone();
2369
2370 let field = SortField::new(data_type.clone());
2371 let converter = RowConverter::new(vec![field]).unwrap();
2372 let rows = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
2373
2374 let back = converter.convert_rows(&rows).unwrap();
2375 assert_eq!(back.len(), 1);
2376 let [a2] = back.try_into().unwrap();
2377
2378 assert_ne!(&a.data_type(), &a2.data_type());
2381
2382 a2.to_data().validate_full().unwrap();
2383
2384 let a2_list = a2.as_list::<i32>();
2385 let a1_list = a.as_list::<i32>();
2386
2387 let a1_0 = a1_list.value(0);
2390 let a1_idx_0 = a1_0.as_dictionary::<Int32Type>();
2391 let keys = a1_idx_0.keys();
2392 let values = a1_idx_0.values().as_string::<i32>();
2393 let a2_0 = a2_list.value(0);
2394 let a2_idx_0 = a2_0.as_string::<i32>();
2395
2396 for i in 0..keys.len() {
2397 if keys.is_null(i) {
2398 assert!(a2_idx_0.is_null(i));
2399 } else {
2400 let dict_index = keys.value(i) as usize;
2401 assert_eq!(values.value(dict_index), a2_idx_0.value(i));
2402 }
2403 }
2404
2405 assert!(a1_list.is_null(1));
2407 assert!(a2_list.is_null(1));
2408
2409 let a1_2 = a1_list.value(2);
2411 let a1_idx_2 = a1_2.as_dictionary::<Int32Type>();
2412 let keys = a1_idx_2.keys();
2413 let values = a1_idx_2.values().as_string::<i32>();
2414 let a2_2 = a2_list.value(2);
2415 let a2_idx_2 = a2_2.as_string::<i32>();
2416
2417 for i in 0..keys.len() {
2418 if keys.is_null(i) {
2419 assert!(a2_idx_2.is_null(i));
2420 } else {
2421 let dict_index = keys.value(i) as usize;
2422 assert_eq!(values.value(dict_index), a2_idx_2.value(i));
2423 }
2424 }
2425 }
2426
2427 #[test]
2428 fn test_primitive_dictionary() {
2429 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
2430 builder.append(2).unwrap();
2431 builder.append(3).unwrap();
2432 builder.append(0).unwrap();
2433 builder.append_null();
2434 builder.append(5).unwrap();
2435 builder.append(3).unwrap();
2436 builder.append(-1).unwrap();
2437
2438 let a = builder.finish();
2439 let data_type = a.data_type().clone();
2440 let columns = [Arc::new(a) as ArrayRef];
2441
2442 let field = SortField::new(data_type.clone());
2443 let converter = RowConverter::new(vec![field]).unwrap();
2444 let rows = converter.convert_columns(&columns).unwrap();
2445 assert!(rows.row(0) < rows.row(1));
2446 assert!(rows.row(2) < rows.row(0));
2447 assert!(rows.row(3) < rows.row(2));
2448 assert!(rows.row(6) < rows.row(2));
2449 assert!(rows.row(3) < rows.row(6));
2450
2451 let back = converter.convert_rows(&rows).unwrap();
2452 assert_eq!(back.len(), 1);
2453 back[0].to_data().validate_full().unwrap();
2454 }
2455
2456 #[test]
2457 fn test_dictionary_nulls() {
2458 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
2459 let keys =
2460 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
2461
2462 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
2463 let data = keys
2464 .into_builder()
2465 .data_type(data_type.clone())
2466 .child_data(vec![values])
2467 .build()
2468 .unwrap();
2469
2470 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
2471 let field = SortField::new(data_type.clone());
2472 let converter = RowConverter::new(vec![field]).unwrap();
2473 let rows = converter.convert_columns(&columns).unwrap();
2474
2475 assert_eq!(rows.row(0), rows.row(1));
2476 assert_eq!(rows.row(3), rows.row(4));
2477 assert_eq!(rows.row(4), rows.row(5));
2478 assert!(rows.row(3) < rows.row(0));
2479 }
2480
2481 #[test]
2482 fn test_from_binary_shared_buffer() {
2483 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2484 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2485 let rows = converter.convert_columns(&[array]).unwrap();
2486 let binary_rows = rows.try_into_binary().expect("known-small rows");
2487 let _binary_rows_shared_buffer = binary_rows.clone();
2488
2489 let parsed = converter.from_binary(binary_rows);
2490
2491 converter.convert_rows(parsed.iter()).unwrap();
2492 }
2493
2494 #[test]
2495 #[should_panic(expected = "Encountered non UTF-8 data")]
2496 fn test_invalid_utf8() {
2497 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2498 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2499 let rows = converter.convert_columns(&[array]).unwrap();
2500 let binary_row = rows.row(0);
2501
2502 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2503 let parser = converter.parser();
2504 let utf8_row = parser.parse(binary_row.as_ref());
2505
2506 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2507 }
2508
2509 #[test]
2510 #[should_panic(expected = "Encountered non UTF-8 data")]
2511 fn test_invalid_utf8_array() {
2512 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
2513 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
2514 let rows = converter.convert_columns(&[array]).unwrap();
2515 let binary_rows = rows.try_into_binary().expect("known-small rows");
2516
2517 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2518 let parsed = converter.from_binary(binary_rows);
2519
2520 converter.convert_rows(parsed.iter()).unwrap();
2521 }
2522
2523 #[test]
2524 #[should_panic(expected = "index out of bounds")]
2525 fn test_invalid_empty() {
2526 let binary_row: &[u8] = &[];
2527
2528 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2529 let parser = converter.parser();
2530 let utf8_row = parser.parse(binary_row.as_ref());
2531
2532 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2533 }
2534
2535 #[test]
2536 #[should_panic(expected = "index out of bounds")]
2537 fn test_invalid_empty_array() {
2538 let row: &[u8] = &[];
2539 let binary_rows = BinaryArray::from(vec![row]);
2540
2541 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2542 let parsed = converter.from_binary(binary_rows);
2543
2544 converter.convert_rows(parsed.iter()).unwrap();
2545 }
2546
2547 #[test]
2548 #[should_panic(expected = "index out of bounds")]
2549 fn test_invalid_truncated() {
2550 let binary_row: &[u8] = &[0x02];
2551
2552 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2553 let parser = converter.parser();
2554 let utf8_row = parser.parse(binary_row.as_ref());
2555
2556 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
2557 }
2558
2559 #[test]
2560 #[should_panic(expected = "index out of bounds")]
2561 fn test_invalid_truncated_array() {
2562 let row: &[u8] = &[0x02];
2563 let binary_rows = BinaryArray::from(vec![row]);
2564
2565 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
2566 let parsed = converter.from_binary(binary_rows);
2567
2568 converter.convert_rows(parsed.iter()).unwrap();
2569 }
2570
2571 #[test]
2572 #[should_panic(expected = "rows were not produced by this RowConverter")]
2573 fn test_different_converter() {
2574 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
2575 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2576 let rows = converter.convert_columns(&[values]).unwrap();
2577
2578 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2579 let _ = converter.convert_rows(&rows);
2580 }
2581
2582 fn test_single_list<O: OffsetSizeTrait>() {
2583 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2584 builder.values().append_value(32);
2585 builder.values().append_value(52);
2586 builder.values().append_value(32);
2587 builder.append(true);
2588 builder.values().append_value(32);
2589 builder.values().append_value(52);
2590 builder.values().append_value(12);
2591 builder.append(true);
2592 builder.values().append_value(32);
2593 builder.values().append_value(52);
2594 builder.append(true);
2595 builder.values().append_value(32); builder.values().append_value(52); builder.append(false);
2598 builder.values().append_value(32);
2599 builder.values().append_null();
2600 builder.append(true);
2601 builder.append(true);
2602 builder.values().append_value(17); builder.values().append_null(); builder.append(false);
2605
2606 let list = Arc::new(builder.finish()) as ArrayRef;
2607 let d = list.data_type().clone();
2608
2609 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2610
2611 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2612 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2621 assert_eq!(back.len(), 1);
2622 back[0].to_data().validate_full().unwrap();
2623 assert_eq!(&back[0], &list);
2624
2625 let options = SortOptions::default().asc().with_nulls_first(false);
2626 let field = SortField::new_with_options(d.clone(), options);
2627 let converter = RowConverter::new(vec![field]).unwrap();
2628 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2629
2630 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2639 assert_eq!(back.len(), 1);
2640 back[0].to_data().validate_full().unwrap();
2641 assert_eq!(&back[0], &list);
2642
2643 let options = SortOptions::default().desc().with_nulls_first(false);
2644 let field = SortField::new_with_options(d.clone(), options);
2645 let converter = RowConverter::new(vec![field]).unwrap();
2646 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2647
2648 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2657 assert_eq!(back.len(), 1);
2658 back[0].to_data().validate_full().unwrap();
2659 assert_eq!(&back[0], &list);
2660
2661 let options = SortOptions::default().desc().with_nulls_first(true);
2662 let field = SortField::new_with_options(d, options);
2663 let converter = RowConverter::new(vec![field]).unwrap();
2664 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2665
2666 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2675 assert_eq!(back.len(), 1);
2676 back[0].to_data().validate_full().unwrap();
2677 assert_eq!(&back[0], &list);
2678
2679 let sliced_list = list.slice(1, 5);
2680 let rows_on_sliced_list = converter
2681 .convert_columns(&[Arc::clone(&sliced_list)])
2682 .unwrap();
2683
2684 assert!(rows_on_sliced_list.row(1) > rows_on_sliced_list.row(0)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(4) > rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2691 assert_eq!(back.len(), 1);
2692 back[0].to_data().validate_full().unwrap();
2693 assert_eq!(&back[0], &sliced_list);
2694 }
2695
2696 fn test_nested_list<O: OffsetSizeTrait>() {
2697 let mut builder =
2698 GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));
2699
2700 builder.values().values().append_value(1);
2701 builder.values().values().append_value(2);
2702 builder.values().append(true);
2703 builder.values().values().append_value(1);
2704 builder.values().values().append_null();
2705 builder.values().append(true);
2706 builder.append(true);
2707
2708 builder.values().values().append_value(1);
2709 builder.values().values().append_null();
2710 builder.values().append(true);
2711 builder.values().values().append_value(1);
2712 builder.values().values().append_null();
2713 builder.values().append(true);
2714 builder.append(true);
2715
2716 builder.values().values().append_value(1);
2717 builder.values().values().append_null();
2718 builder.values().append(true);
2719 builder.values().append(false);
2720 builder.append(true);
2721 builder.append(false);
2722
2723 builder.values().values().append_value(1);
2724 builder.values().values().append_value(2);
2725 builder.values().append(true);
2726 builder.append(true);
2727
2728 let list = Arc::new(builder.finish()) as ArrayRef;
2729 let d = list.data_type().clone();
2730
2731 let options = SortOptions::default().asc().with_nulls_first(true);
2739 let field = SortField::new_with_options(d.clone(), options);
2740 let converter = RowConverter::new(vec![field]).unwrap();
2741 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2742
2743 assert!(rows.row(0) > rows.row(1));
2744 assert!(rows.row(1) > rows.row(2));
2745 assert!(rows.row(2) > rows.row(3));
2746 assert!(rows.row(4) < rows.row(0));
2747 assert!(rows.row(4) > rows.row(1));
2748
2749 let back = converter.convert_rows(&rows).unwrap();
2750 assert_eq!(back.len(), 1);
2751 back[0].to_data().validate_full().unwrap();
2752 assert_eq!(&back[0], &list);
2753
2754 let options = SortOptions::default().desc().with_nulls_first(true);
2755 let field = SortField::new_with_options(d.clone(), options);
2756 let converter = RowConverter::new(vec![field]).unwrap();
2757 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2758
2759 assert!(rows.row(0) > rows.row(1));
2760 assert!(rows.row(1) > rows.row(2));
2761 assert!(rows.row(2) > rows.row(3));
2762 assert!(rows.row(4) > rows.row(0));
2763 assert!(rows.row(4) > rows.row(1));
2764
2765 let back = converter.convert_rows(&rows).unwrap();
2766 assert_eq!(back.len(), 1);
2767 back[0].to_data().validate_full().unwrap();
2768 assert_eq!(&back[0], &list);
2769
2770 let options = SortOptions::default().desc().with_nulls_first(false);
2771 let field = SortField::new_with_options(d, options);
2772 let converter = RowConverter::new(vec![field]).unwrap();
2773 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2774
2775 assert!(rows.row(0) < rows.row(1));
2776 assert!(rows.row(1) < rows.row(2));
2777 assert!(rows.row(2) < rows.row(3));
2778 assert!(rows.row(4) > rows.row(0));
2779 assert!(rows.row(4) < rows.row(1));
2780
2781 let back = converter.convert_rows(&rows).unwrap();
2782 assert_eq!(back.len(), 1);
2783 back[0].to_data().validate_full().unwrap();
2784 assert_eq!(&back[0], &list);
2785
2786 let sliced_list = list.slice(1, 3);
2787 let rows = converter
2788 .convert_columns(&[Arc::clone(&sliced_list)])
2789 .unwrap();
2790
2791 assert!(rows.row(0) < rows.row(1));
2792 assert!(rows.row(1) < rows.row(2));
2793
2794 let back = converter.convert_rows(&rows).unwrap();
2795 assert_eq!(back.len(), 1);
2796 back[0].to_data().validate_full().unwrap();
2797 assert_eq!(&back[0], &sliced_list);
2798 }
2799
2800 #[test]
2801 fn test_list() {
2802 test_single_list::<i32>();
2803 test_nested_list::<i32>();
2804 }
2805
2806 #[test]
2807 fn test_large_list() {
2808 test_single_list::<i64>();
2809 test_nested_list::<i64>();
2810 }
2811
2812 #[test]
2813 fn test_fixed_size_list() {
2814 let mut builder = FixedSizeListBuilder::new(Int32Builder::new(), 3);
2815 builder.values().append_value(32);
2816 builder.values().append_value(52);
2817 builder.values().append_value(32);
2818 builder.append(true);
2819 builder.values().append_value(32);
2820 builder.values().append_value(52);
2821 builder.values().append_value(12);
2822 builder.append(true);
2823 builder.values().append_value(32);
2824 builder.values().append_value(52);
2825 builder.values().append_null();
2826 builder.append(true);
2827 builder.values().append_value(32); builder.values().append_value(52); builder.values().append_value(13); builder.append(false);
2831 builder.values().append_value(32);
2832 builder.values().append_null();
2833 builder.values().append_null();
2834 builder.append(true);
2835 builder.values().append_null();
2836 builder.values().append_null();
2837 builder.values().append_null();
2838 builder.append(true);
2839 builder.values().append_value(17); builder.values().append_null(); builder.values().append_value(77); builder.append(false);
2843
2844 let list = Arc::new(builder.finish()) as ArrayRef;
2845 let d = list.data_type().clone();
2846
2847 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2849
2850 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2851 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2860 assert_eq!(back.len(), 1);
2861 back[0].to_data().validate_full().unwrap();
2862 assert_eq!(&back[0], &list);
2863
2864 let options = SortOptions::default().asc().with_nulls_first(false);
2866 let field = SortField::new_with_options(d.clone(), options);
2867 let converter = RowConverter::new(vec![field]).unwrap();
2868 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2869 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2878 assert_eq!(back.len(), 1);
2879 back[0].to_data().validate_full().unwrap();
2880 assert_eq!(&back[0], &list);
2881
2882 let options = SortOptions::default().desc().with_nulls_first(false);
2884 let field = SortField::new_with_options(d.clone(), options);
2885 let converter = RowConverter::new(vec![field]).unwrap();
2886 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2887 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2896 assert_eq!(back.len(), 1);
2897 back[0].to_data().validate_full().unwrap();
2898 assert_eq!(&back[0], &list);
2899
2900 let options = SortOptions::default().desc().with_nulls_first(true);
2902 let field = SortField::new_with_options(d, options);
2903 let converter = RowConverter::new(vec![field]).unwrap();
2904 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2905
2906 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); assert_eq!(rows.row(3), rows.row(6)); let back = converter.convert_rows(&rows).unwrap();
2915 assert_eq!(back.len(), 1);
2916 back[0].to_data().validate_full().unwrap();
2917 assert_eq!(&back[0], &list);
2918
2919 let sliced_list = list.slice(1, 5);
2920 let rows_on_sliced_list = converter
2921 .convert_columns(&[Arc::clone(&sliced_list)])
2922 .unwrap();
2923
2924 assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(3) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(4) < rows_on_sliced_list.row(1)); assert!(rows_on_sliced_list.row(2) < rows_on_sliced_list.row(4)); let back = converter.convert_rows(&rows_on_sliced_list).unwrap();
2930 assert_eq!(back.len(), 1);
2931 back[0].to_data().validate_full().unwrap();
2932 assert_eq!(&back[0], &sliced_list);
2933 }
2934
2935 #[test]
2936 fn test_two_fixed_size_lists() {
2937 let mut first = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2938 first.values().append_value(100);
2940 first.append(true);
2941 first.values().append_value(101);
2943 first.append(true);
2944 first.values().append_value(102);
2946 first.append(true);
2947 first.values().append_null();
2949 first.append(true);
2950 first.values().append_null(); first.append(false);
2953 let first = Arc::new(first.finish()) as ArrayRef;
2954 let first_type = first.data_type().clone();
2955
2956 let mut second = FixedSizeListBuilder::new(UInt8Builder::new(), 1);
2957 second.values().append_value(200);
2959 second.append(true);
2960 second.values().append_value(201);
2962 second.append(true);
2963 second.values().append_value(202);
2965 second.append(true);
2966 second.values().append_null();
2968 second.append(true);
2969 second.values().append_null(); second.append(false);
2972 let second = Arc::new(second.finish()) as ArrayRef;
2973 let second_type = second.data_type().clone();
2974
2975 let converter = RowConverter::new(vec![
2976 SortField::new(first_type.clone()),
2977 SortField::new(second_type.clone()),
2978 ])
2979 .unwrap();
2980
2981 let rows = converter
2982 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
2983 .unwrap();
2984
2985 let back = converter.convert_rows(&rows).unwrap();
2986 assert_eq!(back.len(), 2);
2987 back[0].to_data().validate_full().unwrap();
2988 assert_eq!(&back[0], &first);
2989 back[1].to_data().validate_full().unwrap();
2990 assert_eq!(&back[1], &second);
2991 }
2992
2993 #[test]
2994 fn test_fixed_size_list_with_variable_width_content() {
2995 let mut first = FixedSizeListBuilder::new(
2996 StructBuilder::from_fields(
2997 vec![
2998 Field::new(
2999 "timestamp",
3000 DataType::Timestamp(TimeUnit::Microsecond, Some(Arc::from("UTC"))),
3001 false,
3002 ),
3003 Field::new("offset_minutes", DataType::Int16, false),
3004 Field::new("time_zone", DataType::Utf8, false),
3005 ],
3006 1,
3007 ),
3008 1,
3009 );
3010 first
3012 .values()
3013 .field_builder::<TimestampMicrosecondBuilder>(0)
3014 .unwrap()
3015 .append_null();
3016 first
3017 .values()
3018 .field_builder::<Int16Builder>(1)
3019 .unwrap()
3020 .append_null();
3021 first
3022 .values()
3023 .field_builder::<StringBuilder>(2)
3024 .unwrap()
3025 .append_null();
3026 first.values().append(false);
3027 first.append(false);
3028 first
3030 .values()
3031 .field_builder::<TimestampMicrosecondBuilder>(0)
3032 .unwrap()
3033 .append_null();
3034 first
3035 .values()
3036 .field_builder::<Int16Builder>(1)
3037 .unwrap()
3038 .append_null();
3039 first
3040 .values()
3041 .field_builder::<StringBuilder>(2)
3042 .unwrap()
3043 .append_null();
3044 first.values().append(false);
3045 first.append(true);
3046 first
3048 .values()
3049 .field_builder::<TimestampMicrosecondBuilder>(0)
3050 .unwrap()
3051 .append_value(0);
3052 first
3053 .values()
3054 .field_builder::<Int16Builder>(1)
3055 .unwrap()
3056 .append_value(0);
3057 first
3058 .values()
3059 .field_builder::<StringBuilder>(2)
3060 .unwrap()
3061 .append_value("UTC");
3062 first.values().append(true);
3063 first.append(true);
3064 first
3066 .values()
3067 .field_builder::<TimestampMicrosecondBuilder>(0)
3068 .unwrap()
3069 .append_value(1126351800123456);
3070 first
3071 .values()
3072 .field_builder::<Int16Builder>(1)
3073 .unwrap()
3074 .append_value(120);
3075 first
3076 .values()
3077 .field_builder::<StringBuilder>(2)
3078 .unwrap()
3079 .append_value("Europe/Warsaw");
3080 first.values().append(true);
3081 first.append(true);
3082 let first = Arc::new(first.finish()) as ArrayRef;
3083 let first_type = first.data_type().clone();
3084
3085 let mut second = StringBuilder::new();
3086 second.append_value("somewhere near");
3087 second.append_null();
3088 second.append_value("Greenwich");
3089 second.append_value("Warsaw");
3090 let second = Arc::new(second.finish()) as ArrayRef;
3091 let second_type = second.data_type().clone();
3092
3093 let converter = RowConverter::new(vec![
3094 SortField::new(first_type.clone()),
3095 SortField::new(second_type.clone()),
3096 ])
3097 .unwrap();
3098
3099 let rows = converter
3100 .convert_columns(&[Arc::clone(&first), Arc::clone(&second)])
3101 .unwrap();
3102
3103 let back = converter.convert_rows(&rows).unwrap();
3104 assert_eq!(back.len(), 2);
3105 back[0].to_data().validate_full().unwrap();
3106 assert_eq!(&back[0], &first);
3107 back[1].to_data().validate_full().unwrap();
3108 assert_eq!(&back[1], &second);
3109 }
3110
3111 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
3112 where
3113 K: ArrowPrimitiveType,
3114 StandardUniform: Distribution<K::Native>,
3115 {
3116 let mut rng = rng();
3117 (0..len)
3118 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
3119 .collect()
3120 }
3121
3122 fn generate_strings<O: OffsetSizeTrait>(
3123 len: usize,
3124 valid_percent: f64,
3125 ) -> GenericStringArray<O> {
3126 let mut rng = rng();
3127 (0..len)
3128 .map(|_| {
3129 rng.random_bool(valid_percent).then(|| {
3130 let len = rng.random_range(0..100);
3131 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3132 String::from_utf8(bytes).unwrap()
3133 })
3134 })
3135 .collect()
3136 }
3137
3138 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
3139 let mut rng = rng();
3140 (0..len)
3141 .map(|_| {
3142 rng.random_bool(valid_percent).then(|| {
3143 let len = rng.random_range(0..100);
3144 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
3145 String::from_utf8(bytes).unwrap()
3146 })
3147 })
3148 .collect()
3149 }
3150
3151 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
3152 let mut rng = rng();
3153 (0..len)
3154 .map(|_| {
3155 rng.random_bool(valid_percent).then(|| {
3156 let len = rng.random_range(0..100);
3157 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
3158 bytes
3159 })
3160 })
3161 .collect()
3162 }
3163
3164 fn generate_fixed_stringview_column(len: usize) -> StringViewArray {
3165 let edge_cases = vec![
3166 Some("bar".to_string()),
3167 Some("bar\0".to_string()),
3168 Some("LongerThan12Bytes".to_string()),
3169 Some("LongerThan12Bytez".to_string()),
3170 Some("LongerThan12Bytes\0".to_string()),
3171 Some("LongerThan12Byt".to_string()),
3172 Some("backend one".to_string()),
3173 Some("backend two".to_string()),
3174 Some("a".repeat(257)),
3175 Some("a".repeat(300)),
3176 ];
3177
3178 let mut values = Vec::with_capacity(len);
3180 for i in 0..len {
3181 values.push(
3182 edge_cases
3183 .get(i % edge_cases.len())
3184 .cloned()
3185 .unwrap_or(None),
3186 );
3187 }
3188
3189 StringViewArray::from(values)
3190 }
3191
3192 fn generate_dictionary<K>(
3193 values: ArrayRef,
3194 len: usize,
3195 valid_percent: f64,
3196 ) -> DictionaryArray<K>
3197 where
3198 K: ArrowDictionaryKeyType,
3199 K::Native: SampleUniform,
3200 {
3201 let mut rng = rng();
3202 let min_key = K::Native::from_usize(0).unwrap();
3203 let max_key = K::Native::from_usize(values.len()).unwrap();
3204 let keys: PrimitiveArray<K> = (0..len)
3205 .map(|_| {
3206 rng.random_bool(valid_percent)
3207 .then(|| rng.random_range(min_key..max_key))
3208 })
3209 .collect();
3210
3211 let data_type =
3212 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
3213
3214 let data = keys
3215 .into_data()
3216 .into_builder()
3217 .data_type(data_type)
3218 .add_child_data(values.to_data())
3219 .build()
3220 .unwrap();
3221
3222 DictionaryArray::from(data)
3223 }
3224
3225 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
3226 let mut rng = rng();
3227 let width = rng.random_range(0..20);
3228 let mut builder = FixedSizeBinaryBuilder::new(width);
3229
3230 let mut b = vec![0; width as usize];
3231 for _ in 0..len {
3232 match rng.random_bool(valid_percent) {
3233 true => {
3234 b.iter_mut().for_each(|x| *x = rng.random());
3235 builder.append_value(&b).unwrap();
3236 }
3237 false => builder.append_null(),
3238 }
3239 }
3240
3241 builder.finish()
3242 }
3243
3244 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
3245 let mut rng = rng();
3246 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3247 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
3248 let b = generate_strings::<i32>(len, valid_percent);
3249 let fields = Fields::from(vec![
3250 Field::new("a", DataType::Int32, true),
3251 Field::new("b", DataType::Utf8, true),
3252 ]);
3253 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
3254 StructArray::new(fields, values, Some(nulls))
3255 }
3256
3257 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
3258 where
3259 F: FnOnce(usize) -> ArrayRef,
3260 {
3261 let mut rng = rng();
3262 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
3263 let values_len = offsets.last().unwrap().to_usize().unwrap();
3264 let values = values(values_len);
3265 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
3266 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
3267 ListArray::new(field, offsets, values, Some(nulls))
3268 }
3269
3270 fn generate_column(len: usize) -> ArrayRef {
3271 let mut rng = rng();
3272 match rng.random_range(0..18) {
3273 0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
3274 1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
3275 2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
3276 3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
3277 4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
3278 5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
3279 6 => Arc::new(generate_strings::<i32>(len, 0.8)),
3280 7 => Arc::new(generate_dictionary::<Int64Type>(
3281 Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
3283 len,
3284 0.8,
3285 )),
3286 8 => Arc::new(generate_dictionary::<Int64Type>(
3287 Arc::new(generate_primitive_array::<Int64Type>(
3289 rng.random_range(1..len),
3290 1.0,
3291 )),
3292 len,
3293 0.8,
3294 )),
3295 9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
3296 10 => Arc::new(generate_struct(len, 0.8)),
3297 11 => Arc::new(generate_list(len, 0.8, |values_len| {
3298 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3299 })),
3300 12 => Arc::new(generate_list(len, 0.8, |values_len| {
3301 Arc::new(generate_strings::<i32>(values_len, 0.8))
3302 })),
3303 13 => Arc::new(generate_list(len, 0.8, |values_len| {
3304 Arc::new(generate_struct(values_len, 0.8))
3305 })),
3306 14 => Arc::new(generate_string_view(len, 0.8)),
3307 15 => Arc::new(generate_byte_view(len, 0.8)),
3308 16 => Arc::new(generate_fixed_stringview_column(len)),
3309 17 => Arc::new(
3310 generate_list(len + 1000, 0.8, |values_len| {
3311 Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
3312 })
3313 .slice(500, len),
3314 ),
3315 _ => unreachable!(),
3316 }
3317 }
3318
3319 fn print_row(cols: &[SortColumn], row: usize) -> String {
3320 let t: Vec<_> = cols
3321 .iter()
3322 .map(|x| match x.values.is_valid(row) {
3323 true => {
3324 let opts = FormatOptions::default().with_null("NULL");
3325 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
3326 formatter.value(row).to_string()
3327 }
3328 false => "NULL".to_string(),
3329 })
3330 .collect();
3331 t.join(",")
3332 }
3333
3334 fn print_col_types(cols: &[SortColumn]) -> String {
3335 let t: Vec<_> = cols
3336 .iter()
3337 .map(|x| x.values.data_type().to_string())
3338 .collect();
3339 t.join(",")
3340 }
3341
3342 #[test]
3343 #[cfg_attr(miri, ignore)]
3344 fn fuzz_test() {
3345 for _ in 0..100 {
3346 let mut rng = rng();
3347 let num_columns = rng.random_range(1..5);
3348 let len = rng.random_range(5..100);
3349 let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();
3350
3351 let options: Vec<_> = (0..num_columns)
3352 .map(|_| SortOptions {
3353 descending: rng.random_bool(0.5),
3354 nulls_first: rng.random_bool(0.5),
3355 })
3356 .collect();
3357
3358 let sort_columns: Vec<_> = options
3359 .iter()
3360 .zip(&arrays)
3361 .map(|(o, c)| SortColumn {
3362 values: Arc::clone(c),
3363 options: Some(*o),
3364 })
3365 .collect();
3366
3367 let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();
3368
3369 let columns: Vec<SortField> = options
3370 .into_iter()
3371 .zip(&arrays)
3372 .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
3373 .collect();
3374
3375 let converter = RowConverter::new(columns).unwrap();
3376 let rows = converter.convert_columns(&arrays).unwrap();
3377
3378 for i in 0..len {
3379 for j in 0..len {
3380 let row_i = rows.row(i);
3381 let row_j = rows.row(j);
3382 let row_cmp = row_i.cmp(&row_j);
3383 let lex_cmp = comparator.compare(i, j);
3384 assert_eq!(
3385 row_cmp,
3386 lex_cmp,
3387 "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
3388 print_row(&sort_columns, i),
3389 print_row(&sort_columns, j),
3390 row_i,
3391 row_j,
3392 print_col_types(&sort_columns)
3393 );
3394 }
3395 }
3396
3397 let back = converter.convert_rows(&rows).unwrap();
3400 for (actual, expected) in back.iter().zip(&arrays) {
3401 actual.to_data().validate_full().unwrap();
3402 dictionary_eq(actual, expected)
3403 }
3404
3405 let rows = rows.try_into_binary().expect("reasonable size");
3408 let parser = converter.parser();
3409 let back = converter
3410 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3411 .unwrap();
3412 for (actual, expected) in back.iter().zip(&arrays) {
3413 actual.to_data().validate_full().unwrap();
3414 dictionary_eq(actual, expected)
3415 }
3416
3417 let rows = converter.from_binary(rows);
3418 let back = converter.convert_rows(&rows).unwrap();
3419 for (actual, expected) in back.iter().zip(&arrays) {
3420 actual.to_data().validate_full().unwrap();
3421 dictionary_eq(actual, expected)
3422 }
3423 }
3424 }
3425
3426 #[test]
3427 fn test_clear() {
3428 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
3429 let mut rows = converter.empty_rows(3, 128);
3430
3431 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
3432 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
3433 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
3434
3435 for array in arrays.iter() {
3436 rows.clear();
3437 converter
3438 .append(&mut rows, std::slice::from_ref(array))
3439 .unwrap();
3440 let back = converter.convert_rows(&rows).unwrap();
3441 assert_eq!(&back[0], array);
3442 }
3443
3444 let mut rows_expected = converter.empty_rows(3, 128);
3445 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
3446
3447 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
3448 assert_eq!(
3449 actual, expected,
3450 "For row {i}: expected {expected:?}, actual: {actual:?}",
3451 );
3452 }
3453 }
3454
3455 #[test]
3456 fn test_append_codec_dictionary_binary() {
3457 use DataType::*;
3458 let converter = RowConverter::new(vec![SortField::new(Dictionary(
3460 Box::new(Int32),
3461 Box::new(Binary),
3462 ))])
3463 .unwrap();
3464 let mut rows = converter.empty_rows(4, 128);
3465
3466 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
3467 let values = BinaryArray::from(vec![
3468 Some("a".as_bytes()),
3469 Some(b"b"),
3470 Some(b"c"),
3471 Some(b"d"),
3472 ]);
3473 let dict_array = DictionaryArray::new(keys, Arc::new(values));
3474
3475 rows.clear();
3476 let array = Arc::new(dict_array) as ArrayRef;
3477 converter
3478 .append(&mut rows, std::slice::from_ref(&array))
3479 .unwrap();
3480 let back = converter.convert_rows(&rows).unwrap();
3481
3482 dictionary_eq(&back[0], &array);
3483 }
3484
3485 #[test]
3486 fn test_list_prefix() {
3487 let mut a = ListBuilder::new(Int8Builder::new());
3488 a.append_value([None]);
3489 a.append_value([None, None]);
3490 let a = a.finish();
3491
3492 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
3493 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
3494 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
3495 }
3496
3497 #[test]
3498 fn map_should_be_marked_as_unsupported() {
3499 let map_data_type = Field::new_map(
3500 "map",
3501 "entries",
3502 Field::new("key", DataType::Utf8, false),
3503 Field::new("value", DataType::Utf8, true),
3504 false,
3505 true,
3506 )
3507 .data_type()
3508 .clone();
3509
3510 let is_supported = RowConverter::supports_fields(&[SortField::new(map_data_type)]);
3511
3512 assert!(!is_supported, "Map should not be supported");
3513 }
3514
3515 #[test]
3516 fn should_fail_to_create_row_converter_for_unsupported_map_type() {
3517 let map_data_type = Field::new_map(
3518 "map",
3519 "entries",
3520 Field::new("key", DataType::Utf8, false),
3521 Field::new("value", DataType::Utf8, true),
3522 false,
3523 true,
3524 )
3525 .data_type()
3526 .clone();
3527
3528 let converter = RowConverter::new(vec![SortField::new(map_data_type)]);
3529
3530 match converter {
3531 Err(ArrowError::NotYetImplemented(message)) => {
3532 assert!(
3533 message.contains("Row format support not yet implemented for"),
3534 "Expected NotYetImplemented error for map data type, got: {message}",
3535 );
3536 }
3537 Err(e) => panic!("Expected NotYetImplemented error, got: {e}"),
3538 Ok(_) => panic!("Expected NotYetImplemented error for map data type"),
3539 }
3540 }
3541
3542 #[test]
3543 fn test_values_buffer_smaller_when_utf8_validation_disabled() {
3544 fn get_values_buffer_len(col: ArrayRef) -> (usize, usize) {
3545 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8View)]).unwrap();
3547
3548 let rows = converter.convert_columns(&[col]).unwrap();
3550 let converted = converter.convert_rows(&rows).unwrap();
3551 let unchecked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3552
3553 let rows = rows.try_into_binary().expect("reasonable size");
3555 let parser = converter.parser();
3556 let converted = converter
3557 .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
3558 .unwrap();
3559 let checked_values_len = converted[0].as_string_view().data_buffers()[0].len();
3560 (unchecked_values_len, checked_values_len)
3561 }
3562
3563 let col = Arc::new(StringViewArray::from_iter([
3565 Some("hello"), None, Some("short"), Some("tiny"), ])) as ArrayRef;
3570
3571 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3572 assert_eq!(unchecked_values_len, 0);
3574 assert_eq!(checked_values_len, 14);
3576
3577 let col = Arc::new(StringViewArray::from_iter([
3579 Some("this is a very long string over 12 bytes"),
3580 Some("another long string to test the buffer"),
3581 ])) as ArrayRef;
3582
3583 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3584 assert!(unchecked_values_len > 0);
3586 assert_eq!(unchecked_values_len, checked_values_len);
3587
3588 let col = Arc::new(StringViewArray::from_iter([
3590 Some("tiny"), Some("thisisexact13"), None,
3593 Some("short"), ])) as ArrayRef;
3595
3596 let (unchecked_values_len, checked_values_len) = get_values_buffer_len(col);
3597 assert_eq!(unchecked_values_len, 13);
3599 assert!(checked_values_len > unchecked_values_len);
3600 }
3601}