Skip to main content

parquet/util/
push_buffers.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::errors::ParquetError;
19use crate::file::reader::{ChunkReader, Length};
20use bytes::Bytes;
21use std::fmt::Display;
22use std::ops::Range;
23
24/// Holds multiple non-contiguous, caller-provided buffers of file data.
25///
26/// This is the in-memory buffer used by the push-based Parquet decoders
27/// (`ParquetPushDecoder` and `ParquetMetaDataPushDecoder`). It can be
28/// constructed up front and handed to a builder so the decoder reuses bytes
29/// that have already been fetched.
30///
31/// Features:
32/// 1. Zero copy
33/// 2. non contiguous ranges of bytes
34///
35/// # Non Coalescing
36///
37/// This buffer does not coalesce  (merging adjacent ranges of bytes into a
38/// single range). Coalescing at this level would require copying the data but
39/// the caller may already have the needed data in a single buffer which would
40/// require no copying.
41///
42/// Thus, the implementation defers to the caller to coalesce subsequent requests
43/// if desired.
44#[derive(Debug, Clone, Default)]
45pub struct PushBuffers {
46    /// the virtual "offset" of this buffers (added to any request)
47    offset: u64,
48    /// The total length of the file being decoded
49    file_len: u64,
50    /// The ranges of data that are available for decoding (not adjusted for offset)
51    ranges: Vec<Range<u64>>,
52    /// The buffers of data that can be used to decode the Parquet file
53    buffers: Vec<Bytes>,
54}
55
56impl Display for PushBuffers {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        writeln!(
59            f,
60            "Buffers (offset: {}, file_len: {})",
61            self.offset, self.file_len
62        )?;
63        writeln!(f, "Available Ranges (w/ offset):")?;
64        for range in &self.ranges {
65            writeln!(
66                f,
67                "  {}..{} ({}..{}): {} bytes",
68                range.start,
69                range.end,
70                range.start + self.offset,
71                range.end + self.offset,
72                range.end - range.start
73            )?;
74        }
75
76        Ok(())
77    }
78}
79
80impl PushBuffers {
81    /// Create a new, empty `PushBuffers` for a file of the given length.
82    ///
83    /// Use [`PushBuffers::default`] when the file length is unknown or
84    /// irrelevant (e.g. the push decoder, which tracks ranges by absolute
85    /// offset and never consults `file_len`).
86    pub fn new(file_len: u64) -> Self {
87        Self {
88            offset: 0,
89            file_len,
90            ranges: Vec::new(),
91            buffers: Vec::new(),
92        }
93    }
94
95    /// Push all the ranges and buffers
96    pub fn push_ranges(&mut self, ranges: Vec<Range<u64>>, buffers: Vec<Bytes>) {
97        assert_eq!(
98            ranges.len(),
99            buffers.len(),
100            "Number of ranges must match number of buffers"
101        );
102        for (range, buffer) in ranges.into_iter().zip(buffers.into_iter()) {
103            self.push_range(range, buffer);
104        }
105    }
106
107    /// Push a new range and its associated buffer
108    pub fn push_range(&mut self, range: Range<u64>, buffer: Bytes) {
109        assert_eq!(
110            (range.end - range.start) as usize,
111            buffer.len(),
112            "Range length must match buffer length"
113        );
114        self.ranges.push(range);
115        self.buffers.push(buffer);
116    }
117
118    /// Returns true if the Buffers contains data for the given range
119    pub(crate) fn has_range(&self, range: &Range<u64>) -> bool {
120        self.ranges
121            .iter()
122            .any(|r| r.start <= range.start && r.end >= range.end)
123    }
124
125    fn iter(&self) -> impl Iterator<Item = (&Range<u64>, &Bytes)> {
126        self.ranges.iter().zip(self.buffers.iter())
127    }
128
129    /// return the file length of the Parquet file being read
130    pub(crate) fn file_len(&self) -> u64 {
131        self.file_len
132    }
133
134    /// Specify a new offset
135    fn with_offset(mut self, offset: u64) -> Self {
136        self.offset = offset;
137        self
138    }
139
140    /// Return the total of all buffered ranges
141    #[cfg(feature = "arrow")]
142    pub(crate) fn buffered_bytes(&self) -> u64 {
143        self.ranges.iter().map(|r| r.end - r.start).sum()
144    }
145
146    /// Clear any range and corresponding buffer that is exactly in the ranges_to_clear
147    #[cfg(feature = "arrow")]
148    pub(crate) fn clear_ranges(&mut self, ranges_to_clear: &[Range<u64>]) {
149        let mut new_ranges = Vec::new();
150        let mut new_buffers = Vec::new();
151
152        for (range, buffer) in self.iter() {
153            if !ranges_to_clear
154                .iter()
155                .any(|r| r.start == range.start && r.end == range.end)
156            {
157                new_ranges.push(range.clone());
158                new_buffers.push(buffer.clone());
159            }
160        }
161        self.ranges = new_ranges;
162        self.buffers = new_buffers;
163    }
164
165    /// Clear all buffered ranges and their corresponding data
166    pub(crate) fn clear_all_ranges(&mut self) {
167        self.ranges.clear();
168        self.buffers.clear();
169    }
170}
171
172impl Length for PushBuffers {
173    fn len(&self) -> u64 {
174        self.file_len
175    }
176}
177
178/// less efficient implementation of Read for Buffers
179impl std::io::Read for PushBuffers {
180    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
181        // Find the range that contains the start offset
182        let mut found = false;
183        for (range, data) in self.iter() {
184            if range.start <= self.offset && range.end >= self.offset + buf.len() as u64 {
185                // Found the range, figure out the starting offset in the buffer
186                let start_offset = (self.offset - range.start) as usize;
187                let end_offset = start_offset + buf.len();
188                let slice = data.slice(start_offset..end_offset);
189                buf.copy_from_slice(slice.as_ref());
190                found = true;
191                break;
192            }
193        }
194        if found {
195            // If we found the range, we can return the number of bytes read
196            // advance our offset
197            self.offset += buf.len() as u64;
198            Ok(buf.len())
199        } else {
200            Err(std::io::Error::new(
201                std::io::ErrorKind::UnexpectedEof,
202                "No data available in Buffers",
203            ))
204        }
205    }
206}
207
208impl ChunkReader for PushBuffers {
209    type T = Self;
210
211    fn get_read(&self, start: u64) -> Result<Self::T, ParquetError> {
212        Ok(self.clone().with_offset(self.offset + start))
213    }
214
215    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes, ParquetError> {
216        // find the range that contains the start offset
217        for (range, data) in self.iter() {
218            if range.start <= start && range.end >= start + length as u64 {
219                // Found the range, figure out the starting offset in the buffer
220                let start_offset = (start - range.start) as usize;
221                return Ok(data.slice(start_offset..start_offset + length));
222            }
223        }
224        // Signal that we need more data
225        let requested_end = start + length as u64;
226        Err(ParquetError::NeedMoreDataRange(start..requested_end))
227    }
228}