1use bytes::{Buf, Bytes};
23use std::fs::File;
24use std::io::{BufReader, Seek, SeekFrom};
25use std::{io::Read, sync::Arc};
26
27use crate::bloom_filter::Sbbf;
28use crate::column::page::PageIterator;
29use crate::column::{page::PageReader, reader::ColumnReader};
30use crate::errors::{ParquetError, Result};
31use crate::file::metadata::*;
32pub use crate::file::serialized_reader::{SerializedFileReader, SerializedPageReader};
33use crate::record::reader::RowIter;
34use crate::schema::types::Type as SchemaType;
35
36use crate::basic::Type;
37
38use crate::column::reader::ColumnReaderImpl;
39
/// Length of a byte source, in bytes.
///
/// Only `len` is part of this trait. The clippy lint below is silenced because no
/// `is_empty` companion method is defined here.
#[allow(clippy::len_without_is_empty)]
pub trait Length {
    /// Returns the total number of bytes available from this source.
    fn len(&self) -> u64;
}
47
/// A byte source that can be read starting from arbitrary offsets.
///
/// The total size comes from the [`Length`] supertrait. `Send + Sync` are required, so
/// implementors must be shareable across threads.
pub trait ChunkReader: Length + Send + Sync {
    /// Reader type returned by [`ChunkReader::get_read`].
    type T: Read;

    /// Returns a reader that starts reading at byte offset `start` of this source.
    fn get_read(&self, start: u64) -> Result<Self::T>;

    /// Returns `length` bytes of this source beginning at byte offset `start`.
    fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes>;
}
82
83impl Length for File {
84 fn len(&self) -> u64 {
85 self.metadata().map(|m| m.len()).unwrap_or(0u64)
86 }
87}
88
89impl ChunkReader for File {
90 type T = BufReader<File>;
91
92 fn get_read(&self, start: u64) -> Result<Self::T> {
93 let mut reader = self.try_clone()?;
94 reader.seek(SeekFrom::Start(start))?;
95 Ok(BufReader::new(self.try_clone()?))
96 }
97
98 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
99 let mut buffer = Vec::with_capacity(length);
100 let mut reader = self.try_clone()?;
101 reader.seek(SeekFrom::Start(start))?;
102 let read = reader.take(length as _).read_to_end(&mut buffer)?;
103
104 if read != length {
105 return Err(eof_err!(
106 "Expected to read {} bytes, read only {}",
107 length,
108 read
109 ));
110 }
111 Ok(buffer.into())
112 }
113}
114
115impl Length for Bytes {
116 fn len(&self) -> u64 {
117 self.len() as u64
118 }
119}
120
121impl ChunkReader for Bytes {
122 type T = bytes::buf::Reader<Bytes>;
123
124 fn get_read(&self, start: u64) -> Result<Self::T> {
125 let start = start as usize;
126 Ok(self.slice(start..).reader())
127 }
128
129 fn get_bytes(&self, start: u64, length: usize) -> Result<Bytes> {
130 let start = start as usize;
131 Ok(self.slice(start..start + length))
132 }
133}
134
/// Interface for reading a Parquet file: its metadata and its row groups.
///
/// Implementors must be `Send + Sync`.
pub trait FileReader: Send + Sync {
    /// Returns the metadata of this file.
    fn metadata(&self) -> &ParquetMetaData;

    /// Returns the number of row groups in this file.
    fn num_row_groups(&self) -> usize;

    /// Returns a reader for the row group at index `i`.
    ///
    /// The returned reader may borrow from `self` (note the `'_` bound).
    fn get_row_group(&self, i: usize) -> Result<Box<dyn RowGroupReader + '_>>;

    /// Returns an iterator over the rows of this file.
    ///
    /// NOTE(review): `projection` presumably restricts the columns read, with `None`
    /// meaning the full schema. Confirm against implementors.
    fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
}
158
159pub trait RowGroupReader: Send + Sync {
162 fn metadata(&self) -> &RowGroupMetaData;
164
165 fn num_columns(&self) -> usize;
167
168 fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>>;
170
171 fn get_column_reader(&self, i: usize) -> Result<ColumnReader> {
173 let schema_descr = self.metadata().schema_descr();
174 let col_descr = schema_descr.column(i);
175 let col_page_reader = self.get_column_page_reader(i)?;
176 let col_reader = match col_descr.physical_type() {
177 Type::BOOLEAN => {
178 ColumnReader::BoolColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
179 }
180 Type::INT32 => {
181 ColumnReader::Int32ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
182 }
183 Type::INT64 => {
184 ColumnReader::Int64ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
185 }
186 Type::INT96 => {
187 ColumnReader::Int96ColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
188 }
189 Type::FLOAT => {
190 ColumnReader::FloatColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
191 }
192 Type::DOUBLE => {
193 ColumnReader::DoubleColumnReader(ColumnReaderImpl::new(col_descr, col_page_reader))
194 }
195 Type::BYTE_ARRAY => ColumnReader::ByteArrayColumnReader(ColumnReaderImpl::new(
196 col_descr,
197 col_page_reader,
198 )),
199 Type::FIXED_LEN_BYTE_ARRAY => ColumnReader::FixedLenByteArrayColumnReader(
200 ColumnReaderImpl::new(col_descr, col_page_reader),
201 ),
202 };
203 Ok(col_reader)
204 }
205
206 fn get_column_bloom_filter(&self, i: usize) -> Option<&Sbbf>;
209
210 fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter>;
215}
216
/// Iterator that yields one page reader for a fixed column per row group index.
///
/// Each item comes from opening the next row group of `file_reader` and requesting the
/// page reader for `column_index`.
pub struct FilePageIterator {
    /// Index of the column whose page readers are produced.
    column_index: usize,
    /// Row group indices still to visit, in iteration order.
    row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
    /// File the row groups are read from.
    file_reader: Arc<dyn FileReader>,
}
226
227impl FilePageIterator {
228 pub fn new(column_index: usize, file_reader: Arc<dyn FileReader>) -> Result<Self> {
230 let num_row_groups = file_reader.metadata().num_row_groups();
231
232 let row_group_indices = Box::new(0..num_row_groups);
233
234 Self::with_row_groups(column_index, row_group_indices, file_reader)
235 }
236
237 pub fn with_row_groups(
239 column_index: usize,
240 row_group_indices: Box<dyn Iterator<Item = usize> + Send>,
241 file_reader: Arc<dyn FileReader>,
242 ) -> Result<Self> {
243 let num_columns = file_reader
245 .metadata()
246 .file_metadata()
247 .schema_descr()
248 .num_columns();
249
250 if column_index >= num_columns {
251 return Err(ParquetError::IndexOutOfBound(column_index, num_columns));
252 }
253
254 Ok(Self {
256 column_index,
257 row_group_indices,
258 file_reader,
259 })
260 }
261}
262
263impl Iterator for FilePageIterator {
264 type Item = Result<Box<dyn PageReader>>;
265
266 fn next(&mut self) -> Option<Result<Box<dyn PageReader>>> {
267 self.row_group_indices.next().map(|row_group_index| {
268 self.file_reader
269 .get_row_group(row_group_index)
270 .and_then(|r| r.get_column_page_reader(self.column_index))
271 })
272 }
273}
274
// Empty impl: no `PageIterator` items are overridden here, so any provided defaults of
// that trait apply to `FilePageIterator`.
impl PageIterator for FilePageIterator {}