parquet/file/reader.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! File reader API and methods to access file metadata, row group
//! readers to read individual column chunks, or access the record
//! iterator.
21
22use bytes::{Buf, Bytes};
23use std::fs::File;
24use std::io::{BufReader, Seek, SeekFrom};
25use std::{io::Read, sync::Arc};
26
27use crate::bloom_filter::Sbbf;
28use crate::column::page::PageIterator;
29use crate::column::{page::PageReader, reader::ColumnReader};
30use crate::errors::{ParquetError, Result};
31use crate::file::metadata::*;
32pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
33use crate::record::reader::RowIter;
34use crate::schema::types::Type as SchemaType;
35
36use crate::basic::Type;
37
38use crate::column::reader::ColumnReaderImpl;
39
/// Reports the total size, in bytes, of an input source.
///
/// The reader mainly relies on this to read the metadata, which is stored at
/// the end of the source.
// No `is_empty` counterpart is declared, so the clippy lint asking for one is silenced.
#[allow(clippy::len_without_is_empty)]
pub trait Length {
    /// Returns the number of bytes in the inner source.
    fn len(&self) -> u64;
}
47
48/// Generates [`Read`]ers to read chunks of a Parquet data source.
49///
50/// The Parquet reader uses [`ChunkReader`] to access Parquet data, allowing
51/// multiple decoders to read concurrently from different locations in the same
52/// file.
53///
54/// The trait functions both as a reader and a factory for readers.
55/// * random access via [`Self::get_bytes`]
56/// * sequential access via the reader returned via factory method [`Self::get_read`]
57///
58/// # Provided Implementations
59/// * [`File`] for reading from local file system
60/// * [`Bytes`] for reading from an in-memory buffer
61///
62/// User provided implementations can implement more sophisticated behaviors
63/// such as on-demand buffering or scan sharing.
64pub trait ChunkReader: Length + Send + Sync {
65 /// The concrete type of reader returned by this trait
66 type T: Read;
67
68 /// Get a [`Read`] instance starting at the provided file offset
69 ///
70 /// Returned readers follow the model of [`File::try_clone`] where mutations
71 /// of one reader affect all readers. Thus subsequent or concurrent calls to
72 /// [`Self::get_read`] or [`Self::get_bytes`] may cause side-effects on
73 /// previously returned readers. Callers of `get_read` should take care
74 /// to avoid race conditions.
75 fn get_read(&self, start: u64) -> Result<Self::T>;
76
77 /// Get a range of data in memory as [`Bytes`]
78 ///
79 /// Similarly to [`Self::get_read`], this method may have side-effects on
80 /// previously returned readers.
81 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
82}
83
84impl Length for File {
85 fn len(&self) -> u64 {
86 self.metadata().map(|m| m.len()).unwrap_or(0u64)
87 }
88}
89
90impl ChunkReader for File {
91 type T = BufReader<File>;
92
93 fn get_read(&self, start: u64) -> Result<Self::T> {
94 let mut reader = self.try_clone()?;
95 reader.seek(SeekFrom::Start(start))?;
96 Ok(BufReader::new(self.try_clone()?))
97 }
98
99 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
100 let mut buffer = Vec::with_capacity(length);
101 let mut reader = self.try_clone()?;
102 reader.seek(SeekFrom::Start(start))?;
103 let read = reader.take(length as _).read_to_end(&mut buffer)?;
104
105 if read != length {
106 return Err(eof_err!(
107 "Expected to read {} bytes, read only {}",
108 length,
109 read
110 ));
111 }
112 Ok(buffer.into())
113 }
114}
115
116impl Length for Bytes {
117 fn len(&self) -> u64 {
118 self.len() as u64
119 }
120}
121
122impl ChunkReader for Bytes {
123 type T = bytes::buf::Reader<Bytes>;
124
125 fn get_read(&self, start: u64) -> Result<Self::T> {
126 let start = start as usize;
127 Ok(self.slice(start..).reader())
128 }
129
130 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
131 let start = start as usize;
132 Ok(self.slice(start..start + length))
133 }
134}
135
// ----------------------------------------------------------------------
// APIs for file & row group readers
138
139/// Parquet file reader API. With this, user can get metadata information about the
140/// Parquet file, can get reader for each row group, and access record iterator.
141pub trait FileReader: Send + Sync {
142 /// Get metadata information about this file.
143 fn metadata(&self) -> &ParquetMetaData;
144
145 /// Get the total number of row groups for this file.
146 fn num_row_groups(&self) -> usize;
147
148 /// Get the `i`th row group reader. Note this doesn't do bound check.
149 fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;
150
151 /// Get an iterator over the row in this file, see [`RowIter`] for caveats.
152 ///
153 /// Iterator will automatically load the next row group to advance.
154 ///
155 /// Projected schema can be a subset of or equal to the file schema, when it is None,
156 /// full file schema is assumed.
157 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>>;
158}
159
160/// Parquet row group reader API. With this, user can get metadata information about the
161/// row group, as well as readers for each individual column chunk.
162pub trait RowGroupReader: Send + Sync {
163 /// Get metadata information about this row group.
164 fn metadata(&self) -> &RowGroupMetaData;
165
166 /// Get the total number of column chunks in this row group.
167 fn num_columns(&self) -> usize;
168
169 /// Get page reader for the `i`th column chunk.
170 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
171
172 /// Get value reader for the `i`th column chunk.
173 fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
174 let schema_descr = self.metadata().schema_descr();
175 let col_descr = schema_descr.column(i);
176 let col_page_reader = self.get_column_page_reader(i)?;
177 let col_reader = match col_descr.physical_type() {
178 Type::BOOLEAN => {
179 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
180 }
181 Type::INT32 => {
182 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
183 }
184 Type::INT64 => {
185 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
186 }
187 Type::INT96 => {
188 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
189 }
190 Type::FLOAT => {
191 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
192 }
193 Type::DOUBLE => {
194 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
195 }
196 Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
197 col_descr,
198 col_page_reader,
199 )),
200 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
201 ColumnReaderImpl::new(col_descr, col_page_reader),
202 ),
203 };
204 Ok(col_reader)
205 }
206
207 /// Get bloom filter for the `i`th column chunk, if present and the reader was configured
208 /// to read bloom filters.
209 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
210
211 /// Get an iterator over the row in this file, see [`RowIter`] for caveats.
212 ///
213 /// Projected schema can be a subset of or equal to the file schema, when it is None,
214 /// full file schema is assumed.
215 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>>;
216}
217
// ----------------------------------------------------------------------
// Iterator
220
221/// Implementation of page iterator for parquet file.
222pub struct FilePageIterator {
223 column_index: usize,
224 row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
225 file_reader: Arc<dyn FileReader>,
226}
227
228impl FilePageIterator {
229 /// Creates a page iterator for all row groups in file.
230 pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
231 let num_row_groups = file_reader.metadata().num_row_groups();
232
233 let row_group_indices = Box::new(0..num_row_groups);
234
235 Self::with_row_groups(column_index, row_group_indices, file_reader)
236 }
237
238 /// Create page iterator from parquet file reader with only some row groups.
239 pub fn with_row_groups(
240 column_index: usize,
241 row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
242 file_reader: Arc<dyn FileReader>,
243 ) -> Result<Self> {
244 // Check that column_index is valid
245 let num_columns = file_reader
246 .metadata()
247 .file_metadata()
248 .schema_descr()
249 .num_columns();
250
251 if column_index >= num_columns {
252 return Err(ParquetError::IndexOutOfBound(column_index, num_columns));
253 }
254
255 // We don't check iterators here because iterator may be infinite
256 Ok(Self {
257 column_index,
258 row_group_indices,
259 file_reader,
260 })
261 }
262}
263
264impl Iterator for FilePageIterator {
265 type Item = Result<Box<dyn PageReader>>;
266
267 fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
268 self.row_group_indices.next().map(|row_group_index| {
269 self.file_reader
270 .get_row_group(row_group_index)
271 .and_then(|r| r.get_column_page_reader(self.column_index))
272 })
273 }
274}
275
276impl PageIterator for FilePageIterator {}