parquet/arrow/push_decoder/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
//! caller (rather than from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::RowGroupReaderBuilder;
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;

/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`] and pass
/// the file length and metadata of the Parquet file to decode.
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note the "input" type is `u64`, which represents the length of the Parquet file
/// being decoded. The length is needed to initialize the internal buffers that track
/// what data has been provided to the decoder.
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #   let start = range.start as usize;
/// #   let end = range.end as usize;
/// #   file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The file length and metadata are required to create the decoder
/// let mut decoder =
///   ParquetPushDecoderBuilder::try_new_decoder(file_length, parquet_metadata)
///     .unwrap()
///     // Optionally configure the decoder, e.g. batch size
///     .with_batch_size(1024)
///     // Build the decoder
///     .build()
///     .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding; exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<u64>;

/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetaDataPushDecoder`] for a decoder that can read the metadata from a Parquet file.
    ///
    /// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See the example on [`ParquetPushDecoderBuilder`].
    pub fn try_new_decoder(
        file_len: u64,
        parquet_metadata: Arc<ParquetMetaData>,
    ) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(
            file_len,
            parquet_metadata,
            ArrowReaderOptions::default(),
        )
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema.
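    ///
    /// A minimal sketch, assuming the file length and metadata are already
    /// known (the `get_metadata` helper and `file_len` value are hypothetical);
    /// it enables reading the page index via
    /// [`ArrowReaderOptions::with_page_index`]:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// # let file_len = 1024; // hypothetical file length
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetPushDecoderBuilder::try_new_decoder_with_options(
    ///     file_len,
    ///     get_metadata(),
    ///     options,
    /// )
    /// .unwrap();
    /// ```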
    pub fn try_new_decoder_with_options(
        file_len: u64,
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(file_len, arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
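    ///
    /// A minimal sketch, assuming the metadata was obtained elsewhere (the
    /// `get_metadata` helper and `file_len` value are hypothetical). Reusing one
    /// [`ArrowReaderMetadata`] allows creating several builders for the same
    /// file without re-validating the schema each time:
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// # let file_len = 1024; // hypothetical file length
    /// let metadata =
    ///     ArrowReaderMetadata::try_new(get_metadata(), ArrowReaderOptions::default()).unwrap();
    /// let builder_a = ParquetPushDecoderBuilder::new_with_metadata(file_len, metadata.clone());
    /// let builder_b = ParquetPushDecoderBuilder::new_with_metadata(file_len, metadata);
    /// ```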
    pub fn new_with_metadata(file_len: u64, arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(file_len, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        let Self {
            input: file_len,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

        // Prepare to build RowGroup readers
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            buffers,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            row_group_reader_builder,
        );

        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating which
/// ranges are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], it can resume decoding by calling
/// [`Self::try_decode`] again.
///
/// The decoder's internal state tracks what has already been decoded and what
/// is needed next.
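///
/// A sketch of the internal state machine (the `ParquetDecoderState` defined
/// below), with the [`DecodeResult`] returned to the caller shown after `+`:
///
/// ```text
/// ReadingRowGroup  --(needs more bytes)----> ReadingRowGroup  + NeedsData(ranges)
/// ReadingRowGroup  --(row group ready)-----> DecodingRowGroup
/// DecodingRowGroup --(batch available)-----> DecodingRowGroup + Data(batch)
/// DecodingRowGroup --(row group drained)---> ReadingRowGroup
/// ReadingRowGroup  --(no row groups left)--> Finished         + Finished
/// ```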
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates the next state with a [`DecodeResult`].
    ///
    /// See the full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///     match decoder.try_decode().unwrap() {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             // The decoder needs more data. Fetch the data for the given ranges,
    ///             // call decoder.push_ranges(ranges, data), and try again
    ///             push_data(&mut decoder, ranges);
    ///         }
    ///         DecodeResult::Data(batch) => {
    ///             // Successfully decoded the next batch of data
    ///             println!("Got batch with {} rows", batch.num_rows());
    ///         }
    ///         DecodeResult::Finished => {
    ///             // The decoder has finished decoding all data
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_transition()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note this can be the entire file or just a part of it. If it is part of the file,
    /// the range should correspond to a data range requested by the decoder.
    ///
    /// See the example in [`ParquetPushDecoderBuilder`]
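    ///
    /// A minimal sketch, assuming a hypothetical `get_decoder` helper and that
    /// the first four bytes of the file are already in memory:
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// // Push bytes 0..4 of the file (every Parquet file begins with the
    /// // magic bytes "PAR1")
    /// decoder.push_range(0..4, Bytes::from_static(b"PAR1")).unwrap();
    /// ```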
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// The ranges should correspond to the data ranges requested by the decoder
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to
    /// the decoder but not yet consumed.
    ///
    /// Note that this does not include any overhead of the internal data
    /// structures, and that since [`Bytes`] is reference counted, the buffers
    /// may be shared and thus not represent additional allocated memory.
    ///
    /// This can be used to monitor memory usage of the decoder.
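    ///
    /// A minimal sketch of such monitoring, assuming a hypothetical
    /// `get_decoder` helper and an arbitrary 1 MiB budget:
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let decoder = get_decoder();
    /// // compare buffered data against a caller-chosen memory budget
    /// if decoder.buffered_bytes() > 1024 * 1024 {
    ///     println!("decoder is buffering more than 1 MiB");
    /// }
    /// ```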
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}

impl ParquetDecoderState {
    /// Current state --> next state + output
    ///
    /// This function is called to check whether the decoder can produce the
    /// next RecordBatch; [`Self::push_data`] is called when new data is
    /// available.
    ///
    /// # Notes
    ///
    /// Transitions are written as a separate function (rather than inline
    /// matches) to reduce the indentation level of the main decode loop
    fn try_transition(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Self::DecodingRowGroup {
                            record_batch_reader: Box::new(record_batch_reader),
                            remaining_row_groups,
                        }
                        .try_transition()
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, return the ranges needed and stay in the
                        // ReadingRowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            Self::DecodingRowGroup {
                mut record_batch_reader,
                remaining_row_groups,
            } => {
                // Decode the next record batch
                match record_batch_reader.next() {
                    Some(Ok(batch)) => {
                        // Successfully decoded a batch, return it
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            },
                            DecodeResult::Data(batch),
                        ))
                    }
                    None => {
                        // No more batches in this row group, move to the next row group
                        // or finish if there are no more row groups
                        Self::ReadingRowGroup {
                            remaining_row_groups,
                        }
                        .try_transition()
                    }
                    Some(Err(e)) => Err(ParquetError::from(e)), // some error occurred while decoding
                }
            }
            Self::Finished => Ok((Self::Finished, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// The ranges should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // it is ok to receive data before it was asked for
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test the size of the decoder state (it is copied on every transition,
    /// so it should not grow too large)
    #[test]
    fn test_decoder_size() {
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .build()
        .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .build()
        .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .build()
        .unwrap();

        // First row group, expect a single request for all the data needed
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate a partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode multiple columns "a" and "b", expect that the decoder makes
    /// only a single request per row group
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time, simulating partial reads
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two range requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filter, there are no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["b"]), // read only column "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be skipped entirely (no filter is
        // evaluated due to the row selection)

        // Second row group, filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175 (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (not refetched)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second row group to be decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_empty_filters() {
        let builder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_offset_limit() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        // skip the entire first row group (200 rows) and the first 25 rows of the second
        .with_offset(225)
        // and limit to 20 rows
        .with_limit(20)
        .build()
        .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_group_selection() {
        // take only the second row group
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .with_row_groups(vec![1])
        .build()
        .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_selection() {
        // skip the first row group and part of the second via a RowSelection
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(
            test_file_len(),
            test_file_parquet_metadata(),
        )
        .unwrap()
        .with_row_selection(RowSelection::from(vec![
            RowSelector::skip(225),  // skip first row group and 25 rows of the second
            RowSelector::select(20), // take 20 rows
        ]))
        .build()
        .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    /// Returns a batch with 400 rows and 3 columns: "a", "b", "c"
    ///
    /// Note "c" is a different type (so the data page sizes will be different)
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing.
    ///
    /// See [`TEST_BATCH`] for the data in the file.
    ///
    /// Each column is written across 2 row groups, with two data pages per
    /// column in each row group (4 data pages per column, each with 100 rows).
    ///
    /// The data is split across row groups like this:
    ///
    /// Column | Values                     | Data Page | Row Group
    /// -------|----------------------------|-----------|-----------
    /// a      | 0..99                      | 1         | 0
    /// a      | 100..199                   | 2         | 0
    /// a      | 200..299                   | 1         | 1
    /// a      | 300..399                   | 2         | 1
    ///
    /// b      | 400..499                   | 1         | 0
    /// b      | 500..599                   | 2         | 0
    /// b      | 600..699                   | 1         | 1
    /// b      | 700..799                   | 2         | 1
    ///
    /// c      | "string_0".."string_99"    | 1         | 0
    /// c      | "string_100".."string_199" | 2         | 0
    /// c      | "string_200".."string_299" | 1         | 1
    /// c      | "string_300".."string_399" | 2         | 1
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of [`TEST_FILE_DATA`], in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return a range that covers the entire [`TEST_FILE_DATA`]
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Return the metadata for the test file
    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = metadata_decoder.try_decode().unwrap();
        let DecodeResult::Data(metadata) = metadata else {
            panic!("Expected metadata to be decoded successfully");
        };
        Arc::new(metadata)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Push the given ranges of the test file to the decoder, simulating IO
    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is [`DecodeResult::Data`] and return the data
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is [`DecodeResult::NeedsData`] and return the ranges
    fn expect_needs_data<T: Debug>(
        result: Result<DecodeResult<T>, ParquetError>,
    ) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}