parquet_index/
parquet-index.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Binary that prints the [page index] of a parquet file
19//!
20//! # Install
21//!
//! `parquet-index` can be installed using `cargo`:
23//! ```
24//! cargo install parquet --features=cli
25//! ```
26//! After this `parquet-index` should be available:
27//! ```
28//! parquet-index XYZ.parquet COLUMN_NAME
29//! ```
30//!
31//! The binary can also be built from the source code and run as follows:
32//! ```
//! cargo run --features=cli --bin parquet-index XYZ.parquet COLUMN_NAME
//! ```
//!
35//! [page index]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
36
37use clap::Parser;
38use parquet::data_type::ByteArray;
39use parquet::errors::{ParquetError, Result};
40use parquet::file::page_index::column_index::{
41    ByteArrayColumnIndex, ColumnIndexMetaData, PrimitiveColumnIndex,
42};
43use parquet::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
44use parquet::file::reader::{FileReader, SerializedFileReader};
45use parquet::file::serialized_reader::ReadOptionsBuilder;
46use std::fs::File;
47
48#[derive(Debug, Parser)]
49#[clap(author, version, about("Prints the page index of a parquet file"), long_about = None)]
50struct Args {
51    #[clap(help("Path to a parquet file"))]
52    file: String,
53
54    #[clap(help("Column name to print"))]
55    column: String,
56}
57
58impl Args {
59    fn run(&self) -> Result<()> {
60        let file = File::open(&self.file)?;
61        let options = ReadOptionsBuilder::new().with_page_index().build();
62        let reader = SerializedFileReader::new_with_options(file, options)?;
63
64        let schema = reader.metadata().file_metadata().schema_descr();
65        let column_idx = schema
66            .columns()
67            .iter()
68            .position(|x| x.name() == self.column.as_str())
69            .ok_or_else(|| {
70                ParquetError::General(format!("Failed to find column {}", self.column))
71            })?;
72
73        // Column index data for all row groups and columns
74        let column_index = reader
75            .metadata()
76            .column_index()
77            .ok_or_else(|| ParquetError::General("Column index not found".to_string()))?;
78
79        // Offset index data for all row groups and columns
80        let offset_index = reader
81            .metadata()
82            .offset_index()
83            .ok_or_else(|| ParquetError::General("Offset index not found".to_string()))?;
84
85        // Iterate through each row group
86        for (row_group_idx, ((column_indices, offset_indices), row_group)) in column_index
87            .iter()
88            .zip(offset_index)
89            .zip(reader.metadata().row_groups())
90            .enumerate()
91        {
92            println!("Row Group: {row_group_idx}");
93            let offset_index = offset_indices.get(column_idx).ok_or_else(|| {
94                ParquetError::General(format!(
95                    "No offset index for row group {row_group_idx} column chunk {column_idx}"
96                ))
97            })?;
98
99            let row_counts =
100                compute_row_counts(offset_index.page_locations.as_slice(), row_group.num_rows());
101            match &column_indices[column_idx] {
102                ColumnIndexMetaData::NONE => println!("NO INDEX"),
103                ColumnIndexMetaData::BOOLEAN(v) => {
104                    print_index::<bool>(v, offset_index, &row_counts)?
105                }
106                ColumnIndexMetaData::INT32(v) => print_index(v, offset_index, &row_counts)?,
107                ColumnIndexMetaData::INT64(v) => print_index(v, offset_index, &row_counts)?,
108                ColumnIndexMetaData::INT96(v) => print_index(v, offset_index, &row_counts)?,
109                ColumnIndexMetaData::FLOAT(v) => print_index(v, offset_index, &row_counts)?,
110                ColumnIndexMetaData::DOUBLE(v) => print_index(v, offset_index, &row_counts)?,
111                ColumnIndexMetaData::BYTE_ARRAY(v) => {
112                    print_bytes_index(v, offset_index, &row_counts)?
113                }
114                ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(v) => {
115                    print_bytes_index(v, offset_index, &row_counts)?
116                }
117            }
118        }
119        Ok(())
120    }
121}
122
123/// Computes the number of rows in each page within a column chunk
124fn compute_row_counts(offset_index: &[PageLocation], rows: i64) -> Vec<i64> {
125    if offset_index.is_empty() {
126        return vec![];
127    }
128
129    let mut last = offset_index[0].first_row_index;
130    let mut out = Vec::with_capacity(offset_index.len());
131    for o in offset_index.iter().skip(1) {
132        out.push(o.first_row_index - last);
133        last = o.first_row_index;
134    }
135    out.push(rows - last);
136    out
137}
138
139/// Prints index information for a single column chunk
140fn print_index<T: std::fmt::Display>(
141    column_index: &PrimitiveColumnIndex<T>,
142    offset_index: &OffsetIndexMetaData,
143    row_counts: &[i64],
144) -> Result<()> {
145    if column_index.num_pages() as usize != offset_index.page_locations.len() {
146        return Err(ParquetError::General(format!(
147            "Index length mismatch, got {} and {}",
148            column_index.num_pages(),
149            offset_index.page_locations.len()
150        )));
151    }
152
153    for (idx, (((min, max), o), row_count)) in column_index
154        .min_values_iter()
155        .zip(column_index.max_values_iter())
156        .zip(offset_index.page_locations())
157        .zip(row_counts)
158        .enumerate()
159    {
160        print!(
161            "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
162            idx, o.offset, o.compressed_page_size, row_count
163        );
164        match min {
165            Some(m) => print!(", min {m:>10}"),
166            None => print!(", min {:>10}", "NONE"),
167        }
168
169        match max {
170            Some(m) => print!(", max {m:>10}"),
171            None => print!(", max {:>10}", "NONE"),
172        }
173        println!()
174    }
175
176    Ok(())
177}
178
179fn print_bytes_index(
180    column_index: &ByteArrayColumnIndex,
181    offset_index: &OffsetIndexMetaData,
182    row_counts: &[i64],
183) -> Result<()> {
184    if column_index.num_pages() as usize != offset_index.page_locations.len() {
185        return Err(ParquetError::General(format!(
186            "Index length mismatch, got {} and {}",
187            column_index.num_pages(),
188            offset_index.page_locations.len()
189        )));
190    }
191
192    for (idx, (((min, max), o), row_count)) in column_index
193        .min_values_iter()
194        .zip(column_index.max_values_iter())
195        .zip(offset_index.page_locations())
196        .zip(row_counts)
197        .enumerate()
198    {
199        print!(
200            "Page {:>5} at offset {:#010x} with length {:>10} and row count {:>10}",
201            idx, o.offset, o.compressed_page_size, row_count
202        );
203        match min {
204            Some(m) => match String::from_utf8(m.to_vec()) {
205                Ok(s) => print!(", min {s:>10}"),
206                Err(_) => print!(", min {:>10}", ByteArray::from(m)),
207            },
208            None => print!(", min {:>10}", "NONE"),
209        }
210
211        match max {
212            Some(m) => match String::from_utf8(m.to_vec()) {
213                Ok(s) => print!(", max {s:>10}"),
214                Err(_) => print!(", min {:>10}", ByteArray::from(m)),
215            },
216            None => print!(", max {:>10}", "NONE"),
217        }
218        println!()
219    }
220
221    Ok(())
222}
223
224fn main() -> Result<()> {
225    Args::parse().run()
226}