1use bytes::{Buf, Bytes};
23use std::fs::File;
24use std::io::{BufReader, Seek, SeekFrom};
25use std::{io::Read, sync::Arc};
26
27use crate::bloom_filter::Sbbf;
28use crate::column::page::PageIterator;
29use crate::column::{page::PageReader, reader::ColumnReader};
30use crate::errors::{ParquetError, Result};
31use crate::file::metadata::*;
32pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
33use crate::record::reader::RowIter;
34use crate::schema::types::Type as SchemaType;
35
36use crate::basic::Type;
37
38use crate::column::reader::ColumnReaderImpl;
39
/// Types that can report their total length in bytes.
// `clippy::len_without_is_empty` is silenced because this trait only exposes `len`.
#[allow(clippy::len_without_is_empty)]
pub trait Length {
    /// Returns the total length of the underlying data, in bytes.
    fn len(&self) -> u64;
}
47
/// A byte source (for example a file or an in-memory buffer) that can be read starting
/// at arbitrary offsets.
///
/// Requires [`Length`] so callers can learn the total size, and `Send + Sync` so a single
/// instance can be shared between threads.
pub trait ChunkReader: Length + Send + Sync {
    /// The reader type returned by [`ChunkReader::get_read`].
    type T: Read;

    /// Returns a reader positioned at byte offset `start`.
    ///
    /// The implementations in this module fail if the seek fails (`File`) or if `start`
    /// is past the end of the data (`Bytes`).
    fn get_read(&self, start: u64) -> Result<Self::T>;

    /// Returns exactly `length` bytes beginning at byte offset `start`.
    ///
    /// The implementations in this module return an EOF error when fewer than `length`
    /// bytes are available from `start`.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
}
83
84impl Length for File {
85 fn len(&self) -> u64 {
86 self.metadata().map(|m| m.len()).unwrap_or(0u64)
87 }
88}
89
90impl ChunkReader for File {
91 type T = BufReader<File>;
92
93 fn get_read(&self, start: u64) -> Result<Self::T> {
94 let mut reader = self.try_clone()?;
95 reader.seek(SeekFrom::Start(start))?;
96 Ok(BufReader::new(self.try_clone()?))
97 }
98
99 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
100 let mut buffer = Vec::with_capacity(length);
101 let mut reader = self.try_clone()?;
102 reader.seek(SeekFrom::Start(start))?;
103 let read = reader.take(length as _).read_to_end(&mut buffer)?;
104
105 if read != length {
106 return Err(eof_err!(
107 "Expected to read {} bytes, read only {}",
108 length,
109 read
110 ));
111 }
112 Ok(buffer.into())
113 }
114}
115
116impl Length for Bytes {
117 fn len(&self) -> u64 {
118 self.len() as u64
119 }
120}
121
122impl ChunkReader for Bytes {
123 type T = bytes::buf::Reader<Bytes>;
124
125 fn get_read(&self, start: u64) -> Result<Self::T> {
126 let start = start as usize;
127 if start > self.len() {
128 return Err(eof_err!(
129 "Expected to read at offset {start}, while file has length {}",
130 self.len()
131 ));
132 }
133 Ok(self.slice(start..).reader())
134 }
135
136 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
137 let start = start as usize;
138 if start > self.len() || start + length > self.len() {
139 return Err(eof_err!(
140 "Expected to read {} bytes at offset {}, while file has length {}",
141 length,
142 start,
143 self.len()
144 ));
145 }
146 Ok(self.slice(start..start + length))
147 }
148}
149
/// Top-level reader for a Parquet file: exposes the file metadata and per-row-group
/// readers. `Send + Sync` allows a reader to be shared between threads.
pub trait FileReader: Send + Sync {
    /// Returns the metadata of the file.
    fn metadata(&self) -> &ParquetMetaData;

    /// Returns the number of row groups in the file.
    fn num_row_groups(&self) -> usize;

    /// Returns a reader for the row group at index `i`.
    ///
    /// The returned reader may borrow from `self` (note the `'_` lifetime).
    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;

    /// Returns an iterator over the rows of the file.
    ///
    /// NOTE(review): presumably `projection` selects a subset of the file schema and
    /// `None` means all columns — confirm against the implementations.
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>>;
}
173
174pub trait RowGroupReader: Send + Sync {
177 fn metadata(&self) -> &RowGroupMetaData;
179
180 fn num_columns(&self) -> usize;
182
183 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
185
186 fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
188 let schema_descr = self.metadata().schema_descr();
189 let col_descr = schema_descr.column(i);
190 let col_page_reader = self.get_column_page_reader(i)?;
191 let col_reader = match col_descr.physical_type() {
192 Type::BOOLEAN => {
193 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
194 }
195 Type::INT32 => {
196 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
197 }
198 Type::INT64 => {
199 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
200 }
201 Type::INT96 => {
202 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
203 }
204 Type::FLOAT => {
205 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
206 }
207 Type::DOUBLE => {
208 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
209 }
210 Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
211 col_descr,
212 col_page_reader,
213 )),
214 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
215 ColumnReaderImpl::new(col_descr, col_page_reader),
216 ),
217 };
218 Ok(col_reader)
219 }
220
221 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
224
225 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter<'_>>;
230}
231
/// Iterator that yields a page reader for one column from each of a sequence of row
/// groups in a file.
pub struct FilePageIterator {
    /// Index of the column whose page reader is opened in each row group.
    column_index: usize,
    /// Row group indices still to be visited, in iteration order.
    row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
    /// File reader used to open each row group.
    file_reader: Arc<dyn FileReader>,
}
241
242impl FilePageIterator {
243 pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
245 let num_row_groups = file_reader.metadata().num_row_groups();
246
247 let row_group_indices = Box::new(0..num_row_groups);
248
249 Self::with_row_groups(column_index, row_group_indices, file_reader)
250 }
251
252 pub fn with_row_groups(
254 column_index: usize,
255 row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
256 file_reader: Arc<dyn FileReader>,
257 ) -> Result<Self> {
258 let num_columns = file_reader
260 .metadata()
261 .file_metadata()
262 .schema_descr()
263 .num_columns();
264
265 if column_index >= num_columns {
266 return Err(ParquetError::IndexOutOfBound(column_index, num_columns));
267 }
268
269 Ok(Self {
271 column_index,
272 row_group_indices,
273 file_reader,
274 })
275 }
276}
277
278impl Iterator for FilePageIterator {
279 type Item = Result<Box<dyn PageReader>>;
280
281 fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
282 self.row_group_indices.next().map(|row_group_index| {
283 self.file_reader
284 .get_row_group(row_group_index)
285 .and_then(|r| r.get_column_page_reader(self.column_index))
286 })
287 }
288}
289
// No items are overridden: `FilePageIterator` implements `PageIterator` using only the
// trait's provided items (if any). NOTE(review): presumably `PageIterator` builds on the
// `Iterator` impl above — confirm against `crate::column::page`.
impl PageIterator for FilePageIterator {}
291
#[cfg(test)]
mod tests {
    use super::*;

    /// Four-byte buffer shared by the out-of-bounds tests.
    fn four_bytes() -> Bytes {
        Bytes::from(vec![0u8, 1, 2, 3])
    }

    #[test]
    fn test_bytes_chunk_reader_get_read_out_of_bounds() {
        let message = four_bytes().get_read(5).unwrap_err().to_string();
        assert_eq!(
            message,
            "EOF: Expected to read at offset 5, while file has length 4"
        );
    }

    #[test]
    fn test_bytes_chunk_reader_get_bytes_out_of_bounds() {
        let data = four_bytes();
        let cases = [
            (
                5,
                1,
                "EOF: Expected to read 1 bytes at offset 5, while file has length 4",
            ),
            (
                2,
                3,
                "EOF: Expected to read 3 bytes at offset 2, while file has length 4",
            ),
        ];
        for (start, length, expected) in cases {
            let message = data.get_bytes(start, length).unwrap_err().to_string();
            assert_eq!(message, expected, "start={start}, length={length}");
        }
    }
}