pub struct ParquetPushDecoder {
    state: ParquetDecoderState,
}
A push-based Parquet decoder.
See ParquetPushDecoderBuilder for an example of how to build and use the decoder.
ParquetPushDecoder is a low-level API for decoding Parquet data without an
underlying reader for performing IO, and thus offers fine-grained control
over how data is fetched and decoded.
When more data is needed to make progress, instead of reading data directly
from a reader, the decoder returns a DecodeResult indicating which ranges
are needed. Once the caller has provided the requested ranges via
Self::push_ranges, it calls Self::try_decode again to continue decoding.
The decoder’s internal state tracks what has already been decoded and what is needed next.
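The request, push, and retry loop described above can be modeled with a minimal, std-only sketch. ToyDecoder and ToyResult are hypothetical types and are not the parquet crate's API. The toy "decodes" fixed byte ranges of a file that it never reads itself, so the caller performs all IO.

```rust
use std::ops::Range;

/// Toy stand-in for DecodeResult; NOT the parquet crate's type.
enum ToyResult {
    NeedsData(Vec<Range<u64>>),
    Data(Vec<u8>),
    Finished,
}

/// Hypothetical decoder: it knows which ranges it must decode, but never does IO.
struct ToyDecoder {
    chunks: Vec<Range<u64>>,              // ranges to decode, in order
    next: usize,                          // index of the next chunk to decode
    buffered: Vec<(Range<u64>, Vec<u8>)>, // data pushed by the caller
}

impl ToyDecoder {
    fn new(chunks: Vec<Range<u64>>) -> Self {
        Self { chunks, next: 0, buffered: Vec::new() }
    }

    /// Parallel vectors: data[i] holds the bytes of ranges[i].
    fn push_ranges(&mut self, ranges: Vec<Range<u64>>, data: Vec<Vec<u8>>) {
        assert_eq!(ranges.len(), data.len());
        self.buffered.extend(ranges.into_iter().zip(data));
    }

    fn try_decode(&mut self) -> ToyResult {
        let Some(wanted) = self.chunks.get(self.next).cloned() else {
            return ToyResult::Finished;
        };
        // Decode only if the caller has already pushed the bytes for this range
        match self.buffered.iter().position(|(r, _)| *r == wanted) {
            Some(i) => {
                let (_, bytes) = self.buffered.swap_remove(i);
                self.next += 1;
                ToyResult::Data(bytes)
            }
            None => ToyResult::NeedsData(vec![wanted]),
        }
    }
}

/// Drives the toy decoder over an in-memory file; returns the batches and fetch count.
fn run(file: &[u8], chunks: Vec<Range<u64>>) -> (Vec<Vec<u8>>, usize) {
    let mut decoder = ToyDecoder::new(chunks);
    let mut batches = Vec::new();
    let mut fetches = 0;
    loop {
        match decoder.try_decode() {
            ToyResult::NeedsData(ranges) => {
                // The caller, not the decoder, fetches the requested bytes
                let data = ranges
                    .iter()
                    .map(|r| file[r.start as usize..r.end as usize].to_vec())
                    .collect();
                fetches += 1;
                decoder.push_ranges(ranges, data);
            }
            ToyResult::Data(batch) => batches.push(batch),
            ToyResult::Finished => break,
        }
    }
    (batches, fetches)
}

fn main() {
    let (batches, fetches) = run(b"abcdefgh", vec![0..3, 3..8]);
    println!("{} batches after {} fetches", batches.len(), fetches);
}
```

In this toy, each chunk costs one NeedsData round trip. Because NeedsData carries a Vec of ranges, a single request can cover several ranges.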
Fields
state: ParquetDecoderState
The inner state.
This state is consumed on every transition and a new state is produced, so the Rust compiler can ensure that the state is always valid and that no transition is missed.
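The consume-and-replace pattern can be sketched with a hypothetical state enum. These states are not the real ParquetDecoderState. std::mem::replace moves the current state out so the match can consume it by value, and every arm must produce the next state. The match must be exhaustive, so a variant without a transition fails to compile.

```rust
/// Hypothetical states for illustration; NOT the parquet crate's ParquetDecoderState.
enum State {
    ReadingMetadata,
    ReadingRowGroup { index: usize },
    Finished,
}

struct Machine {
    state: State,
    row_groups: usize,
}

impl Machine {
    /// Consume the current state and store the next one.
    fn step(&mut self) -> &State {
        // Move the state out (leaving a placeholder) so it can be consumed by value
        let current = std::mem::replace(&mut self.state, State::Finished);
        // Every arm yields the next state; omitting a variant is a compile error
        self.state = match current {
            State::ReadingMetadata if self.row_groups == 0 => State::Finished,
            State::ReadingMetadata => State::ReadingRowGroup { index: 0 },
            State::ReadingRowGroup { index } if index + 1 < self.row_groups => {
                State::ReadingRowGroup { index: index + 1 }
            }
            State::ReadingRowGroup { .. } => State::Finished,
            State::Finished => State::Finished,
        };
        &self.state
    }
}

/// Counts transitions until the machine reaches Finished.
fn steps_to_finish(row_groups: usize) -> usize {
    let mut machine = Machine { state: State::ReadingMetadata, row_groups };
    let mut steps = 0;
    loop {
        steps += 1;
        if matches!(machine.step(), State::Finished) {
            return steps;
        }
    }
}

fn main() {
    println!("{} transitions for 2 row groups", steps_to_finish(2));
}
```

The real decoder's states are richer. This sketch only illustrates why consuming the state on each transition lets the compiler check the transitions.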
Implementations
impl ParquetPushDecoder
pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError>
Attempt to decode the next batch of data, or return what data is needed
The decoder communicates its next state with a DecodeResult.
See full example in ParquetPushDecoderBuilder
use parquet::DecodeResult;
// `get_decoder` and `push_data` are helper functions not shown in this example
let mut decoder = get_decoder();
loop {
    match decoder.try_decode().unwrap() {
        DecodeResult::NeedsData(ranges) => {
            // The decoder needs more data: fetch the bytes for the given ranges,
            // call decoder.push_ranges(ranges, data), then call try_decode again
            push_data(&mut decoder, ranges);
        }
        DecodeResult::Data(batch) => {
            // Successfully decoded the next batch of data
            println!("Got batch with {} rows", batch.num_rows());
        }
        DecodeResult::Finished => {
            // The decoder has finished decoding all data
            break;
        }
    }
}
pub fn try_next_reader(&mut self) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError>
Return a ParquetRecordBatchReader that reads the next set of rows, or
return what data is needed to produce it.
This API can be used to obtain a reader that decodes the next set of RecordBatches (e.g. a row group) while the caller proceeds to fetch data for the following set.
Example
use parquet::DecodeResult;
// `get_decoder` and `push_data` are helper functions not shown in this example
let mut decoder = get_decoder();
// Keep the thread handles so every batch is read before returning
let mut handles = Vec::new();
loop {
    match decoder.try_next_reader().unwrap() {
        DecodeResult::NeedsData(ranges) => {
            // The decoder needs more data: fetch the bytes for the given ranges,
            // call decoder.push_ranges(ranges, data), then call try_next_reader again
            push_data(&mut decoder, ranges);
        }
        DecodeResult::Data(reader) => {
            // Spawn a thread to read the batches in parallel
            // with fetching the next row group / data
            handles.push(std::thread::spawn(move || {
                for batch in reader {
                    let batch = batch.unwrap();
                    println!("Got batch with {} rows", batch.num_rows());
                }
            }));
        }
        DecodeResult::Finished => {
            // The decoder has finished decoding all data
            break;
        }
    }
}
// Wait for every reader thread to finish
for handle in handles {
    handle.join().unwrap();
}
pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError>
Push data into the decoder for processing
This is a convenience wrapper around Self::push_ranges for pushing a
single range of data.
Note that this can be the entire file or just part of it. If it is only part of the file, the range should correspond to the data ranges requested by the decoder.
See example in ParquetPushDecoderBuilder
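When the whole file is already in memory, the simplest option is to push it once as a single range, from 0 to its length. To push only what was requested, the caller can instead cut each requested range out of that buffer. The sketch below does this with std types only and copies into Vec<u8>, where real code would more likely use Bytes::slice to avoid copying. slice_ranges is a hypothetical helper.

```rust
use std::ops::Range;

/// Copies each requested range out of a file already held in memory, in request order.
/// Hypothetical helper: with the real decoder, each buffer would be a Bytes
/// (e.g. a slice of one Bytes holding the whole file).
fn slice_ranges(file: &[u8], ranges: &[Range<u64>]) -> Result<Vec<Vec<u8>>, String> {
    ranges
        .iter()
        .map(|r| {
            let (start, end) = (r.start as usize, r.end as usize);
            // get() returns None for out-of-bounds or reversed ranges instead of panicking
            file.get(start..end)
                .map(|bytes| bytes.to_vec())
                .ok_or_else(|| {
                    format!("range {}..{} is outside a {}-byte file", r.start, r.end, file.len())
                })
        })
        .collect()
}

fn main() {
    let file = b"PAR1 hello PAR1";
    let parts = slice_ranges(file, &[0..4, 11..15]).unwrap();
    println!("{:?}", parts);
}
```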
pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, data: Vec<Bytes>) -> Result<(), ParquetError>
Push data into the decoder for processing
The ranges should correspond to the data ranges requested by the decoder, with data[i] holding the bytes for ranges[i].
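Because ranges and data are parallel vectors, a caller might validate them before pushing. The hypothetical check_parallel below is a caller-side sanity check. It assumes that each buffer must hold exactly the bytes of its range, and the decoder's own validation may differ.

```rust
use std::ops::Range;

/// Hypothetical caller-side check before a push_ranges-style call:
/// the vectors must have equal length and each buffer must match its range's size.
fn check_parallel(ranges: &[Range<u64>], data: &[Vec<u8>]) -> Result<(), String> {
    if ranges.len() != data.len() {
        return Err(format!("{} ranges but {} buffers", ranges.len(), data.len()));
    }
    for (i, (r, d)) in ranges.iter().zip(data).enumerate() {
        // Assumption: buffer i covers exactly range i, no more and no less
        let expected = r.end.saturating_sub(r.start);
        if d.len() as u64 != expected {
            return Err(format!(
                "buffer {i} has {} bytes, but range {}..{} needs {expected}",
                d.len(),
                r.start,
                r.end
            ));
        }
    }
    Ok(())
}

fn main() {
    let result = check_parallel(&[0..3, 10..12], &[vec![1, 2, 3], vec![4, 5]]);
    println!("{:?}", result);
}
```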
pub fn buffered_bytes(&self) -> u64
Returns the total number of buffered bytes in the decoder.
This is the sum of the sizes of all Bytes that have been pushed to the
decoder but not yet consumed.
Note that this does not include any overhead of the internal data
structures. Also, because Bytes are reference-counted views of shared memory,
this value may not reflect the memory actually retained (the underlying
allocation can be larger than the pushed slice, or shared elsewhere).
This can be used to monitor memory usage of the decoder.
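The accounting described above can be sketched with a toy buffer that counts pushed-but-unconsumed bytes and uses the count as a simple prefetch budget. ToyBuffers and may_prefetch are hypothetical and are not the crate's PushBuffers. The toy stores owned Vec<u8>, so it does not model the reference-counting caveat.

```rust
use std::ops::Range;

/// Toy buffer mirroring the accounting above; NOT the parquet crate's PushBuffers.
#[derive(Default)]
struct ToyBuffers {
    staged: Vec<(Range<u64>, Vec<u8>)>,
}

impl ToyBuffers {
    fn push(&mut self, range: Range<u64>, data: Vec<u8>) {
        self.staged.push((range, data));
    }

    /// Removes and returns the buffer for `range`, as a decoder would when consuming it.
    fn consume(&mut self, range: &Range<u64>) -> Option<Vec<u8>> {
        let i = self.staged.iter().position(|(r, _)| r == range)?;
        Some(self.staged.remove(i).1)
    }

    /// Sum of the sizes of everything pushed but not yet consumed.
    fn buffered_bytes(&self) -> u64 {
        self.staged.iter().map(|(_, d)| d.len() as u64).sum()
    }
}

/// Hypothetical back-pressure rule: only prefetch more while under the byte budget.
fn may_prefetch(buffers: &ToyBuffers, budget: u64) -> bool {
    buffers.buffered_bytes() < budget
}

fn main() {
    let mut buffers = ToyBuffers::default();
    buffers.push(0..4, vec![0; 4]);
    buffers.push(4..10, vec![0; 6]);
    println!("buffered={} prefetch={}", buffers.buffered_bytes(), may_prefetch(&buffers, 8));
}
```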
pub fn clear_all_ranges(&mut self)
Clear any staged byte ranges currently buffered for future decode work.
This clears byte ranges still owned by the decoder’s internal
PushBuffers. It does not affect any data that has already been handed
off to an active ParquetRecordBatchReader.
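Because Bytes are reference-counted, clearing staged buffers only drops the decoder's own handles. A reader that already holds a clone keeps its data alive. The sketch below models this with std::sync::Arc standing in for Bytes. The function name is hypothetical, and the sketch assumes (for illustration only) that handing data to a reader means cloning a shared handle.

```rust
use std::sync::Arc;

/// Stages shared buffers, hands one to a "reader", then clears the stage.
/// Arc<Vec<u8>> stands in for Bytes; this is a toy, not the decoder's internals.
fn clear_after_hand_off() -> (usize, usize, Vec<u8>) {
    let mut staged: Vec<Arc<Vec<u8>>> = vec![Arc::new(vec![1, 2, 3]), Arc::new(vec![4, 5])];

    // Hand the first buffer to a reader by cloning the handle (no bytes are copied)
    let reader_data = Arc::clone(&staged[0]);

    // Equivalent of clear_all_ranges in this toy: drop every staged handle
    staged.clear();

    // The reader's handle is now the only owner, and its bytes are intact
    (staged.len(), Arc::strong_count(&reader_data), reader_data.to_vec())
}

fn main() {
    let (staged, owners, data) = clear_after_hand_off();
    println!("staged={staged} owners={owners} data={data:?}");
}
```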
pub fn is_at_row_group_boundary(&self) -> bool
True iff the decoder is at a row-group boundary, where
Self::into_builder can reconfigure the scan.
A boundary is “between row groups”: the previous row group’s
ParquetRecordBatchReader has been fully extracted (via
Self::try_next_reader) or fully drained (via Self::try_decode),
and the next row group has not yet been planned. While
Self::try_decode is iterating an active row group’s reader this
returns false; with Self::try_next_reader there is a clean
window between two consecutive returns where this is true.
pub fn row_groups_remaining(&self) -> usize
Number of row groups left to decode after the one currently in flight. Useful as a “should I bother reconfiguring the scan?” signal.
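The boundary rules and the remaining-row-group count can be modeled with a small state enum. In the toy below, Phase and Toy are hypothetical, and the real decoder's states differ. next_reader mimics Self::try_next_reader: it hands out a whole row group and stays at a boundary. decode_batch mimics Self::try_decode: it drains a row group one batch at a time and reaches a boundary only after the reader is exhausted. The toy assumes at least one batch per row group.

```rust
/// Toy decoder phases; NOT the parquet crate's ParquetDecoderState.
#[derive(Clone, Copy)]
enum Phase {
    /// Between row groups: the next row group has not been planned yet.
    Boundary { next_row_group: usize },
    /// A row group's reader is being drained one batch at a time.
    Draining { row_group: usize, batches_left: usize },
    Finished,
}

struct Toy {
    phase: Phase,
    num_row_groups: usize,
}

impl Toy {
    fn is_at_row_group_boundary(&self) -> bool {
        matches!(self.phase, Phase::Boundary { .. })
    }

    /// Row groups left after the one currently in flight (if any).
    fn row_groups_remaining(&self) -> usize {
        match self.phase {
            Phase::Boundary { next_row_group } => self.num_row_groups - next_row_group,
            Phase::Draining { row_group, .. } => self.num_row_groups - row_group - 1,
            Phase::Finished => 0,
        }
    }

    /// try_next_reader-style: hand out a whole row group and stay at a boundary.
    fn next_reader(&mut self) -> Option<usize> {
        if let Phase::Boundary { next_row_group } = self.phase {
            if next_row_group < self.num_row_groups {
                self.phase = Phase::Boundary { next_row_group: next_row_group + 1 };
                return Some(next_row_group);
            }
            self.phase = Phase::Finished;
        }
        None
    }

    /// try_decode-style: returns true if a batch was produced.
    fn decode_batch(&mut self, batches_per_group: usize) -> bool {
        let (next, produced) = match self.phase {
            // Plan the next row group and emit its first batch
            Phase::Boundary { next_row_group } if next_row_group < self.num_row_groups => (
                Phase::Draining {
                    row_group: next_row_group,
                    batches_left: batches_per_group.saturating_sub(1),
                },
                true,
            ),
            // Keep draining the active reader
            Phase::Draining { row_group, batches_left } if batches_left > 0 => {
                (Phase::Draining { row_group, batches_left: batches_left - 1 }, true)
            }
            // Reader exhausted: back at a boundary before the next row group
            Phase::Draining { row_group, .. } => {
                (Phase::Boundary { next_row_group: row_group + 1 }, false)
            }
            Phase::Boundary { .. } | Phase::Finished => (Phase::Finished, false),
        };
        self.phase = next;
        produced
    }
}

fn main() {
    let mut toy = Toy { phase: Phase::Boundary { next_row_group: 0 }, num_row_groups: 3 };
    toy.next_reader();
    println!("boundary={} remaining={}", toy.is_at_row_group_boundary(), toy.row_groups_remaining());
}
```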
pub fn into_builder(self) -> Result<ParquetPushDecoderBuilder, ParquetError>
Decompose this decoder back into a ParquetPushDecoderBuilder for the
row groups that have not yet been decoded.
This is the API for adaptive scans. Drive the decoder with
Self::try_next_reader; at any row-group boundary, call
into_builder to recover a builder, adjust it with the usual
ParquetPushDecoderBuilder setters, and
build a fresh decoder that resumes
from the next row group:
// `get_decoder` and `new_filter` are helper functions not shown in this example
let mut decoder = get_decoder();
// ... drive `decoder.try_next_reader()` for a few row groups ...
if decoder.is_at_row_group_boundary() && decoder.row_groups_remaining() > 0 {
    decoder = decoder
        .into_builder()
        .unwrap()
        // any builder option can be changed here, e.g. promote a
        // filter into a row filter based on observed selectivity
        .with_row_filter(new_filter())
        .build()
        .unwrap();
}
The returned builder pins the not-yet-decoded row groups (via
with_row_groups) and carries the
not-yet-consumed row selection and offset/limit budget, so rows from
already-decoded row groups are not produced again. Every other option
(projection, row filter, row selection policy, batch size, metrics,
predicate-cache size) is left exactly as the decoder had it and can be
overridden before build.
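The offset/limit carry-over comes down to simple arithmetic. The sketch below is a toy: ScanPlan and remaining_plan are hypothetical and are not ParquetPushDecoderBuilder. It assumes no row filter or row selection, so every row after the offset counts against the limit. For example, with offset 5, limit 20, and 10-row row groups, decoding the first row group skips 5 rows and emits 5. That leaves offset 0 and limit 15 for the rebuilt decoder.

```rust
/// Toy scan plan; the fields mirror what into_builder is described as carrying over.
#[derive(Debug, PartialEq)]
struct ScanPlan {
    row_groups: Vec<usize>, // row-group indices still to read, in order
    offset: usize,          // rows still to skip
    limit: Option<usize>,   // rows still to emit (None = unlimited)
}

/// Plan for the rest of the scan after the first `decoded` entries of
/// `plan.row_groups` were fully read; `rows_per_group[i]` is the row count of row group i.
/// Assumes no row filter or row selection.
fn remaining_plan(plan: &ScanPlan, rows_per_group: &[usize], decoded: usize) -> ScanPlan {
    // Rows that flowed through the already-decoded row groups
    let rows_seen: usize = plan.row_groups[..decoded].iter().map(|&rg| rows_per_group[rg]).sum();
    // The offset is consumed first, then the limit
    let skipped = rows_seen.min(plan.offset);
    let emitted = match plan.limit {
        Some(limit) => (rows_seen - skipped).min(limit),
        None => rows_seen - skipped,
    };
    ScanPlan {
        // Pin only the untouched row groups so earlier rows are never produced again
        row_groups: plan.row_groups[decoded..].to_vec(),
        offset: plan.offset - skipped,
        limit: plan.limit.map(|limit| limit - emitted),
    }
}

fn main() {
    let plan = ScanPlan { row_groups: vec![0, 1, 2], offset: 5, limit: Some(20) };
    println!("{:?}", remaining_plan(&plan, &[10, 10, 10], 1));
}
```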
Errors
Returns Err(ParquetError::General) when the decoder is not at a
row-group boundary (check Self::is_at_row_group_boundary first) or
has already finished. The decoder is consumed either way.
Buffered bytes
The decoder’s buffered bytes are carried across the rebuild: bytes
already fetched for row groups the new configuration still reads are
not re-requested. Bytes the new configuration no longer needs stay
buffered until clear_all_ranges is called
or the rebuilt decoder is dropped.