parquet/util/test_common/
page_util.rs1use bytes::Bytes;
19
20use crate::basic::Encoding;
21use crate::column::page::{Page, PageIterator};
22use crate::column::page::{PageMetadata, PageReader};
23use crate::data_type::DataType;
24use crate::encodings::encoding::{get_encoder, Encoder};
25use crate::encodings::levels::LevelEncoder;
26use crate::errors::Result;
27use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
28use std::iter::Peekable;
29use std::mem;
30use std::sync::Arc;
31
32pub trait DataPageBuilder {
33 fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
34 fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]);
35 fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]);
36 fn add_indices(&mut self, indices: Bytes);
37 fn consume(self) -> Page;
38}
39
40pub struct DataPageBuilderImpl {
48 encoding: Option<Encoding>,
49 num_values: u32,
50 buffer: Vec<u8>,
51 rep_levels_byte_len: u32,
52 def_levels_byte_len: u32,
53 datapage_v2: bool,
54 type_width: i32,
55}
56
57impl DataPageBuilderImpl {
58 pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
62 DataPageBuilderImpl {
63 encoding: None,
64 num_values,
65 buffer: vec![],
66 rep_levels_byte_len: 0,
67 def_levels_byte_len: 0,
68 datapage_v2,
69 type_width: desc.type_length(),
70 }
71 }
72
73 fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
75 if max_level <= 0 {
76 return 0;
77 }
78 let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
79 level_encoder.put(levels);
80 let encoded_levels = level_encoder.consume();
81 let encoded_bytes = &encoded_levels[mem::size_of::<i32>()..];
83 if self.datapage_v2 {
84 self.buffer.extend_from_slice(encoded_bytes);
88 } else {
89 self.buffer.extend_from_slice(encoded_levels.as_slice());
90 }
91 encoded_bytes.len() as u32
92 }
93}
94
95impl DataPageBuilder for DataPageBuilderImpl {
96 fn add_rep_levels(&mut self, max_levels: i16, rep_levels: &[i16]) {
97 self.num_values = rep_levels.len() as u32;
98 self.rep_levels_byte_len = self.add_levels(max_levels, rep_levels);
99 }
100
101 fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
102 self.num_values = def_levels.len() as u32;
103 self.def_levels_byte_len = self.add_levels(max_levels, def_levels);
104 }
105
106 fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]) {
107 assert!(
108 self.num_values >= values.len() as u32,
109 "num_values: {}, values.len(): {}",
110 self.num_values,
111 values.len()
112 );
113 let desc = {
115 let ty = SchemaType::primitive_type_builder("t", T::get_physical_type())
116 .with_length(self.type_width)
117 .build()
118 .unwrap();
119 Arc::new(ColumnDescriptor::new(
120 Arc::new(ty),
121 0,
122 0,
123 ColumnPath::new(vec![]),
124 ))
125 };
126 self.encoding = Some(encoding);
127 let mut encoder: Box<dyn Encoder<T>> =
128 get_encoder::<T>(encoding, &desc).expect("get_encoder() should be OK");
129 encoder.put(values).expect("put() should be OK");
130 let encoded_values = encoder
131 .flush_buffer()
132 .expect("consume_buffer() should be OK");
133 self.buffer.extend_from_slice(&encoded_values);
134 }
135
136 fn add_indices(&mut self, indices: Bytes) {
137 self.encoding = Some(Encoding::RLE_DICTIONARY);
138 self.buffer.extend_from_slice(&indices);
139 }
140
141 fn consume(self) -> Page {
142 if self.datapage_v2 {
143 Page::DataPageV2 {
144 buf: Bytes::from(self.buffer),
145 num_values: self.num_values,
146 encoding: self.encoding.unwrap(),
147 num_nulls: 0, num_rows: self.num_values, def_levels_byte_len: self.def_levels_byte_len,
152 rep_levels_byte_len: self.rep_levels_byte_len,
153 is_compressed: false,
154 statistics: None, }
156 } else {
157 Page::DataPage {
158 buf: Bytes::from(self.buffer),
159 num_values: self.num_values,
160 encoding: self.encoding.unwrap(),
161 def_level_encoding: Encoding::RLE,
162 rep_level_encoding: Encoding::RLE,
163 statistics: None, }
165 }
166 }
167}
168
169pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
171 page_iter: Peekable<P>,
172}
173
174impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
175 pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
176 Self {
177 page_iter: pages.into_iter().peekable(),
178 }
179 }
180}
181
182impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
183 fn get_next_page(&mut self) -> Result<Option<Page>> {
184 Ok(self.page_iter.next())
185 }
186
187 fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
188 if let Some(x) = self.page_iter.peek() {
189 match x {
190 Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
191 num_rows: None,
192 num_levels: Some(*num_values as _),
193 is_dict: false,
194 })),
195 Page::DataPageV2 {
196 num_rows,
197 num_values,
198 ..
199 } => Ok(Some(PageMetadata {
200 num_rows: Some(*num_rows as _),
201 num_levels: Some(*num_values as _),
202 is_dict: false,
203 })),
204 Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
205 num_rows: None,
206 num_levels: None,
207 is_dict: true,
208 })),
209 }
210 } else {
211 Ok(None)
212 }
213 }
214
215 fn skip_next_page(&mut self) -> Result<()> {
216 self.page_iter.next();
217 Ok(())
218 }
219}
220
221impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {
222 type Item = Result<Page>;
223
224 fn next(&mut self) -> Option<Self::Item> {
225 self.get_next_page().transpose()
226 }
227}
228
229#[derive(Clone)]
231pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
232 page_reader_iter: I,
233}
234
235impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
236 pub fn new(pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>) -> Self {
237 Self {
238 page_reader_iter: pages.into_iter(),
239 }
240 }
241}
242
243impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
244 type Item = Result<Box<dyn PageReader>>;
245
246 fn next(&mut self) -> Option<Self::Item> {
247 self.page_reader_iter
248 .next()
249 .map(|x| Ok(Box::new(InMemoryPageReader::new(x)) as Box<dyn PageReader>))
250 }
251}
252
253impl<I: Iterator<Item = Vec<Page>> + Send> PageIterator for InMemoryPageIterator<I> {}