parquet/arrow/push_decoder/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
//! caller (rather than from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::{RowBudget, RowGroupReaderBuilder};
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;

/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note that, unlike the sync and async readers, the push decoder has no
/// "input" from which to read data. Instead, the decoder reports which byte
/// ranges it needs and the caller pushes them in (see [`NoInput`]).
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #     let mut buffer = vec![];
/// #     let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #     let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #     writer.write(&batch).unwrap();
/// #     writer.close().unwrap();
/// #     Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #     let start = range.start as usize;
/// #     let end = range.end as usize;
/// #     file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The metadata is required to create the decoder
/// let mut decoder =
///     ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
///         .unwrap()
///         // Optionally configure the decoder, e.g. batch size
///         .with_batch_size(1024)
///         // Build the decoder
///         .build()
///         .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding; exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;

/// Type that represents "no input" for the [`ParquetPushDecoderBuilder`]
///
/// There is no "input" for the push decoder by design (the idea is that
/// the caller pushes data to the decoder as needed).
///
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
/// which DO have an `input`. To support reusing the same builder code for
/// all three types of decoders, we define this `NoInput` type for the push
/// decoder to denote in the type system that there is no input.
#[derive(Debug, Clone, Copy)]
pub struct NoInput;

/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetaDataPushDecoder`] for decoding the metadata from a Parquet file.
    ///
    /// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See the example on [`ParquetPushDecoderBuilder`]
    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema.
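    ///
    /// # Example
    ///
    /// A minimal sketch (the `get_metadata` helper is a hypothetical stand-in
    /// for metadata you have already decoded):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// // ignore any embedded Arrow schema and derive one from the Parquet schema
    /// let options = ArrowReaderOptions::new().with_skip_arrow_metadata(true);
    /// let decoder =
    ///     ParquetPushDecoderBuilder::try_new_decoder_with_options(get_metadata(), options)
    ///         .unwrap()
    ///         .build()
    ///         .unwrap();
    /// ```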
    pub fn try_new_decoder_with_options(
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
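    ///
    /// # Example
    ///
    /// A sketch of reusing the same [`ArrowReaderMetadata`] for several
    /// decoders (the `get_metadata` helper is a hypothetical stand-in for
    /// metadata you have already decoded):
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions};
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// let metadata =
    ///     ArrowReaderMetadata::try_new(get_metadata(), ArrowReaderOptions::default()).unwrap();
    /// // cloning ArrowReaderMetadata is cheap (its fields are ref-counted)
    /// let decoder_one = ParquetPushDecoderBuilder::new_with_metadata(metadata.clone())
    ///     .build()
    ///     .unwrap();
    /// let decoder_two = ParquetPushDecoderBuilder::new_with_metadata(metadata)
    ///     .build()
    ///     .unwrap();
    /// ```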
    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(NoInput, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        let Self {
            input: NoInput,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            row_selection_policy,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());
        let has_predicates = filter
            .as_ref()
            .is_some_and(|filter| !filter.predicates.is_empty());

        // Prepare to build RowGroup readers
        let file_len = 0; // not used in push decoder
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            metrics,
            max_predicate_cache_size,
            buffers,
            row_selection_policy,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            RowBudget::new(offset, limit),
            has_predicates,
            row_group_reader_builder,
        );

        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], it tries to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has already been decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates the next step with a [`DecodeResult`]
    ///
    /// See the full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///     match decoder.try_decode().unwrap() {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             // The decoder needs more data. Fetch the data for the given ranges,
    ///             // call decoder.push_ranges(ranges, data), and try again
    ///             push_data(&mut decoder, ranges);
    ///         }
    ///         DecodeResult::Data(batch) => {
    ///             // Successfully decoded the next batch of data
    ///             println!("Got batch with {} rows", batch.num_rows());
    ///         }
    ///         DecodeResult::Finished => {
    ///             // The decoder has finished decoding all data
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_batch()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
    /// return what data is needed to produce it.
    ///
    /// This API can be used to obtain a reader that decodes the next set of
    /// RecordBatches (e.g. a row group) while, in parallel, beginning to fetch
    /// the data for the following set.
    ///
    /// # Example
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///     match decoder.try_next_reader().unwrap() {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             // The decoder needs more data. Fetch the data for the given ranges,
    ///             // call decoder.push_ranges(ranges, data), and try again
    ///             push_data(&mut decoder, ranges);
    ///         }
    ///         DecodeResult::Data(reader) => {
    ///             // spawn a thread to read the batches in parallel
    ///             // with fetching the next row group / data
    ///             std::thread::spawn(move || {
    ///                 for batch in reader {
    ///                     let batch = batch.unwrap();
    ///                     println!("Got batch with {} rows", batch.num_rows());
    ///                 }
    ///             });
    ///         }
    ///         DecodeResult::Finished => {
    ///             // The decoder has finished decoding all data
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_reader()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note this can be the entire file or just a part of it. If it is part of
    /// the file, the range should correspond to a data range requested by the
    /// decoder.
    ///
    /// See example in [`ParquetPushDecoderBuilder`]
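    ///
    /// # Example
    ///
    /// A minimal sketch of pushing the entire file up front so that
    /// [`Self::try_decode`] never needs to request more data (`get_decoder`
    /// and `get_file_bytes` are hypothetical stand-ins for setup and IO):
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn get_file_bytes() -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// let file_bytes = get_file_bytes();
    /// let file_len = file_bytes.len() as u64;
    /// // push the whole file as a single range starting at offset 0
    /// decoder.push_range(0..file_len, file_bytes).unwrap();
    /// ```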
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// This should correspond to the data ranges requested by the decoder
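    ///
    /// # Example
    ///
    /// A sketch of answering a `NeedsData` request (`get_decoder` and
    /// `fetch` are hypothetical stand-ins for setup and IO):
    ///
    /// ```no_run
    /// # use std::ops::Range;
    /// # use bytes::Bytes;
    /// # use parquet::DecodeResult;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn fetch(range: &Range<u64>) -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// if let DecodeResult::NeedsData(ranges) = decoder.try_decode().unwrap() {
    ///     // fetch each requested range (e.g. from object storage)
    ///     let data: Vec<Bytes> = ranges.iter().map(fetch).collect();
    ///     decoder.push_ranges(ranges, data).unwrap();
    /// }
    /// ```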
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to
    /// the decoder but not yet consumed.
    ///
    /// Note that this does not include any overhead of the internal data
    /// structures, and that since [`Bytes`] is reference counted memory, this
    /// may not reflect additional memory usage.
    ///
    /// This can be used to monitor memory usage of the decoder.
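    ///
    /// # Example
    ///
    /// A sketch of using `buffered_bytes` as a coarse memory gauge
    /// (`get_decoder` and the 64 MiB threshold are illustrative assumptions):
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let decoder = get_decoder();
    /// // check how much pushed-but-unconsumed data the decoder is holding
    /// if decoder.buffered_bytes() > 64 * 1024 * 1024 {
    ///     println!("decoder is buffering more than 64 MiB of data");
    /// }
    /// ```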
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }

    /// Clear any staged byte ranges currently buffered for future decode work.
    ///
    /// This clears byte ranges still owned by the decoder's internal
    /// `PushBuffers`. It does not affect any data that has already been handed
    /// off to an active [`ParquetRecordBatchReader`].
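    ///
    /// # Example
    ///
    /// A sketch of dropping speculatively pushed data to bound memory use;
    /// the decoder will re-request any ranges it still needs (`get_decoder`
    /// is a hypothetical stand-in for setup):
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// decoder.clear_all_ranges();
    /// // all staged ranges have been released
    /// assert_eq!(decoder.buffered_bytes(), 0);
    /// ```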
    pub fn clear_all_ranges(&mut self) {
        self.state.clear_all_ranges();
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
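///
/// State transitions, driven by `try_next_batch` and `try_next_reader`
/// (a descriptive sketch of the code below, not an exhaustive diagram):
///
/// ```text
/// ReadingRowGroup  --(needs data; caller pushes ranges)--> ReadingRowGroup
/// ReadingRowGroup  --(reader built)----------------------> DecodingRowGroup
/// DecodingRowGroup --(row group exhausted)---------------> ReadingRowGroup
/// ReadingRowGroup  --(no more row groups)----------------> Finished
/// ```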
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}

impl ParquetDecoderState {
    /// If actively reading a RowGroup, return the currently active
    /// ParquetRecordBatchReader and advance to the next group.
    fn try_next_reader(
        self,
    ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
        let mut current_state = self;
        loop {
            let (next_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((next_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match next_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = next_state,
                // have a reader ready, so return it and set ourselves to ReadingRowGroup
                Self::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                } => {
                    let result = DecodeResult::Data(*record_batch_reader);
                    let next_state = Self::ReadingRowGroup {
                        remaining_row_groups,
                    };
                    return Ok((next_state, result));
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Current state --> next state + output
    ///
    /// This function is called to get the next RecordBatch
    ///
    /// The loop structure is used to reduce the indentation level of the
    /// state transitions
    fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        let mut current_state = self;
        loop {
            let (new_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((new_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match new_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = new_state,
                // have a reader ready, so decode the next batch
                Self::DecodingRowGroup {
                    mut record_batch_reader,
                    remaining_row_groups,
                } => {
                    match record_batch_reader.next() {
                        // Successfully decoded a batch, return it
                        Some(Ok(batch)) => {
                            let result = DecodeResult::Data(batch);
                            let next_state = Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            };
                            return Ok((next_state, result));
                        }
                        // No more batches in this row group, move to the next row group
                        None => {
                            current_state = Self::ReadingRowGroup {
                                remaining_row_groups,
                            }
                        }
                        // some error occurred while decoding, so return that
                        Some(Err(e)) => {
                            // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
                            return Err(ParquetError::ArrowError(e.to_string()));
                        }
                    }
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Transition to the next state with a reader (data can be produced), if not end of stream
    ///
    /// This function is called in a loop until the decoder is ready to return
    /// data (has the required pages buffered) or is finished.
    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
        // result returned when there is data ready
        let data_ready = DecodeResult::Data(());
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader: Box::new(record_batch_reader),
                                remaining_row_groups,
                            },
                            data_ready,
                        ))
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, we return the ranges needed and stay in
                        // ReadingRowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    // If there are no more readers, we are finished
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            // if we are already in DecodingRowGroup, just return data ready
            Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
            // if finished, just return finished
            Self::Finished => Ok((self, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // it is ok to get data before we asked for it
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }

    /// Clear any staged ranges currently buffered in the decoder.
    fn clear_all_ranges(&mut self) {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.clear_all_ranges(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.clear_all_ranges(),
            ParquetDecoderState::Finished => {}
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test the decoder state size (states are moved on each transition, so
    /// they should not grow too large)
    #[test]
    fn test_decoder_size() {
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Releasing staged ranges should free speculative buffers without affecting
    /// the active row group reader.
    #[test]
    fn test_decoder_clear_all_ranges() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_batch_size(100)
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // The current row group reader is built from the prefetched bytes, but
        // the speculative full-file range remains staged in the decoder.
        let batch1 = expect_data(decoder.try_decode());
        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // All of the buffer is released
        decoder.clear_all_ranges();
        assert_eq!(decoder.buffered_bytes(), 0);

        // The active reader still owns the current row group's bytes, so it can
        // continue decoding without consulting PushBuffers.
        let batch2 = expect_data(decoder.try_decode());
        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        // Moving to the next row group now requires the decoder to ask for data
        // again because the staged speculative ranges were released.
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);

        let batch3 = expect_data(decoder.try_decode());
        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        let batch4 = expect_data(decoder.try_decode());
        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        expect_finished(decoder.try_decode());
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all the data needed
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate a partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode multiple columns "a" and "b", and expect that the decoder makes
    /// only a single request per row group
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time, simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two range requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filter, there are no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b" (the filter column "a" is not projected)
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be filtered out (no filter is evaluated due to row selection)

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175 (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (don't refetch them)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_empty_filters() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();

        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// When filter pushdown is combined with a `LIMIT`, the predicate must
    /// not be evaluated for rows beyond the `limit`-th match.
    ///
    /// Filter `a > 175` produces 24 matches in row group 0 (rows 176..199).
    /// With `limit = 10`, only the first 10 matches (rows 176..185) should be
    /// emitted, AND the predicate counter should observe that evaluation was
    /// short-circuited.
    #[test]
    fn test_decoder_filter_with_limit_short_circuits_within_row_group() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let rows_filtered = Arc::new(AtomicUsize::new(0));
        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // Use a small batch size so the row group is evaluated across
        // multiple predicate batches; that is the regime where Layer 2's
        // short-circuit saves predicate evaluation work. Matching rows are
        // 176..199 (24 rows); with batch_size = 10 those span batches 17, 18,
        // and 19 (rows 170..199). A limit of 10 should stop filter evaluation
        // in the middle of batch 18.
        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // First row group: filter columns fetch (predicate is evaluated here)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // The first 10 matching rows come out: 176..185, column "a"
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // no data for row group 1 should be requested — the limit
        // was satisfied by row group 0 and the `Start` state for row group 1
        // short-circuits to `Finished`.
        expect_finished(decoder.try_decode());

        // Row 186 is the 11th match; the scan should stop no later than the
        // batch containing it (batch 18 of 10 rows = rows 180..189), so at
        // most 190 rows are evaluated.
        let evaluated = rows_filtered.load(Ordering::Relaxed);
        assert!(
            evaluated <= 190,
            "predicate evaluated {evaluated} rows; expected ≤ 190 (stop within batch containing 11th match)"
        );
    }

    /// Once the limit has been satisfied by a prior row group, subsequent
    /// row groups should be skipped entirely — no data request for their
    /// filter columns.
    #[test]
    fn test_decoder_filter_with_limit_skips_later_row_groups() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // `a > 175` matches rows 176..199 in row group 0 (24 matches) and
        // 200..399 in row group 1 (200 matches). With limit = 5, all matches
        // should come from row group 0.
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_limit(5)
            .build()
            .unwrap();

        // Row group 0: fetch filter pages
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First 5 matches: 176..180
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 5).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        // Row group 1 must NOT request data — the limit is already satisfied
        // so `Start` in row group 1 short-circuits to `Finished`.
        expect_finished(decoder.try_decode());
    }

    /// The predicate short-circuit must account for `self.offset` as well as
    /// `self.limit`. The post-predicate `with_offset` step skips that many
    /// already-selected rows before `with_limit` counts output rows — so the
    /// predicate must retain at least `offset + limit` matches. Without the
    /// fix, Layer 2 caps at just `limit` and the later `with_offset` consumes
    /// all of them, producing 0 rows instead of `limit`.
    ///
    /// `a > 175` matches rows 176..199 in row group 0 (24 matches). With
    /// `offset = 10, limit = 5`, the expected output is rows 186..190 (the
    /// 11th through 15th matches).
    #[test]
    fn test_decoder_filter_with_offset_and_limit() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_offset(10)
            .with_limit(5)
            .build()
            .unwrap();

        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(186, 5).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// The limit short-circuit must also be correct when the limited predicate
    /// is the last predicate in a multi-predicate chain.
    ///
    /// `a > 175` first narrows row group 0 to rows 176..199. The final
    /// predicate `b < 625` is then evaluated only over those 24 rows, all of
    /// which match. With `limit = 10`, the final output should still be rows
    /// 176..185, and the second predicate should stop before consuming all 24
    /// selected rows.
    #[test]
    fn test_decoder_multi_filters_with_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let first_predicate_rows = Arc::new(AtomicUsize::new(0));
        let second_predicate_rows = Arc::new(AtomicUsize::new(0));

        let first_predicate_rows_for_filter = Arc::clone(&first_predicate_rows);
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                first_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let second_predicate_rows_for_filter = Arc::clone(&second_predicate_rows);
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            move |batch: RecordBatch| {
                second_predicate_rows_for_filter.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["c"]))
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        // Row group 0, first predicate
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Row group 0, second predicate
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Final projected data
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[2]).unwrap();
        assert_eq!(batch, expected);

        // The overall limit was satisfied by row group 0.
        expect_finished(decoder.try_decode());

        assert_eq!(first_predicate_rows.load(Ordering::Relaxed), 200);
        assert!(
            second_predicate_rows.load(Ordering::Relaxed) < 24,
            "final predicate should short-circuit before consuming all 24 rows selected by the first predicate"
        );
    }

    /// When a row selection already exists, limiting the predicate must still
    /// preserve alignment with that prior selection.
    ///
    /// The explicit selection narrows row group 0 to rows 150..199. Applying
    /// `a > 175` over that selection yields rows 176..199. With `limit = 10`,
    /// the decoder should emit rows 176..185 and stop without evaluating the
    /// remaining selected rows.
    #[test]
    fn test_decoder_filter_with_row_selection_and_limit() {
        use std::sync::atomic::{AtomicUsize, Ordering};

        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let rows_filtered = Arc::new(AtomicUsize::new(0));
        let rows_filtered_for_predicate = Arc::clone(&rows_filtered);

        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            move |batch: RecordBatch| {
                rows_filtered_for_predicate.fetch_add(batch.num_rows(), Ordering::Relaxed);
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        let mut decoder = builder
            .with_projection(ProjectionMask::columns(&schema_descr, ["a"]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(150),
                RowSelector::select(50),
            ]))
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_batch_size(10)
            .with_limit(10)
            .build()
            .unwrap();

        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(176, 10).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());

        assert!(
            rows_filtered.load(Ordering::Relaxed) < 50,
            "predicate should short-circuit before consuming all 50 rows from the explicit row selection"
        );
    }

    #[test]
    fn test_decoder_offset_limit() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            // skip entire first row group (200 rows) and first 25 rows of second row group
            .with_offset(225)
            // and limit to 20 rows
            .with_limit(20)
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_try_next_reader_offset_limit() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_offset(225)
            .with_limit(20)
            .build()
            .unwrap();

        let ranges = expect_needs_data(decoder.try_next_reader());
        push_ranges_to_decoder(&mut decoder, ranges);

        let reader = expect_data(decoder.try_next_reader());
        let batches = reader
            .map(|batch| batch.expect("expected decoded batch"))
            .collect::<Vec<_>>();
        let output = concat_batches(&TEST_BATCH.schema(), &batches).unwrap();
        assert_eq!(output, TEST_BATCH.slice(225, 20));

        expect_finished(decoder.try_next_reader());
    }

    #[test]
    fn test_decoder_row_group_selection() {
        // take only the second row group
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_groups(vec![1])
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_selection() {
        // skip the first row group and the first 25 rows of the second
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(225),  // skip first row group and 25 rows of second
                RowSelector::select(20), // take 20 rows
            ]))
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
    ///
    /// Note "c" is a different type (so the data page sizes will be different)
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing.
    ///
    /// See [`TEST_BATCH`] for the data in the file.
    ///
    /// Each column is written in 4 data pages, each with 100 rows, across 2
    /// row groups. Each column in each row group has two data pages.
    ///
    /// The data is split across row groups like this
    ///
    /// Column | Values                     | Data Page | Row Group
    /// -------|----------------------------|-----------|-----------
    /// a      | 0..99                      | 1         | 0
    /// a      | 100..199                   | 2         | 0
    /// a      | 200..299                   | 1         | 1
    /// a      | 300..399                   | 2         | 1
    ///
    /// b      | 400..499                   | 1         | 0
    /// b      | 500..599                   | 2         | 0
    /// b      | 600..699                   | 1         | 1
    /// b      | 700..799                   | 2         | 1
    ///
    /// c      | "string_0".."string_99"    | 1         | 0
    /// c      | "string_100".."string_199" | 2         | 0
    /// c      | "string_200".."string_299" | 1         | 1
    /// c      | "string_300".."string_399" | 2         | 1
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_row_count(Some(200))
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of [`TEST_FILE_DATA`], in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return a range that covers the entire [`TEST_FILE_DATA`]
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Return the metadata for the test file
    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = metadata_decoder.try_decode().unwrap();
        let DecodeResult::Data(metadata) = metadata else {
            panic!("Expected metadata to be decoded successfully");
        };
        Arc::new(metadata)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(
        result: Result<DecodeResult<T>, ParquetError>,
    ) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}
1617}