1#![doc(
129 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
130 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
131)]
132#![cfg_attr(docsrs, feature(doc_auto_cfg))]
133#![warn(missing_docs)]
134use std::cmp::Ordering;
135use std::hash::{Hash, Hasher};
136use std::sync::Arc;
137
138use arrow_array::cast::*;
139use arrow_array::types::ArrowDictionaryKeyType;
140use arrow_array::*;
141use arrow_buffer::{ArrowNativeType, Buffer, OffsetBuffer, ScalarBuffer};
142use arrow_data::ArrayDataBuilder;
143use arrow_schema::*;
144use variable::{decode_binary_view, decode_string_view};
145
146use crate::fixed::{decode_bool, decode_fixed_size_binary, decode_primitive};
147use crate::variable::{decode_binary, decode_string};
148
149mod fixed;
150mod list;
151mod variable;
152
/// Converts [`ArrayRef`] columns into a row-oriented format where rows can be
/// compared as opaque byte slices.
#[derive(Debug)]
pub struct RowConverter {
    /// The sort field (data type + sort options) for each converted column
    fields: Arc<[SortField]>,
    /// Per-field state used to encode and decode the corresponding column
    codecs: Vec<Codec>,
}
371
/// Per-field encoding/decoding state for a [`RowConverter`]
#[derive(Debug)]
enum Codec {
    /// No additional state is required to encode or decode this field
    Stateless,
    /// A converter for the dictionary values, plus the pre-computed encoding of
    /// a null value (used for null dictionary keys)
    Dictionary(RowConverter, OwnedRow),
    /// A converter for the struct's children, plus the pre-computed encoding of
    /// a row whose children are all null (used when the struct itself is null)
    Struct(RowConverter, OwnedRow),
    /// A converter for the list's child values
    List(RowConverter),
}
385
impl Codec {
    /// Create the [`Codec`] appropriate for `sort_field`'s data type
    fn new(sort_field: &SortField) -> Result<Self, ArrowError> {
        match &sort_field.data_type {
            DataType::Dictionary(_, values) => {
                // Dictionary values are encoded with the same sort options as the field
                let sort_field =
                    SortField::new_with_options(values.as_ref().clone(), sort_field.options);

                let converter = RowConverter::new(vec![sort_field])?;
                // Pre-compute the row encoding of a single null value, reused
                // whenever a dictionary key is null
                let null_array = new_null_array(values.as_ref(), 1);
                let nulls = converter.convert_columns(&[null_array])?;

                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };
                Ok(Self::Dictionary(converter, owned))
            }
            d if !d.is_nested() => Ok(Self::Stateless),
            DataType::List(f) | DataType::LargeList(f) => {
                // NOTE(review): child elements are always encoded ascending
                // (`descending: false`); nulls_first is flipped when the outer
                // field is descending — presumably so the child null sentinel
                // still sorts correctly after the outer descending inversion.
                let options = SortOptions {
                    descending: false,
                    nulls_first: sort_field.options.nulls_first != sort_field.options.descending,
                };

                let field = SortField::new_with_options(f.data_type().clone(), options);
                let converter = RowConverter::new(vec![field])?;
                Ok(Self::List(converter))
            }
            DataType::Struct(f) => {
                // Each child column inherits the parent's sort options
                let sort_fields = f
                    .iter()
                    .map(|x| SortField::new_with_options(x.data_type().clone(), sort_field.options))
                    .collect();

                let converter = RowConverter::new(sort_fields)?;
                // Pre-compute the encoding of a row whose children are all
                // null, reused whenever the struct value itself is null
                let nulls: Vec<_> = f.iter().map(|x| new_null_array(x.data_type(), 1)).collect();

                let nulls = converter.convert_columns(&nulls)?;
                let owned = OwnedRow {
                    data: nulls.buffer.into(),
                    config: nulls.config,
                };

                Ok(Self::Struct(converter, owned))
            }
            _ => Err(ArrowError::NotYetImplemented(format!(
                "not yet implemented: {:?}",
                sort_field.data_type
            ))),
        }
    }

    /// Prepare an [`Encoder`] for `array`, eagerly converting any child data
    /// (dictionary values, struct children, list values) to rows
    fn encoder(&self, array: &dyn Array) -> Result<Encoder<'_>, ArrowError> {
        match self {
            Codec::Stateless => Ok(Encoder::Stateless),
            Codec::Dictionary(converter, nulls) => {
                let values = array.as_any_dictionary().values().clone();
                let rows = converter.convert_columns(&[values])?;
                Ok(Encoder::Dictionary(rows, nulls.row()))
            }
            Codec::Struct(converter, null) => {
                let v = as_struct_array(array);
                let rows = converter.convert_columns(v.columns())?;
                Ok(Encoder::Struct(rows, null.row()))
            }
            Codec::List(converter) => {
                let values = match array.data_type() {
                    DataType::List(_) => as_list_array(array).values(),
                    DataType::LargeList(_) => as_large_list_array(array).values(),
                    _ => unreachable!(),
                };
                let rows = converter.convert_columns(&[values.clone()])?;
                Ok(Encoder::List(rows))
            }
        }
    }

    /// Heap memory retained by this codec, in bytes
    fn size(&self) -> usize {
        match self {
            Codec::Stateless => 0,
            Codec::Dictionary(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::Struct(converter, nulls) => converter.size() + nulls.data.len(),
            Codec::List(converter) => converter.size(),
        }
    }
}
475
/// Per-column encoding state produced by [`Codec::encoder`] for a specific array
#[derive(Debug)]
enum Encoder<'a> {
    /// No additional state required
    Stateless,
    /// The encoded rows of the dictionary values, plus the encoding of a null value
    Dictionary(Rows, Row<'a>),
    /// The encoded rows of the struct's children, plus the encoding of a null row
    Struct(Rows, Row<'a>),
    /// The encoded rows of the list's child values
    List(Rows),
}
491
/// Configuration for a column of a [`RowConverter`]
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SortField {
    /// Sort options (ordering direction and null placement)
    options: SortOptions,
    /// Data type of the column
    data_type: DataType,
}
500
501impl SortField {
502 pub fn new(data_type: DataType) -> Self {
504 Self::new_with_options(data_type, Default::default())
505 }
506
507 pub fn new_with_options(data_type: DataType, options: SortOptions) -> Self {
509 Self { options, data_type }
510 }
511
512 pub fn size(&self) -> usize {
516 self.data_type.size() + std::mem::size_of::<Self>() - std::mem::size_of::<DataType>()
517 }
518}
519
impl RowConverter {
    /// Create a new [`RowConverter`] with the provided schema
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::NotYetImplemented`] if any field has a data type
    /// not supported by the row format
    pub fn new(fields: Vec<SortField>) -> Result<Self, ArrowError> {
        if !Self::supports_fields(&fields) {
            return Err(ArrowError::NotYetImplemented(format!(
                "Row format support not yet implemented for: {fields:?}"
            )));
        }

        let codecs = fields.iter().map(Codec::new).collect::<Result<_, _>>()?;
        Ok(Self {
            fields: fields.into(),
            codecs,
        })
    }

    /// Returns whether this [`RowConverter`] supports all the given fields
    pub fn supports_fields(fields: &[SortField]) -> bool {
        fields.iter().all(|x| Self::supports_datatype(&x.data_type))
    }

    /// Returns whether the row format supports `d`, recursing into nested types
    fn supports_datatype(d: &DataType) -> bool {
        match d {
            _ if !d.is_nested() => true,
            DataType::List(f) | DataType::LargeList(f) | DataType::Map(f, _) => {
                Self::supports_datatype(f.data_type())
            }
            DataType::Struct(f) => f.iter().all(|x| Self::supports_datatype(x.data_type())),
            _ => false,
        }
    }

    /// Convert the provided columns into [`Rows`]
    ///
    /// # Errors
    ///
    /// Returns an error if the number or types of `columns` do not match the
    /// fields this converter was created with
    pub fn convert_columns(&self, columns: &[ArrayRef]) -> Result<Rows, ArrowError> {
        let num_rows = columns.first().map(|x| x.len()).unwrap_or(0);
        let mut rows = self.empty_rows(num_rows, 0);
        self.append(&mut rows, columns)?;
        Ok(rows)
    }

    /// Append the encodings of `columns` onto `rows`
    ///
    /// # Panics
    ///
    /// Panics if `rows` was not created by this [`RowConverter`]
    ///
    /// # Errors
    ///
    /// Returns an error if the number or types of `columns` do not match the
    /// fields this converter was created with
    pub fn append(&self, rows: &mut Rows, columns: &[ArrayRef]) -> Result<(), ArrowError> {
        assert!(
            Arc::ptr_eq(&rows.config.fields, &self.fields),
            "rows were not produced by this RowConverter"
        );

        if columns.len() != self.fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Incorrect number of arrays provided to RowConverter, expected {} got {}",
                self.fields.len(),
                columns.len()
            )));
        }

        // Build an encoder per column, validating each column's type against
        // the corresponding field
        let encoders = columns
            .iter()
            .zip(&self.codecs)
            .zip(self.fields.iter())
            .map(|((column, codec), field)| {
                if !column.data_type().equals_datatype(&field.data_type) {
                    return Err(ArrowError::InvalidArgumentError(format!(
                        "RowConverter column schema mismatch, expected {} got {}",
                        field.data_type,
                        column.data_type()
                    )));
                }
                codec.encoder(column.as_ref())
            })
            .collect::<Result<Vec<_>, _>>()?;

        let write_offset = rows.num_rows();
        let lengths = row_lengths(columns, &encoders);

        rows.offsets.reserve(lengths.len());
        // The new offsets are initialised *shifted down by one*: each pushed
        // value is the START offset of the corresponding row. `encode_column`
        // below advances each entry as it writes that row's bytes, so after
        // encoding completes the entries hold the END offsets as required.
        let mut cur_offset = rows.offsets[write_offset];
        for l in lengths {
            rows.offsets.push(cur_offset);
            cur_offset = cur_offset.checked_add(l).expect("overflow");
        }

        // `cur_offset` is now the total encoded length; size the buffer up front
        rows.buffer.resize(cur_offset, 0);

        for ((column, field), encoder) in columns.iter().zip(self.fields.iter()).zip(encoders) {
            // Each encoder writes its bytes and advances the (shifted) offsets
            encode_column(
                &mut rows.buffer,
                &mut rows.offsets[write_offset..],
                column.as_ref(),
                field.options,
                &encoder,
            )
        }

        if cfg!(debug_assertions) {
            assert_eq!(*rows.offsets.last().unwrap(), rows.buffer.len());
            rows.offsets
                .windows(2)
                .for_each(|w| assert!(w[0] <= w[1], "offsets should be monotonic"));
        }

        Ok(())
    }

    /// Convert [`Rows`] produced by this converter back into [`ArrayRef`]s
    ///
    /// # Panics
    ///
    /// Panics if any row was not produced by this [`RowConverter`]
    pub fn convert_rows<'a, I>(&self, rows: I) -> Result<Vec<ArrayRef>, ArrowError>
    where
        I: IntoIterator<Item = Row<'a>>,
    {
        let mut validate_utf8 = false;
        let mut rows: Vec<_> = rows
            .into_iter()
            .map(|row| {
                assert!(
                    Arc::ptr_eq(&row.config.fields, &self.fields),
                    "rows were not produced by this RowConverter"
                );
                // If any row came from an untrusted source (e.g. a RowParser),
                // string data must be validated on decode
                validate_utf8 |= row.config.validate_utf8;
                row.data
            })
            .collect();

        // SAFETY: asserted above that every row was produced by this converter
        unsafe { self.convert_raw(&mut rows, validate_utf8) }
    }

    /// Create an empty [`Rows`] with capacity for `row_capacity` rows and
    /// `data_capacity` bytes of row data
    pub fn empty_rows(&self, row_capacity: usize, data_capacity: usize) -> Rows {
        // offsets always holds one more element than there are rows
        let mut offsets = Vec::with_capacity(row_capacity.saturating_add(1));
        offsets.push(0);

        Rows {
            offsets,
            buffer: Vec::with_capacity(data_capacity),
            config: RowConfig {
                fields: self.fields.clone(),
                validate_utf8: false,
            },
        }
    }

    /// Reconstruct [`Rows`] from a [`BinaryArray`] of row bytes
    ///
    /// The bytes are treated as untrusted: `validate_utf8` is set so string
    /// columns are checked when the rows are decoded.
    ///
    /// # Panics
    ///
    /// Panics if `array` contains nulls
    pub fn from_binary(&self, array: BinaryArray) -> Rows {
        assert_eq!(
            array.null_count(),
            0,
            "can't construct Rows instance from array with nulls"
        );
        Rows {
            buffer: array.values().to_vec(),
            offsets: array.offsets().iter().map(|&i| i.as_usize()).collect(),
            config: RowConfig {
                fields: Arc::clone(&self.fields),
                validate_utf8: true,
            },
        }
    }

    /// Decode raw row byte slices back into columns
    ///
    /// Each slice in `rows` is consumed in place as the columns are decoded.
    ///
    /// # Safety
    ///
    /// `rows` must have been produced by encoding the corresponding schema
    /// with this converter, otherwise behaviour is undefined
    unsafe fn convert_raw(
        &self,
        rows: &mut [&[u8]],
        validate_utf8: bool,
    ) -> Result<Vec<ArrayRef>, ArrowError> {
        self.fields
            .iter()
            .zip(&self.codecs)
            .map(|(field, codec)| decode_column(field, rows, codec, validate_utf8))
            .collect()
    }

    /// Create a [`RowParser`] that can interpret raw bytes for this schema
    pub fn parser(&self) -> RowParser {
        RowParser::new(Arc::clone(&self.fields))
    }

    /// Size of this instance in bytes, including retained heap memory
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.fields.iter().map(|x| x.size()).sum::<usize>()
            + self.codecs.capacity() * std::mem::size_of::<Codec>()
            + self.codecs.iter().map(Codec::size).sum::<usize>()
    }
}
821
/// Reinterprets raw bytes as a [`Row`] for a particular schema; created by
/// [`RowConverter::parser`]
#[derive(Debug)]
pub struct RowParser {
    config: RowConfig,
}
827
impl RowParser {
    fn new(fields: Arc<[SortField]>) -> Self {
        Self {
            config: RowConfig {
                fields,
                // Parsed bytes come from outside this process's control, so
                // string columns must be validated when decoded
                validate_utf8: true,
            },
        }
    }

    /// Create a [`Row`] backed by `bytes`
    ///
    /// The bytes should have been produced by a [`RowConverter`] with the
    /// same schema, otherwise decoding or comparing the row is unreliable.
    pub fn parse<'a>(&'a self, bytes: &'a [u8]) -> Row<'a> {
        Row {
            data: bytes,
            config: &self.config,
        }
    }
}
849
/// Shared configuration identifying the schema that produced a set of rows
#[derive(Debug, Clone)]
struct RowConfig {
    /// The fields of the converter that produced the rows; compared by
    /// pointer identity (`Arc::ptr_eq`) to detect mismatched converters
    fields: Arc<[SortField]>,
    /// Whether string data must be UTF-8 validated on decode (true when the
    /// row bytes came from an untrusted source)
    validate_utf8: bool,
}
858
/// A row-oriented representation of a collection of encoded rows
#[derive(Debug)]
pub struct Rows {
    /// Concatenated row bytes
    buffer: Vec<u8>,
    /// Row `i` occupies `buffer[offsets[i]..offsets[i + 1]]`; always holds at
    /// least one element (a leading 0)
    offsets: Vec<usize>,
    /// The schema configuration these rows were produced with
    config: RowConfig,
}
871
impl Rows {
    /// Append a [`Row`] to this collection
    ///
    /// # Panics
    ///
    /// Panics if `row` was not produced by the same [`RowConverter`]
    pub fn push(&mut self, row: Row<'_>) {
        assert!(
            Arc::ptr_eq(&row.config.fields, &self.config.fields),
            "row was not produced by this RowConverter"
        );
        // Propagate the untrusted-bytes flag so decoding stays safe
        self.config.validate_utf8 |= row.config.validate_utf8;
        self.buffer.extend_from_slice(row.data);
        self.offsets.push(self.buffer.len())
    }

    /// Return the [`Row`] at index `row`
    ///
    /// # Panics
    ///
    /// Panics if `row >= self.num_rows()`
    pub fn row(&self, row: usize) -> Row<'_> {
        // num_rows == offsets.len() - 1, so this bounds-checks `row`
        assert!(row + 1 < self.offsets.len());
        unsafe { self.row_unchecked(row) }
    }

    /// Return the [`Row`] at `index` without bounds checking
    ///
    /// # Safety
    ///
    /// Caller must ensure `index < self.num_rows()`
    pub unsafe fn row_unchecked(&self, index: usize) -> Row<'_> {
        let end = unsafe { self.offsets.get_unchecked(index + 1) };
        let start = unsafe { self.offsets.get_unchecked(index) };
        let data = unsafe { self.buffer.get_unchecked(*start..*end) };
        Row {
            data,
            config: &self.config,
        }
    }

    /// Remove all rows, retaining allocated capacity
    pub fn clear(&mut self) {
        // Keep the leading 0 sentinel offset
        self.offsets.truncate(1);
        self.buffer.clear();
    }

    /// Number of rows in this collection
    pub fn num_rows(&self) -> usize {
        self.offsets.len() - 1
    }

    /// Iterate over the rows in order
    pub fn iter(&self) -> RowsIter<'_> {
        self.into_iter()
    }

    /// Size of this instance in bytes, including retained heap memory
    pub fn size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.buffer.len()
            + self.offsets.len() * std::mem::size_of::<usize>()
    }

    /// Convert these rows into a [`BinaryArray`] with one value per row
    ///
    /// # Errors
    ///
    /// Fails if the total row data exceeds `i32::MAX` bytes, which a
    /// 32-bit-offset [`BinaryArray`] cannot represent
    pub fn try_into_binary(self) -> Result<BinaryArray, ArrowError> {
        if self.buffer.len() > i32::MAX as usize {
            return Err(ArrowError::InvalidArgumentError(format!(
                "{}-byte rows buffer too long to convert into a i32-indexed BinaryArray",
                self.buffer.len()
            )));
        }
        // Offsets are bounded by buffer.len(), checked above, so the usize ->
        // i32 narrowing cannot truncate
        let offsets_scalar = ScalarBuffer::from_iter(self.offsets.into_iter().map(i32::usize_as));
        // SAFETY: offsets are monotonic and in-bounds (invariant of Rows),
        // and the buffer is valid binary data with no nulls
        let array = unsafe {
            BinaryArray::new_unchecked(
                OffsetBuffer::new_unchecked(offsets_scalar),
                Buffer::from_vec(self.buffer),
                None,
            )
        };
        Ok(array)
    }
}
979
impl<'a> IntoIterator for &'a Rows {
    type Item = Row<'a>;
    type IntoIter = RowsIter<'a>;

    /// Iterate over all rows, front to back
    fn into_iter(self) -> Self::IntoIter {
        RowsIter {
            rows: self,
            start: 0,
            end: self.num_rows(),
        }
    }
}
992
/// Double-ended iterator over the [`Row`]s of a [`Rows`] collection
#[derive(Debug)]
pub struct RowsIter<'a> {
    rows: &'a Rows,
    /// Next front index to yield (inclusive)
    start: usize,
    /// One past the last back index to yield (exclusive)
    end: usize,
}
1000
1001impl<'a> Iterator for RowsIter<'a> {
1002 type Item = Row<'a>;
1003
1004 fn next(&mut self) -> Option<Self::Item> {
1005 if self.end == self.start {
1006 return None;
1007 }
1008
1009 let row = unsafe { self.rows.row_unchecked(self.start) };
1011 self.start += 1;
1012 Some(row)
1013 }
1014
1015 fn size_hint(&self) -> (usize, Option<usize>) {
1016 let len = self.len();
1017 (len, Some(len))
1018 }
1019}
1020
1021impl ExactSizeIterator for RowsIter<'_> {
1022 fn len(&self) -> usize {
1023 self.end - self.start
1024 }
1025}
1026
1027impl DoubleEndedIterator for RowsIter<'_> {
1028 fn next_back(&mut self) -> Option<Self::Item> {
1029 if self.end == self.start {
1030 return None;
1031 }
1032 let row = unsafe { self.rows.row_unchecked(self.end) };
1034 self.end -= 1;
1035 Some(row)
1036 }
1037}
1038
/// A borrowed view of a single encoded row; compares byte-wise, which matches
/// the lexicographic order of the source columns under their sort options
#[derive(Debug, Copy, Clone)]
pub struct Row<'a> {
    /// The encoded bytes of this row
    data: &'a [u8],
    /// The configuration of the converter that produced this row
    config: &'a RowConfig,
}
1052
impl<'a> Row<'a> {
    /// Create an owned copy of this row
    pub fn owned(&self) -> OwnedRow {
        OwnedRow {
            data: self.data.into(),
            config: self.config.clone(),
        }
    }

    /// The encoded bytes of this row
    pub fn data(&self) -> &'a [u8] {
        self.data
    }
}
1067
1068impl PartialEq for Row<'_> {
1071 #[inline]
1072 fn eq(&self, other: &Self) -> bool {
1073 self.data.eq(other.data)
1074 }
1075}
1076
1077impl Eq for Row<'_> {}
1078
1079impl PartialOrd for Row<'_> {
1080 #[inline]
1081 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1082 Some(self.cmp(other))
1083 }
1084}
1085
1086impl Ord for Row<'_> {
1087 #[inline]
1088 fn cmp(&self, other: &Self) -> Ordering {
1089 self.data.cmp(other.data)
1090 }
1091}
1092
1093impl Hash for Row<'_> {
1094 #[inline]
1095 fn hash<H: Hasher>(&self, state: &mut H) {
1096 self.data.hash(state)
1097 }
1098}
1099
1100impl AsRef<[u8]> for Row<'_> {
1101 #[inline]
1102 fn as_ref(&self) -> &[u8] {
1103 self.data
1104 }
1105}
1106
/// An owned copy of a [`Row`], holding its encoded bytes and schema config
#[derive(Debug, Clone)]
pub struct OwnedRow {
    /// The encoded bytes of this row
    data: Box<[u8]>,
    /// The configuration of the converter that produced this row
    config: RowConfig,
}
1115
impl OwnedRow {
    /// Borrow this owned row as a [`Row`]
    pub fn row(&self) -> Row<'_> {
        Row {
            data: &self.data,
            config: &self.config,
        }
    }
}
1127
1128impl PartialEq for OwnedRow {
1131 #[inline]
1132 fn eq(&self, other: &Self) -> bool {
1133 self.row().eq(&other.row())
1134 }
1135}
1136
1137impl Eq for OwnedRow {}
1138
1139impl PartialOrd for OwnedRow {
1140 #[inline]
1141 fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
1142 Some(self.cmp(other))
1143 }
1144}
1145
1146impl Ord for OwnedRow {
1147 #[inline]
1148 fn cmp(&self, other: &Self) -> Ordering {
1149 self.row().cmp(&other.row())
1150 }
1151}
1152
1153impl Hash for OwnedRow {
1154 #[inline]
1155 fn hash<H: Hasher>(&self, state: &mut H) {
1156 self.row().hash(state)
1157 }
1158}
1159
1160impl AsRef<[u8]> for OwnedRow {
1161 #[inline]
1162 fn as_ref(&self) -> &[u8] {
1163 &self.data
1164 }
1165}
1166
1167#[inline]
1169fn null_sentinel(options: SortOptions) -> u8 {
1170 match options.nulls_first {
1171 true => 0,
1172 false => 0xFF,
1173 }
1174}
1175
/// Compute the encoded length in bytes of each row across all columns
///
/// Returns one length per row, each being the sum of that row's encoded
/// size contributed by every column in `cols`.
fn row_lengths(cols: &[ArrayRef], encoders: &[Encoder]) -> Vec<usize> {
    use fixed::FixedLengthEncoding;

    let num_rows = cols.first().map(|x| x.len()).unwrap_or(0);
    let mut lengths = vec![0; num_rows];

    for (array, encoder) in cols.iter().zip(encoders) {
        match encoder {
            Encoder::Stateless => {
                // Dispatch on the concrete array type; fixed-width types add a
                // constant per row, variable-width types depend on each value
                downcast_primitive_array! {
                    array => lengths.iter_mut().for_each(|x| *x += fixed::encoded_len(array)),
                    DataType::Null => {},
                    DataType::Boolean => lengths.iter_mut().for_each(|x| *x += bool::ENCODED_LEN),
                    DataType::Binary => as_generic_binary_array::<i32>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::LargeBinary => as_generic_binary_array::<i64>(array)
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| *length += variable::encoded_len(slice)),
                    DataType::BinaryView => array.as_binary_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                        *length += variable::encoded_len(slice)
                    }),
                    DataType::Utf8 => array.as_string::<i32>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::LargeUtf8 => array.as_string::<i64>()
                        .iter()
                        .zip(lengths.iter_mut())
                        .for_each(|(slice, length)| {
                            *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                        }),
                    DataType::Utf8View => array.as_string_view().iter().zip(lengths.iter_mut()).for_each(|(slice, length)| {
                        *length += variable::encoded_len(slice.map(|x| x.as_bytes()))
                    }),
                    DataType::FixedSizeBinary(len) => {
                        let len = len.to_usize().unwrap();
                        // 1 extra byte for the null/valid sentinel
                        lengths.iter_mut().for_each(|x| *x += 1 + len)
                    }
                    _ => unimplemented!("unsupported data type: {}", array.data_type()),
                }
            }
            Encoder::Dictionary(values, null) => {
                // Each key contributes the length of its (already-encoded)
                // dictionary value, or the null row's length
                downcast_dictionary_array! {
                    array => {
                        for (v, length) in array.keys().iter().zip(lengths.iter_mut()) {
                            *length += match v {
                                Some(k) => values.row(k.as_usize()).data.len(),
                                None => null.data.len(),
                            }
                        }
                    }
                    _ => unreachable!(),
                }
            }
            Encoder::Struct(rows, null) => {
                // 1 byte sentinel plus the encoded child row (or the null row)
                let array = as_struct_array(array);
                lengths.iter_mut().enumerate().for_each(|(idx, length)| {
                    match array.is_valid(idx) {
                        true => *length += 1 + rows.row(idx).as_ref().len(),
                        false => *length += 1 + null.data.len(),
                    }
                });
            }
            Encoder::List(rows) => match array.data_type() {
                DataType::List(_) => {
                    list::compute_lengths(&mut lengths, rows, as_list_array(array))
                }
                DataType::LargeList(_) => {
                    list::compute_lengths(&mut lengths, rows, as_large_list_array(array))
                }
                _ => unreachable!(),
            },
        }
    }

    lengths
}
1259
/// Encode one column's values into `data`
///
/// `offsets` holds the write position for each row (shifted down by one, see
/// [`RowConverter::append`]); each row's offset is advanced past the bytes
/// written for it, so subsequent columns append after this one.
fn encode_column(
    data: &mut [u8],
    offsets: &mut [usize],
    column: &dyn Array,
    opts: SortOptions,
    encoder: &Encoder<'_>,
) {
    match encoder {
        Encoder::Stateless => {
            downcast_primitive_array! {
                column => {
                    // Use the faster non-null path when there are no nulls
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode(data, offsets, column.values(), nulls, opts)
                    } else {
                        fixed::encode_not_null(data, offsets, column.values(), opts)
                    }
                }
                DataType::Null => {}
                DataType::Boolean => {
                    if let Some(nulls) = column.nulls().filter(|n| n.null_count() > 0){
                        fixed::encode_boolean(data, offsets, column.as_boolean().values(), nulls, opts)
                    } else {
                        fixed::encode_boolean_not_null(data, offsets, column.as_boolean().values(), opts)
                    }
                }
                DataType::Binary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i32>(column).iter(), opts)
                }
                DataType::BinaryView => {
                    variable::encode(data, offsets, column.as_binary_view().iter(), opts)
                }
                DataType::LargeBinary => {
                    variable::encode(data, offsets, as_generic_binary_array::<i64>(column).iter(), opts)
                }
                DataType::Utf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i32>().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::LargeUtf8 => variable::encode(
                    data, offsets,
                    column.as_string::<i64>()
                        .iter()
                        .map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::Utf8View => variable::encode(
                    data, offsets,
                    column.as_string_view().iter().map(|x| x.map(|x| x.as_bytes())),
                    opts,
                ),
                DataType::FixedSizeBinary(_) => {
                    let array = column.as_any().downcast_ref().unwrap();
                    fixed::encode_fixed_size_binary(data, offsets, array, opts)
                }
                _ => unimplemented!("unsupported data type: {}", column.data_type()),
            }
        }
        Encoder::Dictionary(values, nulls) => {
            downcast_dictionary_array! {
                column => encode_dictionary_values(data, offsets, column, values, nulls),
                _ => unreachable!()
            }
        }
        Encoder::Struct(rows, null) => {
            // One sentinel byte (0x01 valid / null sentinel) followed by the
            // pre-encoded child row bytes (or the all-null row)
            let array = as_struct_array(column);
            let null_sentinel = null_sentinel(opts);
            offsets
                .iter_mut()
                .skip(1)
                .enumerate()
                .for_each(|(idx, offset)| {
                    let (row, sentinel) = match array.is_valid(idx) {
                        true => (rows.row(idx), 0x01),
                        false => (*null, null_sentinel),
                    };
                    let end_offset = *offset + 1 + row.as_ref().len();
                    data[*offset] = sentinel;
                    data[*offset + 1..end_offset].copy_from_slice(row.as_ref());
                    *offset = end_offset;
                })
        }
        Encoder::List(rows) => match column.data_type() {
            DataType::List(_) => list::encode(data, offsets, rows, opts, as_list_array(column)),
            DataType::LargeList(_) => {
                list::encode(data, offsets, rows, opts, as_large_list_array(column))
            }
            _ => unreachable!(),
        },
    }
}
1352
1353pub fn encode_dictionary_values<K: ArrowDictionaryKeyType>(
1355 data: &mut [u8],
1356 offsets: &mut [usize],
1357 column: &DictionaryArray<K>,
1358 values: &Rows,
1359 null: &Row<'_>,
1360) {
1361 for (offset, k) in offsets.iter_mut().skip(1).zip(column.keys()) {
1362 let row = match k {
1363 Some(k) => values.row(k.as_usize()).data,
1364 None => null.data,
1365 };
1366 let end_offset = *offset + row.len();
1367 data[*offset..end_offset].copy_from_slice(row);
1368 *offset = end_offset;
1369 }
1370}
1371
/// Helper invoked by `downcast_primitive!` in [`decode_column`] to decode a
/// primitive column of type `$t` into an `ArrayRef`
macro_rules! decode_primitive_helper {
    ($t:ty, $rows:ident, $data_type:ident, $options:ident) => {
        Arc::new(decode_primitive::<$t>($rows, $data_type, $options))
    };
}
1377
/// Decode one column from the row byte slices, consuming the decoded bytes
/// from the front of each slice in `rows`
///
/// # Safety
///
/// `rows` must contain valid row-format data for `field`'s data type,
/// otherwise behaviour is undefined
unsafe fn decode_column(
    field: &SortField,
    rows: &mut [&[u8]],
    codec: &Codec,
    validate_utf8: bool,
) -> Result<ArrayRef, ArrowError> {
    let options = field.options;

    let array: ArrayRef = match codec {
        Codec::Stateless => {
            let data_type = field.data_type.clone();
            downcast_primitive! {
                data_type => (decode_primitive_helper, rows, data_type, options),
                DataType::Null => Arc::new(NullArray::new(rows.len())),
                DataType::Boolean => Arc::new(decode_bool(rows, options)),
                DataType::Binary => Arc::new(decode_binary::<i32>(rows, options)),
                DataType::LargeBinary => Arc::new(decode_binary::<i64>(rows, options)),
                DataType::BinaryView => Arc::new(decode_binary_view(rows, options)),
                DataType::FixedSizeBinary(size) => Arc::new(decode_fixed_size_binary(rows, size, options)),
                DataType::Utf8 => Arc::new(decode_string::<i32>(rows, options, validate_utf8)),
                DataType::LargeUtf8 => Arc::new(decode_string::<i64>(rows, options, validate_utf8)),
                DataType::Utf8View => Arc::new(decode_string_view(rows, options, validate_utf8)),
                _ => return Err(ArrowError::NotYetImplemented(format!("unsupported data type: {}", data_type)))
            }
        }
        Codec::Dictionary(converter, _) => {
            // Dictionaries are decoded as their value type (the dictionary
            // encoding is not reconstructed)
            let cols = converter.convert_raw(rows, validate_utf8)?;
            cols.into_iter().next().unwrap()
        }
        Codec::Struct(converter, _) => {
            // First consume the per-row validity sentinel byte, then decode
            // the child columns from the remaining bytes
            let (null_count, nulls) = fixed::decode_nulls(rows);
            rows.iter_mut().for_each(|row| *row = &row[1..]);
            let children = converter.convert_raw(rows, validate_utf8)?;

            let child_data = children.iter().map(|c| c.to_data()).collect();
            let builder = ArrayDataBuilder::new(field.data_type.clone())
                .len(rows.len())
                .null_count(null_count)
                .null_bit_buffer(Some(nulls))
                .child_data(child_data);

            // SAFETY: caller guarantees `rows` is valid for this data type
            Arc::new(StructArray::from(builder.build_unchecked()))
        }
        Codec::List(converter) => match &field.data_type {
            DataType::List(_) => {
                Arc::new(list::decode::<i32>(converter, rows, field, validate_utf8)?)
            }
            DataType::LargeList(_) => {
                Arc::new(list::decode::<i64>(converter, rows, field, validate_utf8)?)
            }
            _ => unreachable!(),
        },
    };
    Ok(array)
}
1438
1439#[cfg(test)]
1440mod tests {
1441 use rand::distr::uniform::SampleUniform;
1442 use rand::distr::{Distribution, StandardUniform};
1443 use rand::{rng, Rng};
1444
1445 use arrow_array::builder::*;
1446 use arrow_array::types::*;
1447 use arrow_array::*;
1448 use arrow_buffer::{i256, NullBuffer};
1449 use arrow_buffer::{Buffer, OffsetBuffer};
1450 use arrow_cast::display::{ArrayFormatter, FormatOptions};
1451 use arrow_ord::sort::{LexicographicalComparator, SortColumn};
1452
1453 use super::*;
1454
    /// Round-trip and ordering checks for fixed-width columns (Int16, Float32),
    /// including the exact expected encoded bytes
    #[test]
    fn test_fixed_width() {
        let cols = [
            Arc::new(Int16Array::from_iter([
                Some(1),
                Some(2),
                None,
                Some(-5),
                Some(2),
                Some(2),
                Some(0),
            ])) as ArrayRef,
            Arc::new(Float32Array::from_iter([
                Some(1.3),
                Some(2.5),
                None,
                Some(4.),
                Some(0.1),
                Some(-4.),
                Some(-0.),
            ])) as ArrayRef,
        ];

        let converter = RowConverter::new(vec![
            SortField::new(DataType::Int16),
            SortField::new(DataType::Float32),
        ])
        .unwrap();
        let rows = converter.convert_columns(&cols).unwrap();

        // Each row is 8 bytes: (1 sentinel + 2 value) for Int16 plus
        // (1 sentinel + 4 value) for Float32
        assert_eq!(rows.offsets, &[0, 8, 16, 24, 32, 40, 48, 56]);
        assert_eq!(
            rows.buffer,
            &[
                1, 128, 1, 1, 191, 166, 102, 102, 1, 128, 2, 1, 192, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 127, 251, 1, 192, 128, 0, 0, 1, 128, 2, 1, 189, 204, 204, 205, 1, 128, 2, 1, 63, 127, 255, 255, 1, 128, 0, 1, 127, 255, 255, 255 ]
        );

        // Byte comparison of rows matches lexicographic column order
        assert!(rows.row(3) < rows.row(6));
        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(4) < rows.row(1));
        assert!(rows.row(5) < rows.row(4));

        let back = converter.convert_rows(&rows).unwrap();
        for (expected, actual) in cols.iter().zip(&back) {
            assert_eq!(expected, actual);
        }
    }
1517
    /// Ordering and round-trip for Decimal128, covering nulls and extremes
    #[test]
    fn test_decimal128() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal128(
            DECIMAL128_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        // Values are listed in ascending order (null first by default)
        let col = Arc::new(
            Decimal128Array::from_iter([
                None,
                Some(i128::MIN),
                Some(-13),
                Some(46_i128),
                Some(5456_i128),
                Some(i128::MAX),
            ])
            .with_precision_and_scale(38, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }
1547
    /// Ordering and round-trip for Decimal256, covering nulls, sign
    /// boundaries, and extremes
    #[test]
    fn test_decimal256() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Decimal256(
            DECIMAL256_MAX_PRECISION,
            7,
        ))])
        .unwrap();
        // Values are listed in ascending order (null first by default)
        let col = Arc::new(
            Decimal256Array::from_iter([
                None,
                Some(i256::MIN),
                Some(i256::from_parts(0, -1)),
                Some(i256::from_parts(u128::MAX, -1)),
                Some(i256::from_parts(u128::MAX, 0)),
                Some(i256::from_parts(0, 46_i128)),
                Some(i256::from_parts(5, 46_i128)),
                Some(i256::MAX),
            ])
            .with_precision_and_scale(DECIMAL256_MAX_PRECISION, 7)
            .unwrap(),
        ) as ArrayRef;

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        for i in 0..rows.num_rows() - 1 {
            assert!(rows.row(i) < rows.row(i + 1));
        }

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(col.as_ref(), back[0].as_ref())
    }
1579
    /// Boolean ordering and round-trip, in both default and
    /// descending/nulls-last sort options
    #[test]
    fn test_bool() {
        let converter = RowConverter::new(vec![SortField::new(DataType::Boolean)]).unwrap();

        let col = Arc::new(BooleanArray::from_iter([None, Some(false), Some(true)])) as ArrayRef;

        // Default options: null < false < true
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) > rows.row(1));
        assert!(rows.row(2) > rows.row(0));
        assert!(rows.row(1) > rows.row(0));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last inverts the ordering
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Boolean,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();

        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();
        assert!(rows.row(2) < rows.row(1));
        assert!(rows.row(2) < rows.row(0));
        assert!(rows.row(1) < rows.row(0));
        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
1607
    /// Timezone metadata must be preserved through a round trip, both for a
    /// plain timestamp array and for a dictionary-encoded one
    #[test]
    fn test_timezone() {
        let a =
            TimestampNanosecondArray::from(vec![1, 2, 3, 4, 5]).with_timezone("+01:00".to_string());
        let d = a.data_type().clone();

        let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
        let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        assert_eq!(back[0].data_type(), &d);

        // Dictionary-encoded timestamps: build a dictionary, then attach a
        // timezone to its values
        let mut a = PrimitiveDictionaryBuilder::<Int32Type, TimestampNanosecondType>::new();
        a.append(34).unwrap();
        a.append_null();
        a.append(345).unwrap();

        let dict = a.finish();
        let values = TimestampNanosecondArray::from(dict.values().to_data());
        let dict_with_tz = dict.with_values(Arc::new(values.with_timezone("+02:00")));
        let v = DataType::Timestamp(TimeUnit::Nanosecond, Some("+02:00".into()));
        let d = DataType::Dictionary(Box::new(DataType::Int32), Box::new(v.clone()));

        assert_eq!(dict_with_tz.data_type(), &d);
        let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
        let rows = converter
            .convert_columns(&[Arc::new(dict_with_tz) as _])
            .unwrap();
        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        // Dictionaries decode as their value type, with the timezone intact
        assert_eq!(back[0].data_type(), &v);
    }
1642
1643 #[test]
1644 fn test_null_encoding() {
1645 let col = Arc::new(NullArray::new(10));
1646 let converter = RowConverter::new(vec![SortField::new(DataType::Null)]).unwrap();
1647 let rows = converter.convert_columns(&[col]).unwrap();
1648 assert_eq!(rows.num_rows(), 10);
1649 assert_eq!(rows.row(1).data.len(), 0);
1650 }
1651
    /// Ordering and round-trip for variable-width columns (Utf8, Binary),
    /// exercising values around the block-encoding size boundaries
    #[test]
    fn test_variable_width() {
        let col = Arc::new(StringArray::from_iter([
            Some("hello"),
            Some("he"),
            None,
            Some("foo"),
            Some(""),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Default options: null < "" < "foo" < "he" < "hello"
        assert!(rows.row(1) < rows.row(0));
        assert!(rows.row(2) < rows.row(4));
        assert!(rows.row(3) < rows.row(0));
        assert!(rows.row(3) < rows.row(1));

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Binary values sized exactly at, below, and above the mini-block and
        // block boundaries of the variable-length encoding, listed ascending
        let col = Arc::new(BinaryArray::from_iter([
            None,
            Some(vec![0_u8; 0]),
            Some(vec![0_u8; 6]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0_u8; variable::BLOCK_SIZE]),
            Some(vec![0_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![1_u8; 6]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![1_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![1_u8; variable::BLOCK_SIZE]),
            Some(vec![1_u8; variable::BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; 6]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::MINI_BLOCK_SIZE + 1]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE]),
            Some(vec![0xFF_u8; variable::BLOCK_SIZE + 1]),
        ])) as ArrayRef;

        let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        // Encoded order must be strictly increasing for every pair
        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) < rows.row(j),
                    "{} < {} - {:?} < {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);

        // Descending with nulls last inverts every pairwise comparison
        let converter = RowConverter::new(vec![SortField::new_with_options(
            DataType::Binary,
            SortOptions::default().desc().with_nulls_first(false),
        )])
        .unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&col)]).unwrap();

        for i in 0..rows.num_rows() {
            for j in i + 1..rows.num_rows() {
                assert!(
                    rows.row(i) > rows.row(j),
                    "{} > {} - {:?} > {:?}",
                    i,
                    j,
                    rows.row(i),
                    rows.row(j)
                );
            }
        }

        let cols = converter.convert_rows(&rows).unwrap();
        assert_eq!(&cols[0], &col);
    }
1735
1736 fn dictionary_eq(a: &dyn Array, b: &dyn Array) {
1738 match b.data_type() {
1739 DataType::Dictionary(_, v) => {
1740 assert_eq!(a.data_type(), v.as_ref());
1741 let b = arrow_cast::cast(b, v).unwrap();
1742 assert_eq!(a, b.as_ref())
1743 }
1744 _ => assert_eq!(a, b),
1745 }
1746 }
1747
1748 #[test]
1749 fn test_string_dictionary() {
1750 let a = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1751 Some("foo"),
1752 Some("hello"),
1753 Some("he"),
1754 None,
1755 Some("hello"),
1756 Some(""),
1757 Some("hello"),
1758 Some("hello"),
1759 ])) as ArrayRef;
1760
1761 let field = SortField::new(a.data_type().clone());
1762 let converter = RowConverter::new(vec![field]).unwrap();
1763 let rows_a = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1764
1765 assert!(rows_a.row(3) < rows_a.row(5));
1766 assert!(rows_a.row(2) < rows_a.row(1));
1767 assert!(rows_a.row(0) < rows_a.row(1));
1768 assert!(rows_a.row(3) < rows_a.row(0));
1769
1770 assert_eq!(rows_a.row(1), rows_a.row(4));
1771 assert_eq!(rows_a.row(1), rows_a.row(6));
1772 assert_eq!(rows_a.row(1), rows_a.row(7));
1773
1774 let cols = converter.convert_rows(&rows_a).unwrap();
1775 dictionary_eq(&cols[0], &a);
1776
1777 let b = Arc::new(DictionaryArray::<Int32Type>::from_iter([
1778 Some("hello"),
1779 None,
1780 Some("cupcakes"),
1781 ])) as ArrayRef;
1782
1783 let rows_b = converter.convert_columns(&[Arc::clone(&b)]).unwrap();
1784 assert_eq!(rows_a.row(1), rows_b.row(0));
1785 assert_eq!(rows_a.row(3), rows_b.row(1));
1786 assert!(rows_b.row(2) < rows_a.row(0));
1787
1788 let cols = converter.convert_rows(&rows_b).unwrap();
1789 dictionary_eq(&cols[0], &b);
1790
1791 let converter = RowConverter::new(vec![SortField::new_with_options(
1792 a.data_type().clone(),
1793 SortOptions::default().desc().with_nulls_first(false),
1794 )])
1795 .unwrap();
1796
1797 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1798 assert!(rows_c.row(3) > rows_c.row(5));
1799 assert!(rows_c.row(2) > rows_c.row(1));
1800 assert!(rows_c.row(0) > rows_c.row(1));
1801 assert!(rows_c.row(3) > rows_c.row(0));
1802
1803 let cols = converter.convert_rows(&rows_c).unwrap();
1804 dictionary_eq(&cols[0], &a);
1805
1806 let converter = RowConverter::new(vec![SortField::new_with_options(
1807 a.data_type().clone(),
1808 SortOptions::default().desc().with_nulls_first(true),
1809 )])
1810 .unwrap();
1811
1812 let rows_c = converter.convert_columns(&[Arc::clone(&a)]).unwrap();
1813 assert!(rows_c.row(3) < rows_c.row(5));
1814 assert!(rows_c.row(2) > rows_c.row(1));
1815 assert!(rows_c.row(0) > rows_c.row(1));
1816 assert!(rows_c.row(3) < rows_c.row(0));
1817
1818 let cols = converter.convert_rows(&rows_c).unwrap();
1819 dictionary_eq(&cols[0], &a);
1820 }
1821
1822 #[test]
1823 fn test_struct() {
1824 let a = Arc::new(Int32Array::from(vec![1, 1, 2, 2])) as ArrayRef;
1826 let a_f = Arc::new(Field::new("int", DataType::Int32, false));
1827 let u = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])) as ArrayRef;
1828 let u_f = Arc::new(Field::new("s", DataType::Utf8, false));
1829 let s1 = Arc::new(StructArray::from(vec![(a_f, a), (u_f, u)])) as ArrayRef;
1830
1831 let sort_fields = vec![SortField::new(s1.data_type().clone())];
1832 let converter = RowConverter::new(sort_fields).unwrap();
1833 let r1 = converter.convert_columns(&[Arc::clone(&s1)]).unwrap();
1834
1835 for (a, b) in r1.iter().zip(r1.iter().skip(1)) {
1836 assert!(a < b);
1837 }
1838
1839 let back = converter.convert_rows(&r1).unwrap();
1840 assert_eq!(back.len(), 1);
1841 assert_eq!(&back[0], &s1);
1842
1843 let data = s1
1845 .to_data()
1846 .into_builder()
1847 .null_bit_buffer(Some(Buffer::from_slice_ref([0b00001010])))
1848 .null_count(2)
1849 .build()
1850 .unwrap();
1851
1852 let s2 = Arc::new(StructArray::from(data)) as ArrayRef;
1853 let r2 = converter.convert_columns(&[Arc::clone(&s2)]).unwrap();
1854 assert_eq!(r2.row(0), r2.row(2)); assert!(r2.row(0) < r2.row(1)); assert_ne!(r1.row(0), r2.row(0)); assert_eq!(r1.row(1), r2.row(1)); let back = converter.convert_rows(&r2).unwrap();
1860 assert_eq!(back.len(), 1);
1861 assert_eq!(&back[0], &s2);
1862
1863 back[0].to_data().validate_full().unwrap();
1864 }
1865
1866 #[test]
1867 fn test_primitive_dictionary() {
1868 let mut builder = PrimitiveDictionaryBuilder::<Int32Type, Int32Type>::new();
1869 builder.append(2).unwrap();
1870 builder.append(3).unwrap();
1871 builder.append(0).unwrap();
1872 builder.append_null();
1873 builder.append(5).unwrap();
1874 builder.append(3).unwrap();
1875 builder.append(-1).unwrap();
1876
1877 let a = builder.finish();
1878 let data_type = a.data_type().clone();
1879 let columns = [Arc::new(a) as ArrayRef];
1880
1881 let field = SortField::new(data_type.clone());
1882 let converter = RowConverter::new(vec![field]).unwrap();
1883 let rows = converter.convert_columns(&columns).unwrap();
1884 assert!(rows.row(0) < rows.row(1));
1885 assert!(rows.row(2) < rows.row(0));
1886 assert!(rows.row(3) < rows.row(2));
1887 assert!(rows.row(6) < rows.row(2));
1888 assert!(rows.row(3) < rows.row(6));
1889 }
1890
1891 #[test]
1892 fn test_dictionary_nulls() {
1893 let values = Int32Array::from_iter([Some(1), Some(-1), None, Some(4), None]).into_data();
1894 let keys =
1895 Int32Array::from_iter([Some(0), Some(0), Some(1), Some(2), Some(4), None]).into_data();
1896
1897 let data_type = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Int32));
1898 let data = keys
1899 .into_builder()
1900 .data_type(data_type.clone())
1901 .child_data(vec![values])
1902 .build()
1903 .unwrap();
1904
1905 let columns = [Arc::new(DictionaryArray::<Int32Type>::from(data)) as ArrayRef];
1906 let field = SortField::new(data_type.clone());
1907 let converter = RowConverter::new(vec![field]).unwrap();
1908 let rows = converter.convert_columns(&columns).unwrap();
1909
1910 assert_eq!(rows.row(0), rows.row(1));
1911 assert_eq!(rows.row(3), rows.row(4));
1912 assert_eq!(rows.row(4), rows.row(5));
1913 assert!(rows.row(3) < rows.row(0));
1914 }
1915
1916 #[test]
1917 #[should_panic(expected = "Encountered non UTF-8 data")]
1918 fn test_invalid_utf8() {
1919 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1920 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
1921 let rows = converter.convert_columns(&[array]).unwrap();
1922 let binary_row = rows.row(0);
1923
1924 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1925 let parser = converter.parser();
1926 let utf8_row = parser.parse(binary_row.as_ref());
1927
1928 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1929 }
1930
1931 #[test]
1932 #[should_panic(expected = "Encountered non UTF-8 data")]
1933 fn test_invalid_utf8_array() {
1934 let converter = RowConverter::new(vec![SortField::new(DataType::Binary)]).unwrap();
1935 let array = Arc::new(BinaryArray::from_iter_values([&[0xFF]])) as _;
1936 let rows = converter.convert_columns(&[array]).unwrap();
1937 let binary_rows = rows.try_into_binary().expect("known-small rows");
1938
1939 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1940 let parsed = converter.from_binary(binary_rows);
1941
1942 converter.convert_rows(parsed.iter()).unwrap();
1943 }
1944
1945 #[test]
1946 #[should_panic(expected = "index out of bounds")]
1947 fn test_invalid_empty() {
1948 let binary_row: &[u8] = &[];
1949
1950 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1951 let parser = converter.parser();
1952 let utf8_row = parser.parse(binary_row.as_ref());
1953
1954 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1955 }
1956
1957 #[test]
1958 #[should_panic(expected = "index out of bounds")]
1959 fn test_invalid_empty_array() {
1960 let row: &[u8] = &[];
1961 let binary_rows = BinaryArray::from(vec![row]);
1962
1963 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1964 let parsed = converter.from_binary(binary_rows);
1965
1966 converter.convert_rows(parsed.iter()).unwrap();
1967 }
1968
1969 #[test]
1970 #[should_panic(expected = "index out of bounds")]
1971 fn test_invalid_truncated() {
1972 let binary_row: &[u8] = &[0x02];
1973
1974 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1975 let parser = converter.parser();
1976 let utf8_row = parser.parse(binary_row.as_ref());
1977
1978 converter.convert_rows(std::iter::once(utf8_row)).unwrap();
1979 }
1980
1981 #[test]
1982 #[should_panic(expected = "index out of bounds")]
1983 fn test_invalid_truncated_array() {
1984 let row: &[u8] = &[0x02];
1985 let binary_rows = BinaryArray::from(vec![row]);
1986
1987 let converter = RowConverter::new(vec![SortField::new(DataType::Utf8)]).unwrap();
1988 let parsed = converter.from_binary(binary_rows);
1989
1990 converter.convert_rows(parsed.iter()).unwrap();
1991 }
1992
1993 #[test]
1994 #[should_panic(expected = "rows were not produced by this RowConverter")]
1995 fn test_different_converter() {
1996 let values = Arc::new(Int32Array::from_iter([Some(1), Some(-1)]));
1997 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
1998 let rows = converter.convert_columns(&[values]).unwrap();
1999
2000 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2001 let _ = converter.convert_rows(&rows);
2002 }
2003
2004 fn test_single_list<O: OffsetSizeTrait>() {
2005 let mut builder = GenericListBuilder::<O, _>::new(Int32Builder::new());
2006 builder.values().append_value(32);
2007 builder.values().append_value(52);
2008 builder.values().append_value(32);
2009 builder.append(true);
2010 builder.values().append_value(32);
2011 builder.values().append_value(52);
2012 builder.values().append_value(12);
2013 builder.append(true);
2014 builder.values().append_value(32);
2015 builder.values().append_value(52);
2016 builder.append(true);
2017 builder.values().append_value(32); builder.values().append_value(52); builder.append(false);
2020 builder.values().append_value(32);
2021 builder.values().append_null();
2022 builder.append(true);
2023 builder.append(true);
2024
2025 let list = Arc::new(builder.finish()) as ArrayRef;
2026 let d = list.data_type().clone();
2027
2028 let converter = RowConverter::new(vec![SortField::new(d.clone())]).unwrap();
2029
2030 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2031 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) < rows.row(5)); let back = converter.convert_rows(&rows).unwrap();
2039 assert_eq!(back.len(), 1);
2040 back[0].to_data().validate_full().unwrap();
2041 assert_eq!(&back[0], &list);
2042
2043 let options = SortOptions::default().asc().with_nulls_first(false);
2044 let field = SortField::new_with_options(d.clone(), options);
2045 let converter = RowConverter::new(vec![field]).unwrap();
2046 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2047
2048 assert!(rows.row(0) > rows.row(1)); assert!(rows.row(2) < rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) < rows.row(2)); assert!(rows.row(3) > rows.row(5)); let back = converter.convert_rows(&rows).unwrap();
2056 assert_eq!(back.len(), 1);
2057 back[0].to_data().validate_full().unwrap();
2058 assert_eq!(&back[0], &list);
2059
2060 let options = SortOptions::default().desc().with_nulls_first(false);
2061 let field = SortField::new_with_options(d.clone(), options);
2062 let converter = RowConverter::new(vec![field]).unwrap();
2063 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2064
2065 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) > rows.row(2)); assert!(rows.row(4) > rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) > rows.row(5)); let back = converter.convert_rows(&rows).unwrap();
2073 assert_eq!(back.len(), 1);
2074 back[0].to_data().validate_full().unwrap();
2075 assert_eq!(&back[0], &list);
2076
2077 let options = SortOptions::default().desc().with_nulls_first(true);
2078 let field = SortField::new_with_options(d, options);
2079 let converter = RowConverter::new(vec![field]).unwrap();
2080 let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();
2081
2082 assert!(rows.row(0) < rows.row(1)); assert!(rows.row(2) > rows.row(1)); assert!(rows.row(3) < rows.row(2)); assert!(rows.row(4) < rows.row(2)); assert!(rows.row(5) > rows.row(2)); assert!(rows.row(3) < rows.row(5)); let back = converter.convert_rows(&rows).unwrap();
2090 assert_eq!(back.len(), 1);
2091 back[0].to_data().validate_full().unwrap();
2092 assert_eq!(&back[0], &list);
2093 }
2094
    /// Checks ordering of a nested list-of-list of Int32 under several
    /// SortOptions, and that rows round-trip back to the original array.
    ///
    /// Rows in builder order:
    ///   0: [[1, 2], [1, null]]
    ///   1: [[1, null], [1, null]]
    ///   2: [[1, null], null]
    ///   3: null
    ///   4: [[1, 2]]
    fn test_nested_list<O: OffsetSizeTrait>() {
        let mut builder =
            GenericListBuilder::<O, _>::new(GenericListBuilder::<O, _>::new(Int32Builder::new()));

        // Row 0: [[1, 2], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 1: [[1, null], [1, null]]
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.append(true);

        // Row 2: [[1, null], null]; Row 3: null
        builder.values().values().append_value(1);
        builder.values().values().append_null();
        builder.values().append(true);
        builder.values().append(false);
        builder.append(true);
        builder.append(false);

        // Row 4: [[1, 2]]
        builder.values().values().append_value(1);
        builder.values().values().append_value(2);
        builder.values().append(true);
        builder.append(true);

        let list = Arc::new(builder.finish()) as ArrayRef;
        let d = list.data_type().clone();

        // Ascending, nulls first: deeper nulls sort before values at the
        // same position, and the null row sorts before everything
        let options = SortOptions::default().asc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) < rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls first
        let options = SortOptions::default().desc().with_nulls_first(true);
        let field = SortField::new_with_options(d.clone(), options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) > rows.row(1));
        assert!(rows.row(1) > rows.row(2));
        assert!(rows.row(2) > rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) > rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);

        // Descending, nulls last
        let options = SortOptions::default().desc().with_nulls_first(false);
        let field = SortField::new_with_options(d, options);
        let converter = RowConverter::new(vec![field]).unwrap();
        let rows = converter.convert_columns(&[Arc::clone(&list)]).unwrap();

        assert!(rows.row(0) < rows.row(1));
        assert!(rows.row(1) < rows.row(2));
        assert!(rows.row(2) < rows.row(3));
        assert!(rows.row(4) > rows.row(0));
        assert!(rows.row(4) < rows.row(1));

        let back = converter.convert_rows(&rows).unwrap();
        assert_eq!(back.len(), 1);
        back[0].to_data().validate_full().unwrap();
        assert_eq!(&back[0], &list);
    }
2185
2186 #[test]
2187 fn test_list() {
2188 test_single_list::<i32>();
2189 test_nested_list::<i32>();
2190 }
2191
2192 #[test]
2193 fn test_large_list() {
2194 test_single_list::<i64>();
2195 test_nested_list::<i64>();
2196 }
2197
2198 fn generate_primitive_array<K>(len: usize, valid_percent: f64) -> PrimitiveArray<K>
2199 where
2200 K: ArrowPrimitiveType,
2201 StandardUniform: Distribution<K::Native>,
2202 {
2203 let mut rng = rng();
2204 (0..len)
2205 .map(|_| rng.random_bool(valid_percent).then(|| rng.random()))
2206 .collect()
2207 }
2208
2209 fn generate_strings<O: OffsetSizeTrait>(
2210 len: usize,
2211 valid_percent: f64,
2212 ) -> GenericStringArray<O> {
2213 let mut rng = rng();
2214 (0..len)
2215 .map(|_| {
2216 rng.random_bool(valid_percent).then(|| {
2217 let len = rng.random_range(0..100);
2218 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2219 String::from_utf8(bytes).unwrap()
2220 })
2221 })
2222 .collect()
2223 }
2224
2225 fn generate_string_view(len: usize, valid_percent: f64) -> StringViewArray {
2226 let mut rng = rng();
2227 (0..len)
2228 .map(|_| {
2229 rng.random_bool(valid_percent).then(|| {
2230 let len = rng.random_range(0..100);
2231 let bytes = (0..len).map(|_| rng.random_range(0..128)).collect();
2232 String::from_utf8(bytes).unwrap()
2233 })
2234 })
2235 .collect()
2236 }
2237
2238 fn generate_byte_view(len: usize, valid_percent: f64) -> BinaryViewArray {
2239 let mut rng = rng();
2240 (0..len)
2241 .map(|_| {
2242 rng.random_bool(valid_percent).then(|| {
2243 let len = rng.random_range(0..100);
2244 let bytes: Vec<_> = (0..len).map(|_| rng.random_range(0..128)).collect();
2245 bytes
2246 })
2247 })
2248 .collect()
2249 }
2250
2251 fn generate_dictionary<K>(
2252 values: ArrayRef,
2253 len: usize,
2254 valid_percent: f64,
2255 ) -> DictionaryArray<K>
2256 where
2257 K: ArrowDictionaryKeyType,
2258 K::Native: SampleUniform,
2259 {
2260 let mut rng = rng();
2261 let min_key = K::Native::from_usize(0).unwrap();
2262 let max_key = K::Native::from_usize(values.len()).unwrap();
2263 let keys: PrimitiveArray<K> = (0..len)
2264 .map(|_| {
2265 rng.random_bool(valid_percent)
2266 .then(|| rng.random_range(min_key..max_key))
2267 })
2268 .collect();
2269
2270 let data_type =
2271 DataType::Dictionary(Box::new(K::DATA_TYPE), Box::new(values.data_type().clone()));
2272
2273 let data = keys
2274 .into_data()
2275 .into_builder()
2276 .data_type(data_type)
2277 .add_child_data(values.to_data())
2278 .build()
2279 .unwrap();
2280
2281 DictionaryArray::from(data)
2282 }
2283
2284 fn generate_fixed_size_binary(len: usize, valid_percent: f64) -> FixedSizeBinaryArray {
2285 let mut rng = rng();
2286 let width = rng.random_range(0..20);
2287 let mut builder = FixedSizeBinaryBuilder::new(width);
2288
2289 let mut b = vec![0; width as usize];
2290 for _ in 0..len {
2291 match rng.random_bool(valid_percent) {
2292 true => {
2293 b.iter_mut().for_each(|x| *x = rng.random());
2294 builder.append_value(&b).unwrap();
2295 }
2296 false => builder.append_null(),
2297 }
2298 }
2299
2300 builder.finish()
2301 }
2302
2303 fn generate_struct(len: usize, valid_percent: f64) -> StructArray {
2304 let mut rng = rng();
2305 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2306 let a = generate_primitive_array::<Int32Type>(len, valid_percent);
2307 let b = generate_strings::<i32>(len, valid_percent);
2308 let fields = Fields::from(vec![
2309 Field::new("a", DataType::Int32, true),
2310 Field::new("b", DataType::Utf8, true),
2311 ]);
2312 let values = vec![Arc::new(a) as _, Arc::new(b) as _];
2313 StructArray::new(fields, values, Some(nulls))
2314 }
2315
2316 fn generate_list<F>(len: usize, valid_percent: f64, values: F) -> ListArray
2317 where
2318 F: FnOnce(usize) -> ArrayRef,
2319 {
2320 let mut rng = rng();
2321 let offsets = OffsetBuffer::<i32>::from_lengths((0..len).map(|_| rng.random_range(0..10)));
2322 let values_len = offsets.last().unwrap().to_usize().unwrap();
2323 let values = values(values_len);
2324 let nulls = NullBuffer::from_iter((0..len).map(|_| rng.random_bool(valid_percent)));
2325 let field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2326 ListArray::new(field, offsets, values, Some(nulls))
2327 }
2328
    /// Generates a random column of `len` rows with ~80% valid values,
    /// choosing the data type uniformly from 16 supported variants.
    ///
    /// Note: the dictionary arms draw a value count from `1..len`, which
    /// panics if `len <= 1` — callers must pass `len > 1`.
    fn generate_column(len: usize) -> ArrayRef {
        let mut rng = rng();
        match rng.random_range(0..16) {
            0 => Arc::new(generate_primitive_array::<Int32Type>(len, 0.8)),
            1 => Arc::new(generate_primitive_array::<UInt32Type>(len, 0.8)),
            2 => Arc::new(generate_primitive_array::<Int64Type>(len, 0.8)),
            3 => Arc::new(generate_primitive_array::<UInt64Type>(len, 0.8)),
            4 => Arc::new(generate_primitive_array::<Float32Type>(len, 0.8)),
            5 => Arc::new(generate_primitive_array::<Float64Type>(len, 0.8)),
            6 => Arc::new(generate_strings::<i32>(len, 0.8)),
            // Dictionary arms use fully-valid values so any nulls come from
            // the keys (generate_dictionary only nulls keys)
            7 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_strings::<i32>(rng.random_range(1..len), 1.0)),
                len,
                0.8,
            )),
            8 => Arc::new(generate_dictionary::<Int64Type>(
                Arc::new(generate_primitive_array::<Int64Type>(
                    rng.random_range(1..len),
                    1.0,
                )),
                len,
                0.8,
            )),
            9 => Arc::new(generate_fixed_size_binary(len, 0.8)),
            10 => Arc::new(generate_struct(len, 0.8)),
            // Lists of primitives, strings and structs
            11 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_primitive_array::<Int64Type>(values_len, 0.8))
            })),
            12 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_strings::<i32>(values_len, 0.8))
            })),
            13 => Arc::new(generate_list(len, 0.8, |values_len| {
                Arc::new(generate_struct(values_len, 0.8))
            })),
            14 => Arc::new(generate_string_view(len, 0.8)),
            15 => Arc::new(generate_byte_view(len, 0.8)),
            // random_range(0..16) cannot yield any other value
            _ => unreachable!(),
        }
    }
2370
2371 fn print_row(cols: &[SortColumn], row: usize) -> String {
2372 let t: Vec<_> = cols
2373 .iter()
2374 .map(|x| match x.values.is_valid(row) {
2375 true => {
2376 let opts = FormatOptions::default().with_null("NULL");
2377 let formatter = ArrayFormatter::try_new(x.values.as_ref(), &opts).unwrap();
2378 formatter.value(row).to_string()
2379 }
2380 false => "NULL".to_string(),
2381 })
2382 .collect();
2383 t.join(",")
2384 }
2385
2386 fn print_col_types(cols: &[SortColumn]) -> String {
2387 let t: Vec<_> = cols
2388 .iter()
2389 .map(|x| x.values.data_type().to_string())
2390 .collect();
2391 t.join(",")
2392 }
2393
    /// Fuzzes the row format against the lexicographical comparator: for 100
    /// random column sets, row-wise byte comparison must agree with
    /// [`LexicographicalComparator`] for every pair of rows, and rows must
    /// round-trip losslessly through all three decode paths.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn fuzz_test() {
        for _ in 0..100 {
            let mut rng = rng();
            let num_columns = rng.random_range(1..5);
            // Must exceed 1 — generate_column's dictionary arms require len > 1
            let len = rng.random_range(5..100);
            let arrays: Vec<_> = (0..num_columns).map(|_| generate_column(len)).collect();

            // Random sort direction and null ordering per column
            let options: Vec<_> = (0..num_columns)
                .map(|_| SortOptions {
                    descending: rng.random_bool(0.5),
                    nulls_first: rng.random_bool(0.5),
                })
                .collect();

            let sort_columns: Vec<_> = options
                .iter()
                .zip(&arrays)
                .map(|(o, c)| SortColumn {
                    values: Arc::clone(c),
                    options: Some(*o),
                })
                .collect();

            let comparator = LexicographicalComparator::try_new(&sort_columns).unwrap();

            let columns: Vec<SortField> = options
                .into_iter()
                .zip(&arrays)
                .map(|(o, a)| SortField::new_with_options(a.data_type().clone(), o))
                .collect();

            let converter = RowConverter::new(columns).unwrap();
            let rows = converter.convert_columns(&arrays).unwrap();

            // Byte comparison of encoded rows must match the comparator for
            // every ordered pair, including i == j (equality)
            for i in 0..len {
                for j in 0..len {
                    let row_i = rows.row(i);
                    let row_j = rows.row(j);
                    let row_cmp = row_i.cmp(&row_j);
                    let lex_cmp = comparator.compare(i, j);
                    assert_eq!(
                        row_cmp,
                        lex_cmp,
                        "({:?} vs {:?}) vs ({:?} vs {:?}) for types {}",
                        print_row(&sort_columns, i),
                        print_row(&sort_columns, j),
                        row_i,
                        row_j,
                        print_col_types(&sort_columns)
                    );
                }
            }

            // Decode path 1: directly from the Rows
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Decode path 2: through a BinaryArray and the row parser
            let rows = rows.try_into_binary().expect("reasonable size");
            let parser = converter.parser();
            let back = converter
                .convert_rows(rows.iter().map(|b| parser.parse(b.expect("valid bytes"))))
                .unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }

            // Decode path 3: BinaryArray converted back into Rows
            let rows = converter.from_binary(rows);
            let back = converter.convert_rows(&rows).unwrap();
            for (actual, expected) in back.iter().zip(&arrays) {
                actual.to_data().validate_full().unwrap();
                dictionary_eq(actual, expected)
            }
        }
    }
2474
2475 #[test]
2476 fn test_clear() {
2477 let converter = RowConverter::new(vec![SortField::new(DataType::Int32)]).unwrap();
2478 let mut rows = converter.empty_rows(3, 128);
2479
2480 let first = Int32Array::from(vec![None, Some(2), Some(4)]);
2481 let second = Int32Array::from(vec![Some(2), None, Some(4)]);
2482 let arrays = [Arc::new(first) as ArrayRef, Arc::new(second) as ArrayRef];
2483
2484 for array in arrays.iter() {
2485 rows.clear();
2486 converter.append(&mut rows, &[array.clone()]).unwrap();
2487 let back = converter.convert_rows(&rows).unwrap();
2488 assert_eq!(&back[0], array);
2489 }
2490
2491 let mut rows_expected = converter.empty_rows(3, 128);
2492 converter.append(&mut rows_expected, &arrays[1..]).unwrap();
2493
2494 for (i, (actual, expected)) in rows.iter().zip(rows_expected.iter()).enumerate() {
2495 assert_eq!(
2496 actual, expected,
2497 "For row {}: expected {:?}, actual: {:?}",
2498 i, expected, actual
2499 );
2500 }
2501 }
2502
2503 #[test]
2504 fn test_append_codec_dictionary_binary() {
2505 use DataType::*;
2506 let converter = RowConverter::new(vec![SortField::new(Dictionary(
2508 Box::new(Int32),
2509 Box::new(Binary),
2510 ))])
2511 .unwrap();
2512 let mut rows = converter.empty_rows(4, 128);
2513
2514 let keys = Int32Array::from_iter_values([0, 1, 2, 3]);
2515 let values = BinaryArray::from(vec![
2516 Some("a".as_bytes()),
2517 Some(b"b"),
2518 Some(b"c"),
2519 Some(b"d"),
2520 ]);
2521 let dict_array = DictionaryArray::new(keys, Arc::new(values));
2522
2523 rows.clear();
2524 let array = Arc::new(dict_array) as ArrayRef;
2525 converter.append(&mut rows, &[array.clone()]).unwrap();
2526 let back = converter.convert_rows(&rows).unwrap();
2527
2528 dictionary_eq(&back[0], &array);
2529 }
2530
2531 #[test]
2532 fn test_list_prefix() {
2533 let mut a = ListBuilder::new(Int8Builder::new());
2534 a.append_value([None]);
2535 a.append_value([None, None]);
2536 let a = a.finish();
2537
2538 let converter = RowConverter::new(vec![SortField::new(a.data_type().clone())]).unwrap();
2539 let rows = converter.convert_columns(&[Arc::new(a) as _]).unwrap();
2540 assert_eq!(rows.row(0).cmp(&rows.row(1)), Ordering::Less);
2541 }
2542}