parquet/arrow/push_decoder/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
19//! caller (rather than from an underlying reader).
20
21mod reader_builder;
22mod remaining;
23
24use crate::DecodeResult;
25use crate::arrow::arrow_reader::{
26 ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
27};
28use crate::errors::ParquetError;
29use crate::file::metadata::ParquetMetaData;
30use crate::util::push_buffers::PushBuffers;
31use arrow_array::RecordBatch;
32use bytes::Bytes;
33use reader_builder::RowGroupReaderBuilder;
34use remaining::RemainingRowGroups;
35use std::ops::Range;
36use std::sync::Arc;
37
/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetadataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetadataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note there is no "input" source for the push decoder: rather than reading
/// from an underlying file or stream, the caller pushes the byte ranges the
/// decoder requests (see [`NoInput`]).
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #   let start = range.start as usize;
/// #   let end = range.end as usize;
/// #   file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The file length and metadata are required to create the decoder
/// let mut decoder =
///     ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
///         .unwrap()
///         // Optionally configure the decoder, e.g. batch size
///         .with_batch_size(1024)
///         // Build the decoder
///         .build()
///         .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;
113
/// Type that represents "no input" for the [`ParquetPushDecoderBuilder`]
///
/// There is no "input" for the push decoder by design (the idea is that
/// the caller pushes data to the decoder as needed).
///
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
/// which DO have an `input`. To support reusing the same builder code for
/// all three types of decoders, we define this `NoInput` marker for the push
/// decoder to denote in the type system that there is no input.
#[derive(Debug, Clone, Copy)]
pub struct NoInput;
125
/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetadataDecoder`] for a decoder that can read the metadata from a Parquet file.
    ///
    /// [`ParquetMetadataDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See example on [`ParquetPushDecoderBuilder`]
    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema.
    pub fn try_new_decoder_with_options(
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(NoInput, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        // Destructure `self` exhaustively so the compiler flags any builder
        // field added in the future that is not handled here.
        let Self {
            input: NoInput,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            row_selection_policy,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

        // Prepare to build RowGroup readers
        let file_len = 0; // not used in push decoder
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            buffers,
            row_selection_policy,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            row_group_reader_builder,
        );

        // The decoder starts out waiting for the data of the first row group
        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}
217
/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], the caller tries to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has been already decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}
243
244impl ParquetPushDecoder {
245 /// Attempt to decode the next batch of data, or return what data is needed
246 ///
247 /// The the decoder communicates the next state with a [`DecodeResult`]
248 ///
249 /// See full example in [`ParquetPushDecoderBuilder`]
250 ///
251 /// ```no_run
252 /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
253 /// use parquet::DecodeResult;
254 /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
255 /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
256 /// let mut decoder = get_decoder();
257 /// loop {
258 /// match decoder.try_decode().unwrap() {
259 /// DecodeResult::NeedsData(ranges) => {
260 /// // The decoder needs more data. Fetch the data for the given ranges
261 /// // call decoder.push_ranges(ranges, data) and call again
262 /// push_data(&mut decoder, ranges);
263 /// }
264 /// DecodeResult::Data(batch) => {
265 /// // Successfully decoded the next batch of data
266 /// println!("Got batch with {} rows", batch.num_rows());
267 /// }
268 /// DecodeResult::Finished => {
269 /// // The decoder has finished decoding all data
270 /// break;
271 /// }
272 /// }
273 /// }
274 ///```
275 pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
276 let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
277 let (new_state, decode_result) = current_state.try_next_batch()?;
278 self.state = new_state;
279 Ok(decode_result)
280 }
281
282 /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
283 /// return what data is needed to produce it.
284 ///
285 /// This API can be used to get a reader for decoding the next set of
286 /// RecordBatches while proceeding to begin fetching data for the set (e.g
287 /// row group)
288 ///
289 /// Example
290 /// ```no_run
291 /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
292 /// use parquet::DecodeResult;
293 /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
294 /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
295 /// let mut decoder = get_decoder();
296 /// loop {
297 /// match decoder.try_next_reader().unwrap() {
298 /// DecodeResult::NeedsData(ranges) => {
299 /// // The decoder needs more data. Fetch the data for the given ranges
300 /// // call decoder.push_ranges(ranges, data) and call again
301 /// push_data(&mut decoder, ranges);
302 /// }
303 /// DecodeResult::Data(reader) => {
304 /// // spawn a thread to read the batches in parallel
305 /// // with fetching the next row group / data
306 /// std::thread::spawn(move || {
307 /// for batch in reader {
308 /// let batch = batch.unwrap();
309 /// println!("Got batch with {} rows", batch.num_rows());
310 /// }
311 /// });
312 /// }
313 /// DecodeResult::Finished => {
314 /// // The decoder has finished decoding all data
315 /// break;
316 /// }
317 /// }
318 /// }
319 ///```
320 pub fn try_next_reader(
321 &mut self,
322 ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
323 let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
324 let (new_state, decode_result) = current_state.try_next_reader()?;
325 self.state = new_state;
326 Ok(decode_result)
327 }
328
329 /// Push data into the decoder for processing
330 ///
331 /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
332 /// single range of data.
333 ///
334 /// Note this can be the entire file or just a part of it. If it is part of the file,
335 /// the ranges should correspond to the data ranges requested by the decoder.
336 ///
337 /// See example in [`ParquetPushDecoderBuilder`]
338 pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
339 self.push_ranges(vec![range], vec![data])
340 }
341
342 /// Push data into the decoder for processing
343 ///
344 /// This should correspond to the data ranges requested by the decoder
345 pub fn push_ranges(
346 &mut self,
347 ranges: Vec<Range<u64>>,
348 data: Vec<Bytes>,
349 ) -> Result<(), ParquetError> {
350 let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
351 self.state = current_state.push_data(ranges, data)?;
352 Ok(())
353 }
354
355 /// Returns the total number of buffered bytes in the decoder
356 ///
357 /// This is the sum of the size of all [`Bytes`] that has been pushed to the
358 /// decoder but not yet consumed.
359 ///
360 /// Note that this does not include any overhead of the internal data
361 /// structures and that since [`Bytes`] are ref counted memory, this may not
362 /// reflect additional memory usage.
363 ///
364 /// This can be used to monitor memory usage of the decoder.
365 pub fn buffered_bytes(&self) -> u64 {
366 self.state.buffered_bytes()
367 }
368
369 /// Clear any staged byte ranges currently buffered for future decode work.
370 ///
371 /// This clears byte ranges still owned by the decoder's internal
372 /// `PushBuffers`. It does not affect any data that has already been handed
373 /// off to an active [`ParquetRecordBatchReader`].
374 pub fn clear_all_ranges(&mut self) {
375 self.state.clear_all_ranges();
376 }
377}
378
/// Internal state machine for the [`ParquetPushDecoder`]
///
/// The state is moved (consumed and rebuilt) on every transition; see
/// `test_decoder_size` which asserts the enum stays small.
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        /// Row groups that have not yet been decoded (boxed to keep the enum small)
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        /// Row groups remaining after the one currently being decoded
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}
395
396impl ParquetDecoderState {
397 /// If actively reading a RowGroup, return the currently active
398 /// ParquetRecordBatchReader and advance to the next group.
399 fn try_next_reader(
400 self,
401 ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
402 let mut current_state = self;
403 loop {
404 let (next_state, decode_result) = current_state.transition()?;
405 // if more data is needed to transition, can't proceed further without it
406 match decode_result {
407 DecodeResult::NeedsData(ranges) => {
408 return Ok((next_state, DecodeResult::NeedsData(ranges)));
409 }
410 // act next based on state
411 DecodeResult::Data(()) | DecodeResult::Finished => {}
412 }
413 match next_state {
414 // not ready to read yet, continue transitioning
415 Self::ReadingRowGroup { .. } => current_state = next_state,
416 // have a reader ready, so return it and set ourself to ReadingRowGroup
417 Self::DecodingRowGroup {
418 record_batch_reader,
419 remaining_row_groups,
420 } => {
421 let result = DecodeResult::Data(*record_batch_reader);
422 let next_state = Self::ReadingRowGroup {
423 remaining_row_groups,
424 };
425 return Ok((next_state, result));
426 }
427 Self::Finished => {
428 return Ok((Self::Finished, DecodeResult::Finished));
429 }
430 }
431 }
432 }
433
434 /// Current state --> next state + output
435 ///
436 /// This function is called to get the next RecordBatch
437 ///
438 /// This structure is used to reduce the indentation level of the main loop
439 /// in try_build
440 fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
441 let mut current_state = self;
442 loop {
443 let (new_state, decode_result) = current_state.transition()?;
444 // if more data is needed to transition, can't proceed further without it
445 match decode_result {
446 DecodeResult::NeedsData(ranges) => {
447 return Ok((new_state, DecodeResult::NeedsData(ranges)));
448 }
449 // act next based on state
450 DecodeResult::Data(()) | DecodeResult::Finished => {}
451 }
452 match new_state {
453 // not ready to read yet, continue transitioning
454 Self::ReadingRowGroup { .. } => current_state = new_state,
455 // have a reader ready, so decode the next batch
456 Self::DecodingRowGroup {
457 mut record_batch_reader,
458 remaining_row_groups,
459 } => {
460 match record_batch_reader.next() {
461 // Successfully decoded a batch, return it
462 Some(Ok(batch)) => {
463 let result = DecodeResult::Data(batch);
464 let next_state = Self::DecodingRowGroup {
465 record_batch_reader,
466 remaining_row_groups,
467 };
468 return Ok((next_state, result));
469 }
470 // No more batches in this row group, move to the next row group
471 None => {
472 current_state = Self::ReadingRowGroup {
473 remaining_row_groups,
474 }
475 }
476 // some error occurred while decoding, so return that
477 Some(Err(e)) => {
478 // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
479 return Err(ParquetError::ArrowError(e.to_string()));
480 }
481 }
482 }
483 Self::Finished => {
484 return Ok((Self::Finished, DecodeResult::Finished));
485 }
486 }
487 }
488 }
489
490 /// Transition to the next state with a reader (data can be produced), if not end of stream
491 ///
492 /// This function is called in a loop until the decoder is ready to return
493 /// data (has the required pages buffered) or is finished.
494 fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
495 // result returned when there is data ready
496 let data_ready = DecodeResult::Data(());
497 match self {
498 Self::ReadingRowGroup {
499 mut remaining_row_groups,
500 } => {
501 match remaining_row_groups.try_next_reader()? {
502 // If we have a next reader, we can transition to decoding it
503 DecodeResult::Data(record_batch_reader) => {
504 // Transition to decoding the row group
505 Ok((
506 Self::DecodingRowGroup {
507 record_batch_reader: Box::new(record_batch_reader),
508 remaining_row_groups,
509 },
510 data_ready,
511 ))
512 }
513 DecodeResult::NeedsData(ranges) => {
514 // If we need more data, we return the ranges needed and stay in Reading
515 // RowGroup state
516 Ok((
517 Self::ReadingRowGroup {
518 remaining_row_groups,
519 },
520 DecodeResult::NeedsData(ranges),
521 ))
522 }
523 // If there are no more readers, we are finished
524 DecodeResult::Finished => {
525 // No more row groups to read, we are finished
526 Ok((Self::Finished, DecodeResult::Finished))
527 }
528 }
529 }
530 // if we are already in DecodingRowGroup, just return data ready
531 Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
532 // if finished, just return finished
533 Self::Finished => Ok((self, DecodeResult::Finished)),
534 }
535 }
536
537 /// Push data, and transition state if needed
538 ///
539 /// This should correspond to the data ranges requested by the decoder
540 pub fn push_data(
541 self,
542 ranges: Vec<Range<u64>>,
543 data: Vec<Bytes>,
544 ) -> Result<Self, ParquetError> {
545 match self {
546 ParquetDecoderState::ReadingRowGroup {
547 mut remaining_row_groups,
548 } => {
549 // Push data to the RowGroupReaderBuilder
550 remaining_row_groups.push_data(ranges, data);
551 Ok(ParquetDecoderState::ReadingRowGroup {
552 remaining_row_groups,
553 })
554 }
555 // it is ok to get data before we asked for it
556 ParquetDecoderState::DecodingRowGroup {
557 record_batch_reader,
558 mut remaining_row_groups,
559 } => {
560 remaining_row_groups.push_data(ranges, data);
561 Ok(ParquetDecoderState::DecodingRowGroup {
562 record_batch_reader,
563 remaining_row_groups,
564 })
565 }
566 ParquetDecoderState::Finished => Err(ParquetError::General(
567 "Cannot push data to a finished decoder".to_string(),
568 )),
569 }
570 }
571
572 /// How many bytes are currently buffered in the decoder?
573 fn buffered_bytes(&self) -> u64 {
574 match self {
575 ParquetDecoderState::ReadingRowGroup {
576 remaining_row_groups,
577 } => remaining_row_groups.buffered_bytes(),
578 ParquetDecoderState::DecodingRowGroup {
579 record_batch_reader: _,
580 remaining_row_groups,
581 } => remaining_row_groups.buffered_bytes(),
582 ParquetDecoderState::Finished => 0,
583 }
584 }
585
586 /// Clear any staged ranges currently buffered in the decoder.
587 fn clear_all_ranges(&mut self) {
588 match self {
589 ParquetDecoderState::ReadingRowGroup {
590 remaining_row_groups,
591 } => remaining_row_groups.clear_all_ranges(),
592 ParquetDecoderState::DecodingRowGroup {
593 record_batch_reader: _,
594 remaining_row_groups,
595 } => remaining_row_groups.clear_all_ranges(),
596 ParquetDecoderState::Finished => {}
597 }
598 }
599}
600
601#[cfg(test)]
602mod test {
603 use super::*;
604 use crate::DecodeResult;
605 use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
606 use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
607 use crate::arrow::{ArrowWriter, ProjectionMask};
608 use crate::errors::ParquetError;
609 use crate::file::metadata::ParquetMetaDataPushDecoder;
610 use crate::file::properties::WriterProperties;
611 use arrow::compute::kernels::cmp::{gt, lt};
612 use arrow_array::cast::AsArray;
613 use arrow_array::types::Int64Type;
614 use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
615 use arrow_select::concat::concat_batches;
616 use bytes::Bytes;
617 use std::fmt::Debug;
618 use std::ops::Range;
619 use std::sync::{Arc, LazyLock};
620
621 /// Test decoder struct size (as they are copied around on each transition, they
622 /// should not grow too large)
623 #[test]
624 fn test_decoder_size() {
625 assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
626 }
627
628 /// Decode the entire file at once, simulating a scenario where all data is
629 /// available in memory
630 #[test]
631 fn test_decoder_all_data() {
632 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
633 .unwrap()
634 .build()
635 .unwrap();
636
637 decoder
638 .push_range(test_file_range(), TEST_FILE_DATA.clone())
639 .unwrap();
640
641 let results = vec![
642 // first row group should be decoded without needing more data
643 expect_data(decoder.try_decode()),
644 // second row group should be decoded without needing more data
645 expect_data(decoder.try_decode()),
646 ];
647 expect_finished(decoder.try_decode());
648
649 let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
650 // Check that the output matches the input batch
651 assert_eq!(all_output, *TEST_BATCH);
652 }
653
654 /// Decode the entire file incrementally, simulating a scenario where data is
655 /// fetched as needed
656 #[test]
657 fn test_decoder_incremental() {
658 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
659 .unwrap()
660 .build()
661 .unwrap();
662
663 let mut results = vec![];
664
665 // First row group, expect a single request
666 let ranges = expect_needs_data(decoder.try_decode());
667 let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
668 push_ranges_to_decoder(&mut decoder, ranges);
669 // The decoder should currently only store the data it needs to decode the first row group
670 assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
671 results.push(expect_data(decoder.try_decode()));
672 // the decoder should have consumed the data for the first row group and freed it
673 assert_eq!(decoder.buffered_bytes(), 0);
674
675 // Second row group,
676 let ranges = expect_needs_data(decoder.try_decode());
677 let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
678 push_ranges_to_decoder(&mut decoder, ranges);
679 // The decoder should currently only store the data it needs to decode the second row group
680 assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
681 results.push(expect_data(decoder.try_decode()));
682 // the decoder should have consumed the data for the second row group and freed it
683 assert_eq!(decoder.buffered_bytes(), 0);
684 expect_finished(decoder.try_decode());
685
686 // Check that the output matches the input batch
687 let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
688 assert_eq!(all_output, *TEST_BATCH);
689 }
690
    /// Releasing staged ranges should free speculative buffers without affecting
    /// the active row group reader.
    ///
    /// Batch size 100 means each 200-row row group yields two batches.
    #[test]
    fn test_decoder_clear_all_ranges() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_batch_size(100)
            .build()
            .unwrap();

        // Speculatively push the entire file before the decoder requests anything
        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // The current row group reader is built from the prefetched bytes, but
        // the speculative full-file range remains staged in the decoder.
        let batch1 = expect_data(decoder.try_decode());
        assert_eq!(batch1, TEST_BATCH.slice(0, 100));
        assert_eq!(decoder.buffered_bytes(), test_file_len());

        // All of the buffer is released
        decoder.clear_all_ranges();
        assert_eq!(decoder.buffered_bytes(), 0);

        // The active reader still owns the current row group's bytes, so it can
        // continue decoding without consulting PushBuffers.
        let batch2 = expect_data(decoder.try_decode());
        assert_eq!(batch2, TEST_BATCH.slice(100, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        // Moving to the next row group now requires the decoder to ask for data
        // again because the staged speculative ranges were released.
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);

        let batch3 = expect_data(decoder.try_decode());
        assert_eq!(batch3, TEST_BATCH.slice(200, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        let batch4 = expect_data(decoder.try_decode());
        assert_eq!(batch4, TEST_BATCH.slice(300, 100));
        assert_eq!(decoder.buffered_bytes(), 0);

        expect_finished(decoder.try_decode());
    }
739
    /// Decode the entire file incrementally, simulating partial reads
    /// (only some of the requested ranges arrive at a time)
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data: the decoder should re-request only what is missing
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // push empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
779
780 /// Decode multiple columns "a" and "b", expect that the decoder requests
781 /// only a single request per row group
782 #[test]
783 fn test_decoder_selection_does_one_request() {
784 let builder =
785 ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
786
787 let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
788
789 let mut decoder = builder
790 .with_projection(
791 ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
792 )
793 .build()
794 .unwrap();
795
796 // First row group, expect a single request for all data needed to read "a" and "b"
797 let ranges = expect_needs_data(decoder.try_decode());
798 push_ranges_to_decoder(&mut decoder, ranges);
799
800 let batch1 = expect_data(decoder.try_decode());
801 let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
802 assert_eq!(batch1, expected1);
803
804 // Second row group, similarly expect a single request for all data needed to read "a" and "b"
805 let ranges = expect_needs_data(decoder.try_decode());
806 push_ranges_to_decoder(&mut decoder, ranges);
807
808 let batch2 = expect_data(decoder.try_decode());
809 let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
810 assert_eq!(batch2, expected2);
811
812 expect_finished(decoder.try_decode());
813 }
814
    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two ranges requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges to simulate a partial read
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data; asking repeatedly should return the same missing ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filters, there is no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // rows 251..=399 pass the "a > 250" filter
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
873
    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // project a column ("b") different from the filter column ("a")
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200), // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // The first row group is entirely skipped by the row selection, so no
        // filter is evaluated (and no data is requested) for it

        // Second row group: filter pages for "a" (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group: data pages for the projected column "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // selection keeps rows 200..299; the filter keeps a > 250, so the
        // result is rows 251..299 (49 rows) of column "b"
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
924
    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175 (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // Per row group, expect one data request per filter (evaluated in
        // order) followed by one request for the projected column's pages

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // first row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199 (a > 175), column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224 (b < 625), column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }
1005
    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (don't refetch them)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, last data page in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }
1056
1057 #[test]
1058 fn test_decoder_empty_filters() {
1059 let builder =
1060 ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
1061 let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();
1062
1063 // only read column "c", but with empty filters
1064 let mut decoder = builder
1065 .with_projection(
1066 ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
1067 )
1068 .with_row_filter(RowFilter::new(vec![
1069 // empty filters should be ignored
1070 ]))
1071 .build()
1072 .unwrap();
1073
1074 // First row group
1075 let ranges = expect_needs_data(decoder.try_decode());
1076 push_ranges_to_decoder(&mut decoder, ranges);
1077
1078 // expect the first batch to be decoded: rows 0..199, column "c"
1079 let batch1 = expect_data(decoder.try_decode());
1080 let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
1081 assert_eq!(batch1, expected1);
1082
1083 // Second row group,
1084 let ranges = expect_needs_data(decoder.try_decode());
1085 push_ranges_to_decoder(&mut decoder, ranges);
1086
1087 // expect the second batch to be decoded: rows 200..399, column "c"
1088 let batch2 = expect_data(decoder.try_decode());
1089 let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
1090
1091 assert_eq!(batch2, expected2);
1092
1093 expect_finished(decoder.try_decode());
1094 }
1095
1096 #[test]
1097 fn test_decoder_offset_limit() {
1098 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1099 .unwrap()
1100 // skip entire first row group (200 rows) and first 25 rows of second row group
1101 .with_offset(225)
1102 // and limit to 20 rows
1103 .with_limit(20)
1104 .build()
1105 .unwrap();
1106
1107 // First row group should be skipped,
1108
1109 // Second row group
1110 let ranges = expect_needs_data(decoder.try_decode());
1111 push_ranges_to_decoder(&mut decoder, ranges);
1112
1113 // expect the first and only batch to be decoded
1114 let batch1 = expect_data(decoder.try_decode());
1115 let expected1 = TEST_BATCH.slice(225, 20);
1116 assert_eq!(batch1, expected1);
1117
1118 expect_finished(decoder.try_decode());
1119 }
1120
1121 #[test]
1122 fn test_decoder_row_group_selection() {
1123 // take only the second row group
1124 let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
1125 .unwrap()
1126 .with_row_groups(vec![1])
1127 .build()
1128 .unwrap();
1129
1130 // First row group should be skipped,
1131
1132 // Second row group
1133 let ranges = expect_needs_data(decoder.try_decode());
1134 push_ranges_to_decoder(&mut decoder, ranges);
1135
1136 // expect the first and only batch to be decoded
1137 let batch1 = expect_data(decoder.try_decode());
1138 let expected1 = TEST_BATCH.slice(200, 200);
1139 assert_eq!(batch1, expected1);
1140
1141 expect_finished(decoder.try_decode());
1142 }
1143
    /// RowSelection: skip the first row group plus 25 rows of the second,
    /// then select the next 20 rows (rows 225..244)
    #[test]
    fn test_decoder_row_selection() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(225), // skip first row group and 25 rows of second
                RowSelector::select(20), // take 20 rows
            ]))
            .build()
            .unwrap();

        // First row group should be skipped (no data requested for it)

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }
1169
1170 /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
1171 ///
1172 /// Note c is a different types (so the data page sizes will be different)
1173 static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
1174 let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
1175 let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
1176 let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
1177 if i % 2 == 0 {
1178 format!("string_{i}")
1179 } else {
1180 format!("A string larger than 12 bytes and thus not inlined {i}")
1181 }
1182 })));
1183
1184 RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
1185 });
1186
1187 /// Create a parquet file in memory for testing.
1188 ///
1189 /// See [`TEST_BATCH`] for the data in the file.
1190 ///
1191 /// Each column is written in 4 data pages, each with 100 rows, across 2
1192 /// row groups. Each column in each row group has two data pages.
1193 ///
1194 /// The data is split across row groups like this
1195 ///
1196 /// Column | Values | Data Page | Row Group
1197 /// -------|------------------------|-----------|-----------
1198 /// a | 0..99 | 1 | 0
1199 /// a | 100..199 | 2 | 0
1200 /// a | 200..299 | 1 | 1
1201 /// a | 300..399 | 2 | 1
1202 ///
1203 /// b | 400..499 | 1 | 0
1204 /// b | 500..599 | 2 | 0
1205 /// b | 600..699 | 1 | 1
1206 /// b | 700..799 | 2 | 1
1207 ///
1208 /// c | "string_0".."string_99" | 1 | 0
1209 /// c | "string_100".."string_199" | 2 | 0
1210 /// c | "string_200".."string_299" | 1 | 1
1211 /// c | "string_300".."string_399" | 2 | 1
1212 static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
1213 let input_batch = &TEST_BATCH;
1214 let mut output = Vec::new();
1215
1216 let writer_options = WriterProperties::builder()
1217 .set_max_row_group_row_count(Some(200))
1218 .set_data_page_row_count_limit(100)
1219 .build();
1220 let mut writer =
1221 ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();
1222
1223 // since the limits are only enforced on batch boundaries, write the input
1224 // batch in chunks of 50
1225 let mut row_remain = input_batch.num_rows();
1226 while row_remain > 0 {
1227 let chunk_size = row_remain.min(50);
1228 let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
1229 writer.write(&chunk).unwrap();
1230 row_remain -= chunk_size;
1231 }
1232 writer.close().unwrap();
1233 Bytes::from(output)
1234 });
1235
1236 /// Return the length of [`TEST_FILE_DATA`], in bytes
1237 fn test_file_len() -> u64 {
1238 TEST_FILE_DATA.len() as u64
1239 }
1240
1241 /// Return a range that covers the entire [`TEST_FILE_DATA`]
1242 fn test_file_range() -> Range<u64> {
1243 0..test_file_len()
1244 }
1245
1246 /// Return a slice of the test file data from the given range
1247 pub fn test_file_slice(range: Range<u64>) -> Bytes {
1248 let start: usize = range.start.try_into().unwrap();
1249 let end: usize = range.end.try_into().unwrap();
1250 TEST_FILE_DATA.slice(start..end)
1251 }
1252
1253 /// return the metadata for the test file
1254 pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
1255 let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
1256 push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
1257 let metadata = metadata_decoder.try_decode().unwrap();
1258 let DecodeResult::Data(metadata) = metadata else {
1259 panic!("Expected metadata to be decoded successfully");
1260 };
1261 Arc::new(metadata)
1262 }
1263
1264 /// Push the given ranges to the metadata decoder, simulating reading from a file
1265 fn push_ranges_to_metadata_decoder(
1266 metadata_decoder: &mut ParquetMetaDataPushDecoder,
1267 ranges: Vec<Range<u64>>,
1268 ) {
1269 let data = ranges
1270 .iter()
1271 .map(|range| test_file_slice(range.clone()))
1272 .collect::<Vec<_>>();
1273 metadata_decoder.push_ranges(ranges, data).unwrap();
1274 }
1275
1276 fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
1277 let data = ranges
1278 .iter()
1279 .map(|range| test_file_slice(range.clone()))
1280 .collect::<Vec<_>>();
1281 decoder.push_ranges(ranges, data).unwrap();
1282 }
1283
1284 /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
1285 fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
1286 match result.expect("Expected Ok(DecodeResult::Data(T))") {
1287 DecodeResult::Data(data) => data,
1288 result => panic!("Expected DecodeResult::Data, got {result:?}"),
1289 }
1290 }
1291
1292 /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
1293 fn expect_needs_data<T: Debug>(
1294 result: Result<DecodeResult<T>, ParquetError>,
1295 ) -> Vec<Range<u64>> {
1296 match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
1297 DecodeResult::NeedsData(ranges) => ranges,
1298 result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
1299 }
1300 }
1301
1302 fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
1303 match result.expect("Expected Ok(DecodeResult::Finished)") {
1304 DecodeResult::Finished => {}
1305 result => panic!("Expected DecodeResult::Finished, got {result:?}"),
1306 }
1307 }
1308}