parquet/file/page_index/
offset_index.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`OffsetIndexMetaData`] structure holding decoded [`OffsetIndex`] information.
//!
//! [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md

22use std::io::Write;
23
24use crate::parquet_thrift::{
25    ElementType, FieldType, ReadThrift, ThriftCompactInputProtocol, ThriftCompactOutputProtocol,
26    WriteThrift, WriteThriftField, read_thrift_vec,
27};
28use crate::{
29    errors::{ParquetError, Result},
30    thrift_struct,
31};
32
thrift_struct!(
/// Page location information for [`OffsetIndexMetaData`].
///
/// Each entry describes one page of a column chunk: where the page starts in the file,
/// how large it is (header included), and which row of the row group it begins with.
pub struct PageLocation {
  /// Offset of the page in the file.
  1: required i64 offset
  /// Size of the page, including its header: the sum of the page header's
  /// `compressed_page_size` and the length of the header itself.
  2: required i32 compressed_page_size
  /// Index within the RowGroup of the first row of the page. When an
  /// OffsetIndex is present, pages must begin on row boundaries
  /// (repetition_level = 0).
  3: required i64 first_row_index
}
);

thrift_struct!(
/// [`OffsetIndex`] information for a column chunk.
///
/// Contains the offset and size of each page in the chunk. It can also store the fully
/// decoded (unencoded) data sizes of each page for BYTE_ARRAY columns.
///
/// See [`ParquetOffsetIndex`] for more information.
///
/// [`ParquetOffsetIndex`]: crate::file::metadata::ParquetOffsetIndex
/// [`OffsetIndex`]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub struct OffsetIndexMetaData {
  /// Vector of [`PageLocation`] objects, one per page in the chunk.
  1: required list<PageLocation> page_locations
  /// Optional vector of unencoded page sizes, one per page in the chunk.
  /// Only defined for BYTE_ARRAY columns.
  2: optional list<i64> unencoded_byte_array_data_bytes
}
);

64impl OffsetIndexMetaData {
65    /// Vector of [`PageLocation`] objects, one per page in the chunk.
66    pub fn page_locations(&self) -> &Vec<PageLocation> {
67        &self.page_locations
68    }
69
70    /// Optional vector of unencoded page sizes, one per page in the chunk. Only defined
71    /// for BYTE_ARRAY columns.
72    pub fn unencoded_byte_array_data_bytes(&self) -> Option<&Vec<i64>> {
73        self.unencoded_byte_array_data_bytes.as_ref()
74    }
75
76    // Fast-path read of offset index. This works because we expect all field deltas to be 1,
77    // and there's no nesting beyond PageLocation, so no need to save the last field id. Like
78    // read_page_locations(), this will fail if absolute field id's are used.
79    pub(super) fn try_from_fast<'a, R: ThriftCompactInputProtocol<'a>>(
80        prot: &mut R,
81    ) -> Result<Self> {
82        // Offset index is a struct with 2 fields. First field is an array of PageLocations,
83        // the second an optional array of i64.
84
85        // read field 1 header, then list header, then vec of PageLocations
86        let (field_type, delta) = prot.read_field_header()?;
87        if delta != 1 || field_type != FieldType::List as u8 {
88            return Err(general_err!("error reading OffsetIndex::page_locations"));
89        }
90
91        // we have to do this manually because we want to use the fast PageLocation decoder
92        let list_ident = prot.read_list_begin()?;
93        let mut page_locations = Vec::with_capacity(list_ident.size as usize);
94        for _ in 0..list_ident.size {
95            page_locations.push(read_page_location(prot)?);
96        }
97
98        let mut unencoded_byte_array_data_bytes: Option<Vec<i64>> = None;
99
100        // read second field...if it's Stop we're done
101        let (mut field_type, delta) = prot.read_field_header()?;
102        if field_type == FieldType::List as u8 {
103            if delta != 1 {
104                return Err(general_err!(
105                    "encountered unknown field while reading OffsetIndex"
106                ));
107            }
108            let vec = read_thrift_vec::<i64, R>(&mut *prot)?;
109            unencoded_byte_array_data_bytes = Some(vec);
110
111            // this one should be Stop
112            (field_type, _) = prot.read_field_header()?;
113        }
114
115        if field_type != FieldType::Stop as u8 {
116            return Err(general_err!(
117                "encountered unknown field while reading OffsetIndex"
118            ));
119        }
120
121        Ok(Self {
122            page_locations,
123            unencoded_byte_array_data_bytes,
124        })
125    }
126}
127
128// hand coding this one because it is very time critical
129
130// Note: this will fail if the fields are either out of order, or if a suboptimal
131// encoder doesn't use field deltas.
132fn read_page_location<'a, R: ThriftCompactInputProtocol<'a>>(prot: &mut R) -> Result<PageLocation> {
133    // there are 3 fields, all mandatory, so all field deltas should be 1
134    let (field_type, delta) = prot.read_field_header()?;
135    if delta != 1 || field_type != FieldType::I64 as u8 {
136        return Err(general_err!("error reading PageLocation::offset"));
137    }
138    let offset = prot.read_i64()?;
139
140    let (field_type, delta) = prot.read_field_header()?;
141    if delta != 1 || field_type != FieldType::I32 as u8 {
142        return Err(general_err!(
143            "error reading PageLocation::compressed_page_size"
144        ));
145    }
146    let compressed_page_size = prot.read_i32()?;
147
148    let (field_type, delta) = prot.read_field_header()?;
149    if delta != 1 || field_type != FieldType::I64 as u8 {
150        return Err(general_err!("error reading PageLocation::first_row_index"));
151    }
152    let first_row_index = prot.read_i64()?;
153
154    // read end of struct...return error if there are unknown fields present
155    let (field_type, _) = prot.read_field_header()?;
156    if field_type != FieldType::Stop as u8 {
157        return Err(general_err!("unexpected field in PageLocation"));
158    }
159
160    Ok(PageLocation {
161        offset,
162        compressed_page_size,
163        first_row_index,
164    })
165}
166
#[cfg(test)]
mod tests {
    use super::*;
    use crate::parquet_thrift::tests::test_roundtrip;

    /// Encodes and decodes an `OffsetIndexMetaData` through thrift. It runs once with the
    /// optional `unencoded_byte_array_data_bytes` list present and once without it.
    #[test]
    fn test_offset_idx_roundtrip() {
        let location = |offset, compressed_page_size, first_row_index| PageLocation {
            offset,
            compressed_page_size,
            first_row_index,
        };
        let page_locations = vec![location(0, 10, 0), location(10, 20, 100)];

        for unencoded_byte_array_data_bytes in [Some(vec![0i64, 100i64]), None] {
            test_roundtrip(OffsetIndexMetaData {
                page_locations: page_locations.clone(),
                unencoded_byte_array_data_bytes,
            });
        }
    }
}