parquet_layout/parquet-layout.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary that prints the physical layout of a parquet file
//!
//! # Install
//!
//! `parquet-layout` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After installation, `parquet-layout` should be available:
//! ```
//! parquet-layout XYZ.parquet
//! ```
//!
//! The binary can also be built from source and run directly:
//! ```
//! cargo run --features=cli --bin parquet-layout XYZ.parquet
//! ```
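//!
//! The layout is written to stdout as pretty-printed JSON describing each
//! row group, column chunk, and page in the file.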

use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

use parquet::basic::{Compression, Encoding};
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

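/// Serializable description of the physical layout of a parquet file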
#[derive(Serialize, Debug)]
struct ParquetFile {
    row_groups: Vec<RowGroup>,
}

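/// A single row group and the column chunks it contains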
#[derive(Serialize, Debug)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
    row_count: i64,
}

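/// A single column chunk and the pages discovered within it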
#[derive(Serialize, Debug)]
struct ColumnChunk {
    path: String,
    has_offset_index: bool,
    has_column_index: bool,
    has_bloom_filter: bool,
    pages: Vec<Page>,
}

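/// A single dictionary or data page within a column chunk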
#[derive(Serialize, Debug)]
struct Page {
    compression: Option<&'static str>,
    encoding: &'static str,
    page_type: &'static str,
    offset: u64,
    compressed_bytes: i32,
    uncompressed_bytes: i32,
    header_bytes: i32,
    num_values: i32,
}

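/// Parses the footer metadata of `reader` and walks every page of every
/// column chunk, building a serializable description of the file's layout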
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
    let metadata = ParquetMetaDataReader::new().parse_and_finish(reader)?;
    let schema = metadata.file_metadata().schema_descr();

    let row_groups = (0..metadata.num_row_groups())
        .map(|row_group_idx| {
            let row_group = metadata.row_group(row_group_idx);
            let columns = row_group
                .columns()
                .iter()
                .zip(schema.columns())
                .map(|(column, column_schema)| {
                    let compression = compression(column.compression());
                    let mut pages = vec![];

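                    // The first page starts at the dictionary page offset if a
                    // dictionary page exists, otherwise at the first data page offset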
                    let mut start = column
                        .dictionary_page_offset()
                        .unwrap_or_else(|| column.data_page_offset())
                        as u64;

                    let end = start + column.compressed_size() as u64;
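                    // Walk the chunk page by page: each header records its page's
                    // compressed size, so the next header begins immediately after
                    // the current page's compressed bytes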
                    while start != end {
                        let (header_len, header) = read_page_header(reader, start)?;
                        if let Some(dictionary) = header.dictionary_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(dictionary.encoding),
                                page_type: "dictionary",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: dictionary.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(data_page.encoding),
                                page_type: "data_page_v1",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header_v2 {
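                            // V2 data pages can individually disable compression;
                            // the flag is treated as true when absent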
                            let is_compressed = data_page.is_compressed.unwrap_or(true);

                            pages.push(Page {
                                compression: compression.filter(|_| is_compressed),
                                encoding: encoding(data_page.encoding),
                                page_type: "data_page_v2",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        }
                        start += header.compressed_page_size as u64 + header_len as u64;
                    }

                    Ok(ColumnChunk {
                        path: column_schema.path().parts().join("."),
                        has_offset_index: column.offset_index_offset().is_some(),
                        has_column_index: column.column_index_offset().is_some(),
                        has_bloom_filter: column.bloom_filter_offset().is_some(),
                        pages,
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(RowGroup {
                columns,
                row_count: row_group.num_rows(),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ParquetFile { row_groups })
}

/// Reads the page header at `offset` from `reader`, returning
/// both the `PageHeader` and its length in bytes
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
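    // Wrap the reader so we can count how many bytes the thrift decoder
    // consumes; that count is the encoded size of the page header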
    struct TrackedRead<R>(R, usize);

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.0.read(buf)?;
            self.1 += v;
            Ok(v)
        }
    }

    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok((tracked.1, header))
}

/// Returns a string representation for a given compression codec,
/// or `None` if the data is uncompressed
fn compression(compression: Compression) -> Option<&'static str> {
    match compression {
        Compression::UNCOMPRESSED => None,
        Compression::SNAPPY => Some("snappy"),
        Compression::GZIP(_) => Some("gzip"),
        Compression::LZO => Some("lzo"),
        Compression::BROTLI(_) => Some("brotli"),
        Compression::LZ4 => Some("lz4"),
        Compression::ZSTD(_) => Some("zstd"),
        Compression::LZ4_RAW => Some("lz4_raw"),
    }
}

/// Returns a string representation for a given encoding
fn encoding(encoding: parquet::format::Encoding) -> &'static str {
    match Encoding::try_from(encoding) {
        Ok(Encoding::PLAIN) => "plain",
        Ok(Encoding::PLAIN_DICTIONARY) => "plain_dictionary",
        Ok(Encoding::RLE) => "rle",
        #[allow(deprecated)]
        Ok(Encoding::BIT_PACKED) => "bit_packed",
        Ok(Encoding::DELTA_BINARY_PACKED) => "delta_binary_packed",
        Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY) => "delta_length_byte_array",
        Ok(Encoding::DELTA_BYTE_ARRAY) => "delta_byte_array",
        Ok(Encoding::RLE_DICTIONARY) => "rle_dictionary",
        Ok(Encoding::BYTE_STREAM_SPLIT) => "byte_stream_split",
        Err(_) => "unknown",
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
    #[clap(help("Path to a parquet file"))]
    file: String,
}

impl Args {
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        let layout = do_layout(&file)?;

        let out = std::io::stdout();
        let writer = out.lock();

        serde_json::to_writer_pretty(writer, &layout).unwrap();
        Ok(())
    }
}

fn main() -> Result<()> {
    Args::parse().run()
}