parquet/util/test_common/
page_util.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use bytes::Bytes;
19
20use crate::basic::Encoding;
21use crate::column::page::{Page, PageIterator};
22use crate::column::page::{PageMetadata, PageReader};
23use crate::data_type::DataType;
24use crate::encodings::encoding::{get_encoder, Encoder};
25use crate::encodings::levels::LevelEncoder;
26use crate::errors::Result;
27use crate::schema::types::{ColumnDescPtr, ColumnDescriptor, ColumnPath, Type as SchemaType};
28use std::iter::Peekable;
29use std::mem;
30use std::sync::Arc;
31
/// Interface for assembling a single data page for tests.
///
/// The implementation in this file (`DataPageBuilderImpl`) expects the
/// methods to be called in the order: repetition levels, definition levels,
/// then either values or dictionary indices, and finally `consume()`.
pub trait DataPageBuilder {
    /// Adds the repetition levels of the page. `max_level` is the maximum
    /// level value that may appear in `rep_levels`.
    fn add_rep_levels(&mut self, max_level: i16, rep_levels: &[i16]);
    /// Adds the definition levels of the page. `max_level` is the maximum
    /// level value that may appear in `def_levels`.
    fn add_def_levels(&mut self, max_level: i16, def_levels: &[i16]);
    /// Encodes `values` of physical type `T` with `encoding` and adds them to
    /// the page.
    fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]);
    /// Adds raw bytes holding the indices of a dictionary data page.
    fn add_indices(&mut self, indices: Bytes);
    /// Consumes the builder and returns the assembled page.
    fn consume(self) -> Page;
}
39
/// A utility struct for building data pages (v1 or v2). Callers must call the
/// following functions in order to populate and obtain a data page:
///
/// - add_rep_levels()
/// - add_def_levels()
/// - add_values() for normal data page / add_indices() for dictionary data page
/// - consume()
pub struct DataPageBuilderImpl {
    // Encoding of the value section; `None` until `add_values()` or
    // `add_indices()` has been called.
    encoding: Option<Encoding>,
    // Value count reported by the page. Starts as the constructor argument
    // and is overwritten by `add_rep_levels()` / `add_def_levels()` with the
    // number of levels supplied.
    num_values: u32,
    // Page body: encoded levels followed by encoded values or indices, in
    // the order the `add_*` methods were called.
    buffer: Vec<u8>,
    // Size in bytes of the encoded repetition levels, not counting the
    // length prefix; stays 0 if no repetition levels were encoded.
    rep_levels_byte_len: u32,
    // Size in bytes of the encoded definition levels, not counting the
    // length prefix; stays 0 if no definition levels were encoded.
    def_levels_byte_len: u32,
    // When true, `consume()` yields `Page::DataPageV2` and levels are stored
    // without their length prefix; otherwise `Page::DataPage` is produced.
    datapage_v2: bool,
    // Type length taken from the column descriptor; reused as the length of
    // the temporary schema type built in `add_values()`.
    type_width: i32,
}
56
57impl DataPageBuilderImpl {
58    // `num_values` is the number of non-null values to put in the data page.
59    // `datapage_v2` flag is used to indicate if the generated data page should use V2
60    // format or not.
61    pub fn new(desc: ColumnDescPtr, num_values: u32, datapage_v2: bool) -> Self {
62        DataPageBuilderImpl {
63            encoding: None,
64            num_values,
65            buffer: vec![],
66            rep_levels_byte_len: 0,
67            def_levels_byte_len: 0,
68            datapage_v2,
69            type_width: desc.type_length(),
70        }
71    }
72
73    // Adds levels to the buffer and return number of encoded bytes
74    fn add_levels(&mut self, max_level: i16, levels: &[i16]) -> u32 {
75        if max_level <= 0 {
76            return 0;
77        }
78        let mut level_encoder = LevelEncoder::v1(Encoding::RLE, max_level, levels.len());
79        level_encoder.put(levels);
80        let encoded_levels = level_encoder.consume();
81        // Actual encoded bytes (without length offset)
82        let encoded_bytes = &encoded_levels[mem::size_of::<i32>()..];
83        if self.datapage_v2 {
84            // Level encoder always initializes with offset of i32, where it stores
85            // length of encoded data; for data page v2 we explicitly
86            // store length, therefore we should skip i32 bytes.
87            self.buffer.extend_from_slice(encoded_bytes);
88        } else {
89            self.buffer.extend_from_slice(encoded_levels.as_slice());
90        }
91        encoded_bytes.len() as u32
92    }
93}
94
95impl DataPageBuilder for DataPageBuilderImpl {
96    fn add_rep_levels(&mut self, max_levels: i16, rep_levels: &[i16]) {
97        self.num_values = rep_levels.len() as u32;
98        self.rep_levels_byte_len = self.add_levels(max_levels, rep_levels);
99    }
100
101    fn add_def_levels(&mut self, max_levels: i16, def_levels: &[i16]) {
102        self.num_values = def_levels.len() as u32;
103        self.def_levels_byte_len = self.add_levels(max_levels, def_levels);
104    }
105
106    fn add_values<T: DataType>(&mut self, encoding: Encoding, values: &[T::T]) {
107        assert!(
108            self.num_values >= values.len() as u32,
109            "num_values: {}, values.len(): {}",
110            self.num_values,
111            values.len()
112        );
113        // Create test column descriptor.
114        let desc = {
115            let ty = SchemaType::primitive_type_builder("t", T::get_physical_type())
116                .with_length(self.type_width)
117                .build()
118                .unwrap();
119            Arc::new(ColumnDescriptor::new(
120                Arc::new(ty),
121                0,
122                0,
123                ColumnPath::new(vec![]),
124            ))
125        };
126        self.encoding = Some(encoding);
127        let mut encoder: Box<dyn Encoder<T>> =
128            get_encoder::<T>(encoding, &desc).expect("get_encoder() should be OK");
129        encoder.put(values).expect("put() should be OK");
130        let encoded_values = encoder
131            .flush_buffer()
132            .expect("consume_buffer() should be OK");
133        self.buffer.extend_from_slice(&encoded_values);
134    }
135
136    fn add_indices(&mut self, indices: Bytes) {
137        self.encoding = Some(Encoding::RLE_DICTIONARY);
138        self.buffer.extend_from_slice(&indices);
139    }
140
141    fn consume(self) -> Page {
142        if self.datapage_v2 {
143            Page::DataPageV2 {
144                buf: Bytes::from(self.buffer),
145                num_values: self.num_values,
146                encoding: self.encoding.unwrap(),
147                num_nulls: 0, /* set to dummy value - don't need this when reading
148                               * data page */
149                num_rows: self.num_values, /* num_rows only needs in skip_records, now we not support skip REPEATED field,
150                                            * so we can assume num_values == num_rows */
151                def_levels_byte_len: self.def_levels_byte_len,
152                rep_levels_byte_len: self.rep_levels_byte_len,
153                is_compressed: false,
154                statistics: None, // set to None, we do not need statistics for tests
155            }
156        } else {
157            Page::DataPage {
158                buf: Bytes::from(self.buffer),
159                num_values: self.num_values,
160                encoding: self.encoding.unwrap(),
161                def_level_encoding: Encoding::RLE,
162                rep_level_encoding: Encoding::RLE,
163                statistics: None, // set to None, we do not need statistics for tests
164            }
165        }
166    }
167}
168
169/// A utility page reader which stores pages in memory.
170pub struct InMemoryPageReader<P: Iterator<Item = Page>> {
171    page_iter: Peekable<P>,
172}
173
174impl<P: Iterator<Item = Page>> InMemoryPageReader<P> {
175    pub fn new(pages: impl IntoIterator<Item = Page, IntoIter = P>) -> Self {
176        Self {
177            page_iter: pages.into_iter().peekable(),
178        }
179    }
180}
181
182impl<P: Iterator<Item = Page> + Send> PageReader for InMemoryPageReader<P> {
183    fn get_next_page(&mut self) -> Result<Option<Page>> {
184        Ok(self.page_iter.next())
185    }
186
187    fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
188        if let Some(x) = self.page_iter.peek() {
189            match x {
190                Page::DataPage { num_values, .. } => Ok(Some(PageMetadata {
191                    num_rows: None,
192                    num_levels: Some(*num_values as _),
193                    is_dict: false,
194                })),
195                Page::DataPageV2 {
196                    num_rows,
197                    num_values,
198                    ..
199                } => Ok(Some(PageMetadata {
200                    num_rows: Some(*num_rows as _),
201                    num_levels: Some(*num_values as _),
202                    is_dict: false,
203                })),
204                Page::DictionaryPage { .. } => Ok(Some(PageMetadata {
205                    num_rows: None,
206                    num_levels: None,
207                    is_dict: true,
208                })),
209            }
210        } else {
211            Ok(None)
212        }
213    }
214
215    fn skip_next_page(&mut self) -> Result<()> {
216        self.page_iter.next();
217        Ok(())
218    }
219}
220
221impl<P: Iterator<Item = Page> + Send> Iterator for InMemoryPageReader<P> {
222    type Item = Result<Page>;
223
224    fn next(&mut self) -> Option<Self::Item> {
225        self.get_next_page().transpose()
226    }
227}
228
229/// A utility page iterator which stores page readers in memory, used for tests.
230#[derive(Clone)]
231pub struct InMemoryPageIterator<I: Iterator<Item = Vec<Page>>> {
232    page_reader_iter: I,
233}
234
235impl<I: Iterator<Item = Vec<Page>>> InMemoryPageIterator<I> {
236    pub fn new(pages: impl IntoIterator<Item = Vec<Page>, IntoIter = I>) -> Self {
237        Self {
238            page_reader_iter: pages.into_iter(),
239        }
240    }
241}
242
243impl<I: Iterator<Item = Vec<Page>>> Iterator for InMemoryPageIterator<I> {
244    type Item = Result<Box<dyn PageReader>>;
245
246    fn next(&mut self) -> Option<Self::Item> {
247        self.page_reader_iter
248            .next()
249            .map(|x| Ok(Box::new(InMemoryPageReader::new(x)) as Box<dyn PageReader>))
250    }
251}
252
253impl<I: Iterator<Item = Vec<Page>> + Send> PageIterator for InMemoryPageIterator<I> {}