parquet-layout.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary that prints the physical layout of a parquet file
//!
//! NOTE: because this binary uses the deprecated [`parquet::format`] module, it
//! is no longer maintained and will likely be removed in the future.
//! Alternatives include [`parquet-cli`] and [`parquet-viewer`].
//!
//! # Install
//!
//! `parquet-layout` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this, `parquet-layout` should be available:
//! ```
//! parquet-layout XYZ.parquet
//! ```
//!
//! The binary can also be built from source and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-layout XYZ.parquet
//! ```
//!
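//! # Output
//!
//! The layout is written to stdout as pretty-printed JSON, with one object per
//! row group, column chunk, and page; the shape follows the `ParquetFile`,
//! `RowGroup`, `ColumnChunk` and `Page` structs below. An illustrative
//! excerpt, with hypothetical values:
//! ```
//! {
//!   "row_groups": [
//!     {
//!       "columns": [
//!         {
//!           "path": "id",
//!           "has_offset_index": true,
//!           "has_column_index": true,
//!           "has_bloom_filter": false,
//!           "pages": [
//!             {
//!               "compression": "zstd",
//!               "encoding": "plain",
//!               "page_type": "data_page_v1",
//!               "offset": 4,
//!               "compressed_bytes": 154,
//!               "uncompressed_bytes": 320,
//!               "header_bytes": 34,
//!               "num_values": 100
//!             }
//!           ]
//!         }
//!       ],
//!       "row_count": 100
//!     }
//!   ]
//! }
//! ```
//!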
//! [`parquet-cli`]: https://github.com/apache/parquet-java/tree/master/parquet-cli
//! [`parquet-viewer`]: https://github.com/xiangpenghao/parquet-viewer

use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

use parquet::basic::Compression;
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
#[allow(deprecated)]
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

/// The physical layout of an entire parquet file
#[derive(Serialize, Debug)]
struct ParquetFile {
    row_groups: Vec<RowGroup>,
}

/// The layout of a single row group
#[derive(Serialize, Debug)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
    row_count: i64,
}

/// The layout of a single column chunk
#[derive(Serialize, Debug)]
struct ColumnChunk {
    path: String,
    has_offset_index: bool,
    has_column_index: bool,
    has_bloom_filter: bool,
    pages: Vec<Page>,
}

/// A single page within a column chunk
#[derive(Serialize, Debug)]
struct Page {
    compression: Option<&'static str>,
    encoding: &'static str,
    page_type: &'static str,
    offset: u64,
    compressed_bytes: i32,
    uncompressed_bytes: i32,
    header_bytes: i32,
    num_values: i32,
}

/// Walks every column chunk of every row group in `reader`, decoding
/// page headers to recover the physical layout
#[allow(deprecated)]
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
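    // Decode the file footer to get the schema and the row group metadata.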
    let metadata = ParquetMetaDataReader::new().parse_and_finish(reader)?;
    let schema = metadata.file_metadata().schema_descr();

    let row_groups = (0..metadata.num_row_groups())
        .map(|row_group_idx| {
            let row_group = metadata.row_group(row_group_idx);
            let columns = row_group
                .columns()
                .iter()
                .zip(schema.columns())
                .map(|(column, column_schema)| {
                    let compression = compression(column.compression());
                    let mut pages = vec![];

                    // The chunk's pages begin at the dictionary page, if any,
                    // and otherwise at the first data page.
                    let mut start = column
                        .dictionary_page_offset()
                        .unwrap_or_else(|| column.data_page_offset())
                        as u64;

                    let end = start + column.compressed_size() as u64;
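                    // Pages are stored back to back, so the chunk can be walked
                    // by reading the thrift page header at `start` and then
                    // skipping past the header and the compressed page body.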
                    while start != end {
                        let (header_len, header) = read_page_header(reader, start)?;
                        if let Some(dictionary) = header.dictionary_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(dictionary.encoding.0),
                                page_type: "dictionary",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: dictionary.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v1",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header_v2 {
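                            // V2 data pages may skip compression on a per-page
                            // basis; when the flag is absent the page is
                            // assumed to be compressed.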
                            let is_compressed = data_page.is_compressed.unwrap_or(true);

                            pages.push(Page {
                                compression: compression.filter(|_| is_compressed),
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v2",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        }
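                        // Advance past this page's header and compressed body.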
                        start += header.compressed_page_size as u64 + header_len as u64;
                    }

                    Ok(ColumnChunk {
                        path: column_schema.path().parts().join("."),
                        has_offset_index: column.offset_index_offset().is_some(),
                        has_column_index: column.column_index_offset().is_some(),
                        has_bloom_filter: column.bloom_filter_offset().is_some(),
                        pages,
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(RowGroup {
                columns,
                row_count: row_group.num_rows(),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ParquetFile { row_groups })
}

/// Reads the page header at `offset` from `reader`, returning
/// both the `PageHeader` and its length in bytes
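///
/// The header's length is not recorded in the file, so it is measured by
/// counting the bytes the thrift decoder consumes from the underlying reader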
#[allow(deprecated)]
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
    // Wraps a `Read`, counting how many bytes have been consumed
    struct TrackedRead<R>(R, usize);

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.0.read(buf)?;
            self.1 += v;
            Ok(v)
        }
    }

    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok((tracked.1, header))
}

/// Returns a string representation for a given compression,
/// or `None` for uncompressed data
fn compression(compression: Compression) -> Option<&'static str> {
    match compression {
        Compression::UNCOMPRESSED => None,
        Compression::SNAPPY => Some("snappy"),
        Compression::GZIP(_) => Some("gzip"),
        Compression::LZO => Some("lzo"),
        Compression::BROTLI(_) => Some("brotli"),
        Compression::LZ4 => Some("lz4"),
        Compression::ZSTD(_) => Some("zstd"),
        Compression::LZ4_RAW => Some("lz4_raw"),
    }
}

/// Returns a string representation for a given encoding, using the
/// integer values of the parquet thrift `Encoding` enum
fn encoding(encoding: i32) -> &'static str {
    match encoding {
        0 => "plain",
        2 => "plain_dictionary",
        3 => "rle",
        #[allow(deprecated)]
        4 => "bit_packed",
        5 => "delta_binary_packed",
        6 => "delta_length_byte_array",
        7 => "delta_byte_array",
        8 => "rle_dictionary",
        9 => "byte_stream_split",
        _ => "unknown",
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
    #[clap(help("Path to a parquet file"))]
    file: String,
}

impl Args {
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        let layout = do_layout(&file)?;

        let out = std::io::stdout();
        let writer = out.lock();

        serde_json::to_writer_pretty(writer, &layout).unwrap();
        Ok(())
    }
}

fn main() -> Result<()> {
    Args::parse().run()
}