1use std::any::Any;
19use std::marker::PhantomData;
20
21use arrow_array::{Array, ArrayRef, OffsetSizeTrait, new_empty_array};
22use arrow_buffer::ArrowNativeType;
23use arrow_schema::DataType as ArrowType;
24use bytes::Bytes;
25
26use crate::arrow::array_reader::byte_array::{ByteArrayDecoder, ByteArrayDecoderPlain};
27use crate::arrow::array_reader::{ArrayReader, read_records, skip_records};
28use crate::arrow::buffer::{dictionary_buffer::DictionaryBuffer, offset_buffer::OffsetBuffer};
29use crate::arrow::record_reader::GenericRecordReader;
30use crate::arrow::schema::parquet_to_arrow_field;
31use crate::basic::{ConvertedType, Encoding};
32use crate::column::page::PageIterator;
33use crate::column::reader::decoder::ColumnValueDecoder;
34use crate::encodings::rle::RleDecoder;
35use crate::errors::{ParquetError, Result};
36use crate::schema::types::ColumnDescPtr;
37use crate::util::bit_util::FromBytes;
38
/// Expands to a `match` over a `(key type, value type)` pair of Arrow data types.
///
/// Each listed pattern pair constructs a boxed [`ByteArrayDictionaryReader`]. The reader is
/// monomorphised for the native key type and offset type written after its `=>`, and wraps a
/// fresh `GenericRecordReader` built from the column descriptor.
///
/// Any pair that matches none of the listed patterns yields an
/// "unsupported data type for byte array dictionary reader" error naming the requested type.
macro_rules! make_reader {
    (
        ($page_iter:expr, $descr:expr, $arrow_type:expr) => match ($key:expr, $value:expr) {
            $(($key_pat:pat, $value_pat:pat) => ($native_key:ty, $offset:ty),)+
        }
    ) => {
        match ($key, $value) {
            $(
                ($key_pat, $value_pat) => {
                    let record_reader = GenericRecordReader::new($descr);
                    Ok(Box::new(ByteArrayDictionaryReader::<$native_key, $offset>::new(
                        $page_iter,
                        $arrow_type,
                        record_reader,
                    )))
                }
            )+
            _ => Err(general_err!(
                "unsupported data type for byte array dictionary reader - {}",
                $arrow_type
            )),
        }
    };
}
62
63pub fn make_byte_array_dictionary_reader(
76 pages: Box<dyn PageIterator>,
77 column_desc: ColumnDescPtr,
78 arrow_type: Option<ArrowType>,
79) -> Result<Box<dyn ArrayReader>> {
80 let data_type = match arrow_type {
82 Some(t) => t,
83 None => parquet_to_arrow_field(column_desc.as_ref())?
84 .data_type()
85 .clone(),
86 };
87
88 match &data_type {
89 ArrowType::Dictionary(key_type, value_type) => {
90 make_reader! {
91 (pages, column_desc, data_type) => match (key_type.as_ref(), value_type.as_ref()) {
92 (ArrowType::UInt8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u8, i32),
93 (ArrowType::UInt8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u8, i64),
94 (ArrowType::Int8, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i8, i32),
95 (ArrowType::Int8, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i8, i64),
96 (ArrowType::UInt16, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u16, i32),
97 (ArrowType::UInt16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u16, i64),
98 (ArrowType::Int16, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i16, i32),
99 (ArrowType::Int16, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i16, i64),
100 (ArrowType::UInt32, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u32, i32),
101 (ArrowType::UInt32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u32, i64),
102 (ArrowType::Int32, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i32, i32),
103 (ArrowType::Int32, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i32, i64),
104 (ArrowType::UInt64, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (u64, i32),
105 (ArrowType::UInt64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (u64, i64),
106 (ArrowType::Int64, ArrowType::Binary | ArrowType::Utf8 | ArrowType::FixedSizeBinary(_)) => (i64, i32),
107 (ArrowType::Int64, ArrowType::LargeBinary | ArrowType::LargeUtf8) => (i64, i64),
108 }
109 }
110 }
111 _ => Err(general_err!(
112 "invalid non-dictionary data type for byte array dictionary reader - {}",
113 data_type
114 )),
115 }
116}
117
118struct ByteArrayDictionaryReader<K: ArrowNativeType, V: OffsetSizeTrait> {
122 data_type: ArrowType,
123 pages: Box<dyn PageIterator>,
124 def_levels_buffer: Option<Vec<i16>>,
125 rep_levels_buffer: Option<Vec<i16>>,
126 record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
127}
128
129impl<K, V> ByteArrayDictionaryReader<K, V>
130where
131 K: FromBytes + Ord + ArrowNativeType,
132 V: OffsetSizeTrait,
133{
134 fn new(
135 pages: Box<dyn PageIterator>,
136 data_type: ArrowType,
137 record_reader: GenericRecordReader<DictionaryBuffer<K, V>, DictionaryDecoder<K, V>>,
138 ) -> Self {
139 Self {
140 data_type,
141 pages,
142 def_levels_buffer: None,
143 rep_levels_buffer: None,
144 record_reader,
145 }
146 }
147}
148
149impl<K, V> ArrayReader for ByteArrayDictionaryReader<K, V>
150where
151 K: FromBytes + Ord + ArrowNativeType,
152 V: OffsetSizeTrait,
153{
154 fn as_any(&self) -> &dyn Any {
155 self
156 }
157
158 fn get_data_type(&self) -> &ArrowType {
159 &self.data_type
160 }
161
162 fn read_records(&mut self, batch_size: usize) -> Result<usize> {
163 read_records(&mut self.record_reader, self.pages.as_mut(), batch_size)
164 }
165
166 fn consume_batch(&mut self) -> Result<ArrayRef> {
167 self.def_levels_buffer = self.record_reader.consume_def_levels();
169 self.rep_levels_buffer = self.record_reader.consume_rep_levels();
170
171 if self.record_reader.num_values() == 0 {
172 return Ok(new_empty_array(&self.data_type));
176 }
177
178 let buffer = self.record_reader.consume_record_data();
179 let null_buffer = self.record_reader.consume_bitmap_buffer();
180 let array = buffer.into_array(null_buffer, &self.data_type)?;
181 self.record_reader.reset();
182
183 Ok(array)
184 }
185
186 fn skip_records(&mut self, num_records: usize) -> Result<usize> {
187 skip_records(&mut self.record_reader, self.pages.as_mut(), num_records)
188 }
189
190 fn get_def_levels(&self) -> Option<&[i16]> {
191 self.def_levels_buffer.as_deref()
192 }
193
194 fn get_rep_levels(&self) -> Option<&[i16]> {
195 self.rep_levels_buffer.as_deref()
196 }
197}
198
199enum MaybeDictionaryDecoder {
203 Dict {
204 decoder: RleDecoder,
205 max_remaining_values: usize,
208 },
209 Fallback(ByteArrayDecoder),
210}
211
212struct DictionaryDecoder<K, V> {
214 dict: Option<ArrayRef>,
216
217 decoder: Option<MaybeDictionaryDecoder>,
219
220 validate_utf8: bool,
221
222 value_type: ArrowType,
223
224 phantom: PhantomData<(K, V)>,
225}
226
227impl<K, V> ColumnValueDecoder for DictionaryDecoder<K, V>
228where
229 K: FromBytes + Ord + ArrowNativeType,
230 V: OffsetSizeTrait,
231{
232 type Buffer = DictionaryBuffer<K, V>;
233
234 fn new(col: &ColumnDescPtr) -> Self {
235 let validate_utf8 = col.converted_type() == ConvertedType::UTF8;
236
237 let value_type = match (V::IS_LARGE, col.converted_type() == ConvertedType::UTF8) {
238 (true, true) => ArrowType::LargeUtf8,
239 (true, false) => ArrowType::LargeBinary,
240 (false, true) => ArrowType::Utf8,
241 (false, false) => ArrowType::Binary,
242 };
243
244 Self {
245 dict: None,
246 decoder: None,
247 validate_utf8,
248 value_type,
249 phantom: Default::default(),
250 }
251 }
252
253 fn set_dict(
254 &mut self,
255 buf: Bytes,
256 num_values: u32,
257 encoding: Encoding,
258 _is_sorted: bool,
259 ) -> Result<()> {
260 if !matches!(
261 encoding,
262 Encoding::PLAIN | Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY
263 ) {
264 return Err(nyi_err!(
265 "Invalid/Unsupported encoding type for dictionary: {}",
266 encoding
267 ));
268 }
269
270 if K::from_usize(num_values as usize).is_none() {
271 return Err(general_err!("dictionary too large for index type"));
272 }
273
274 let len = num_values as usize;
275 let mut buffer = OffsetBuffer::<V>::default();
276 let mut decoder = ByteArrayDecoderPlain::new(buf, len, Some(len), self.validate_utf8);
277 decoder.read(&mut buffer, usize::MAX)?;
278
279 let array = buffer.into_array(None, self.value_type.clone());
280 self.dict = Some(array);
281 Ok(())
282 }
283
284 fn set_data(
285 &mut self,
286 encoding: Encoding,
287 data: Bytes,
288 num_levels: usize,
289 num_values: Option<usize>,
290 ) -> Result<()> {
291 let decoder = match encoding {
292 Encoding::RLE_DICTIONARY | Encoding::PLAIN_DICTIONARY => {
293 let bit_width = data[0];
294 let mut decoder = RleDecoder::new(bit_width);
295 decoder.set_data(data.slice(1..))?;
296 MaybeDictionaryDecoder::Dict {
297 decoder,
298 max_remaining_values: num_values.unwrap_or(num_levels),
299 }
300 }
301 _ => MaybeDictionaryDecoder::Fallback(ByteArrayDecoder::new(
302 encoding,
303 data,
304 num_levels,
305 num_values,
306 self.validate_utf8,
307 )?),
308 };
309
310 self.decoder = Some(decoder);
311 Ok(())
312 }
313
314 fn read(&mut self, out: &mut Self::Buffer, num_values: usize) -> Result<usize> {
315 match self.decoder.as_mut().expect("decoder set") {
316 MaybeDictionaryDecoder::Fallback(decoder) => {
317 decoder.read(out.spill_values()?, num_values, None)
318 }
319 MaybeDictionaryDecoder::Dict {
320 decoder,
321 max_remaining_values,
322 } => {
323 let len = num_values.min(*max_remaining_values);
324
325 let dict = self
326 .dict
327 .as_ref()
328 .ok_or_else(|| general_err!("missing dictionary page for column"))?;
329
330 assert_eq!(dict.data_type(), &self.value_type);
331
332 if dict.is_empty() {
333 return Ok(0); }
335
336 match out.as_keys(dict) {
337 Some(keys) => {
338 let start = keys.len();
343 keys.resize(start + len, K::default());
344 let len = decoder.get_batch(&mut keys[start..])?;
345 keys.truncate(start + len);
346 *max_remaining_values -= len;
347 Ok(len)
348 }
349 None => {
350 let values = out.spill_values()?;
355 let mut keys = vec![K::default(); len];
356 let len = decoder.get_batch(&mut keys)?;
357
358 assert_eq!(dict.data_type(), &self.value_type);
359
360 let data = dict.to_data();
361 let dict_buffers = data.buffers();
362 let dict_offsets = dict_buffers[0].typed_data::<V>();
363 let dict_values = dict_buffers[1].as_slice();
364
365 values.extend_from_dictionary(&keys[..len], dict_offsets, dict_values)?;
366 *max_remaining_values -= len;
367 Ok(len)
368 }
369 }
370 }
371 }
372 }
373
374 fn skip_values(&mut self, num_values: usize) -> Result<usize> {
375 match self.decoder.as_mut().expect("decoder set") {
376 MaybeDictionaryDecoder::Fallback(decoder) => decoder.skip::<V>(num_values, None),
377 MaybeDictionaryDecoder::Dict {
378 decoder,
379 max_remaining_values,
380 } => {
381 let num_values = num_values.min(*max_remaining_values);
382 *max_remaining_values -= num_values;
383 decoder.skip(num_values)
384 }
385 }
386 }
387}
388
/// Unit tests for `DictionaryDecoder` / `DictionaryBuffer`.
///
/// They cover dictionary preservation, the fallback to plain values, skipping, key-type
/// overflow and all-null pages.
#[cfg(test)]
mod tests {
    use arrow::compute::cast;
    use arrow_array::{Array, StringArray};
    use arrow_buffer::Buffer;

    use crate::arrow::array_reader::test_util::{
        byte_array_all_encodings, encode_dictionary, utf8_column,
    };
    use crate::arrow::record_reader::buffer::ValuesBuffer;
    use crate::data_type::ByteArray;

    use super::*;

    /// `Dictionary(Int32, Utf8)`, the Arrow type used by most tests below.
    fn utf8_dictionary() -> ArrowType {
        ArrowType::Dictionary(Box::new(ArrowType::Int32), Box::new(ArrowType::Utf8))
    }

    /// Reads a dictionary-encoded page in two chunks (3 then 4 values), padding nulls after
    /// each. Checks that the output stays a `DictionaryBuffer::Dict` throughout and that
    /// the final array has the expected values and nulls.
    #[test]
    fn test_dictionary_preservation() {
        let data_type = utf8_dictionary();

        let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
            .into_iter()
            .map(ByteArray::from)
            .collect();
        let (dict, encoded) = encode_dictionary(&data);

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        // 14 levels in total, of which `data.len()` (7) are non-null values.
        decoder
            .set_data(Encoding::RLE_DICTIONARY, encoded, 14, Some(data.len()))
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();
        assert_eq!(decoder.read(&mut output, 3).unwrap(), 3);

        // First 6 slots: 3 decoded values spread over the `true` positions.
        // NOTE(review): `pad_nulls` argument meanings (offset, values, levels, mask) are
        // assumed from the call shape; confirm against `ValuesBuffer`.
        let mut valid = vec![false, false, true, true, false, true];
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        output.pad_nulls(0, 3, valid.len(), valid_buffer.as_slice());

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        assert_eq!(decoder.read(&mut output, 4).unwrap(), 4);

        // Next 8 slots hold the remaining 4 values.
        valid.extend_from_slice(&[false, false, true, true, false, true, true, false]);
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        output.pad_nulls(6, 4, 8, valid_buffer.as_slice());

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 14);

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![
                None,
                None,
                Some("0"),
                Some("1"),
                None,
                Some("0"),
                None,
                None,
                Some("1"),
                Some("2"),
                None,
                Some("1"),
                Some("2"),
                None
            ]
        )
    }

    /// Interleaves reads and skips on a dictionary-encoded page. Checks that only the read
    /// values ("0","1" | skip "0" | "1","2" | skip "1" | "2") end up in the output and that
    /// the buffer stays a `DictionaryBuffer::Dict`.
    #[test]
    fn test_dictionary_preservation_skip() {
        let data_type = utf8_dictionary();

        let data: Vec<_> = vec!["0", "1", "0", "1", "2", "1", "2"]
            .into_iter()
            .map(ByteArray::from)
            .collect();
        let (dict, encoded) = encode_dictionary(&data);

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(dict, 3, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        decoder
            .set_data(Encoding::RLE_DICTIONARY, encoded, 7, Some(data.len()))
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();

        assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
        assert_eq!(decoder.skip_values(1).unwrap(), 1);

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        assert_eq!(decoder.read(&mut output, 2).unwrap(), 2);
        assert_eq!(decoder.skip_values(1).unwrap(), 1);

        assert_eq!(decoder.read(&mut output, 1).unwrap(), 1);
        // All 7 values have been read or skipped, so nothing remains to skip.
        assert_eq!(decoder.skip_values(4).unwrap(), 0);

        let valid = [true, true, true, true, true];
        let valid_buffer = Buffer::from_iter(valid.iter().cloned());
        output.pad_nulls(0, 5, 5, valid_buffer.as_slice());

        assert!(matches!(output, DictionaryBuffer::Dict { .. }));

        let array = output.into_array(Some(valid_buffer), &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), 5);

        assert_eq!(
            strings.iter().collect::<Vec<_>>(),
            vec![Some("0"), Some("1"), Some("1"), Some("2"), Some("2"),]
        )
    }

    /// Feeds one page per encoding produced by `byte_array_all_encodings` into the same output
    /// buffer. Checks that every page yields all 4 values and that the result still has the
    /// dictionary data type.
    /// NOTE(review): assumes the helper mixes dictionary and non-dictionary encodings, which
    /// is what exercises the fallback path.
    #[test]
    fn test_dictionary_fallback() {
        let data_type = utf8_dictionary();
        let data = vec!["hello", "world", "a", "b"];

        let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone());
        let num_encodings = pages.len();

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();

        for (encoding, page) in pages {
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 4);
        }
        let array = output.into_array(None, &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), data.len() * num_encodings);

        // Each encoding contributes one full copy of `data`, in order.
        for i in 0..num_encodings {
            assert_eq!(
                strings
                    .iter()
                    .skip(i * data.len())
                    .take(data.len())
                    .map(|x| x.unwrap())
                    .collect::<Vec<_>>(),
                data
            )
        }
    }

    /// Same as `test_dictionary_fallback`, but skips the first two values of every page
    /// before reading the remaining two.
    #[test]
    fn test_dictionary_skip_fallback() {
        let data_type = utf8_dictionary();
        let data = vec!["hello", "world", "a", "b"];

        let (pages, encoded_dictionary) = byte_array_all_encodings(data.clone());
        let num_encodings = pages.len();

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::<i32, i32>::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::RLE_DICTIONARY, false)
            .unwrap();

        let mut output = DictionaryBuffer::<i32, i32>::default();

        for (encoding, page) in pages {
            decoder.set_data(encoding, page, 4, Some(4)).unwrap();
            decoder.skip_values(2).expect("skipping two values");
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 2);
        }
        let array = output.into_array(None, &data_type).unwrap();
        assert_eq!(array.data_type(), &data_type);

        let array = cast(&array, &ArrowType::Utf8).unwrap();
        let strings = array.as_any().downcast_ref::<StringArray>().unwrap();
        assert_eq!(strings.len(), (data.len() - 2) * num_encodings);

        // Each encoding contributes `data[2..]`, in order.
        for i in 0..num_encodings {
            assert_eq!(
                &strings
                    .iter()
                    .skip(i * (data.len() - 2))
                    .take(data.len() - 2)
                    .map(|x| x.unwrap())
                    .collect::<Vec<_>>(),
                &data[2..]
            )
        }
    }

    /// A 128-entry dictionary is rejected for `i8` keys, because `i8::from_usize(128)` fails.
    /// The same dictionary is accepted for `i16` keys.
    #[test]
    fn test_too_large_dictionary() {
        let data: Vec<_> = (0..128)
            .map(|x| ByteArray::from(x.to_string().as_str()))
            .collect();
        let (dictionary, _) = encode_dictionary(&data);

        let column_desc = utf8_column();

        let mut decoder = DictionaryDecoder::<i8, i32>::new(&column_desc);
        let err = decoder
            .set_dict(dictionary.clone(), 128, Encoding::RLE_DICTIONARY, false)
            .unwrap_err()
            .to_string();

        assert!(err.contains("dictionary too large for index type"));

        let mut decoder = DictionaryDecoder::<i16, i32>::new(&column_desc);
        decoder
            .set_dict(dictionary, 128, Encoding::RLE_DICTIONARY, false)
            .unwrap();
    }

    /// Pages built from no values yield zero values for both `read` and `skip_values`.
    /// After padding, the output is an 8-slot, all-null array.
    #[test]
    fn test_nulls() {
        let data_type = utf8_dictionary();
        let (pages, encoded_dictionary) = byte_array_all_encodings(Vec::<&str>::new());

        let column_desc = utf8_column();
        let mut decoder = DictionaryDecoder::new(&column_desc);

        decoder
            .set_dict(encoded_dictionary, 4, Encoding::PLAIN_DICTIONARY, false)
            .unwrap();

        // Read path.
        for (encoding, page) in pages.clone() {
            let mut output = DictionaryBuffer::<i32, i32>::default();
            decoder.set_data(encoding, page, 8, None).unwrap();
            assert_eq!(decoder.read(&mut output, 1024).unwrap(), 0);

            output.pad_nulls(0, 0, 8, &[0]);
            let array = output
                .into_array(Some(Buffer::from(&[0])), &data_type)
                .unwrap();

            assert_eq!(array.len(), 8);
            assert_eq!(array.null_count(), 8);
            assert_eq!(array.logical_null_count(), 8);
        }

        // Skip path.
        for (encoding, page) in pages {
            let mut output = DictionaryBuffer::<i32, i32>::default();
            decoder.set_data(encoding, page, 8, None).unwrap();
            assert_eq!(decoder.skip_values(1024).unwrap(), 0);

            output.pad_nulls(0, 0, 8, &[0]);
            let array = output
                .into_array(Some(Buffer::from(&[0])), &data_type)
                .unwrap();

            assert_eq!(array.len(), 8);
            assert_eq!(array.null_count(), 8);
            assert_eq!(array.logical_null_count(), 8);
        }
    }
}