// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Binary that prints the physical layout of a parquet file
//!
//! NOTE: because this binary uses the deprecated [`parquet::format`] module, it
//! will no longer be maintained and will likely be removed in the future.
//! Alternatives include [`parquet-cli`] and [`parquet-viewer`].
//!
//! # Install
//!
//! `parquet-layout` can be installed using `cargo`:
//! ```
//! cargo install parquet --features=cli
//! ```
//! After this, `parquet-layout` should be available:
//! ```
//! parquet-layout XYZ.parquet
//! ```
//!
//! The binary can also be built from the source code and run as follows:
//! ```
//! cargo run --features=cli --bin parquet-layout XYZ.parquet
//! ```
//!
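//! The tool writes a JSON description of the file's row groups, column chunks,
//! and pages to stdout. A trimmed sketch of the output shape (all values are
//! illustrative, not taken from a real file):
//!
//! ```text
//! {
//!   "row_groups": [
//!     {
//!       "columns": [
//!         {
//!           "path": "id",
//!           "has_offset_index": false,
//!           "has_column_index": false,
//!           "has_bloom_filter": false,
//!           "offset_index": null,
//!           "column_index": null,
//!           "bloom_filter": null,
//!           "pages": [
//!             {
//!               "compression": "snappy",
//!               "encoding": "plain",
//!               "page_type": "data_page_v1",
//!               "offset": 4,
//!               "compressed_bytes": 121,
//!               "uncompressed_bytes": 256,
//!               "header_bytes": 25,
//!               "num_values": 100
//!             }
//!           ]
//!         }
//!       ],
//!       "row_count": 100
//!     }
//!   ],
//!   "footer": { "metadata_size": 1234 }
//! }
//! ```
//!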
//! [`parquet-cli`]: https://github.com/apache/parquet-java/tree/master/parquet-cli
//! [`parquet-viewer`]: https://github.com/xiangpenghao/parquet-viewer

use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

use parquet::basic::Compression;
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
#[allow(deprecated)]
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

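/// Byte offset and optional length of an auxiliary structure
/// (offset index, column index, or bloom filter) within the file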
#[derive(Serialize, Debug)]
struct Index {
    offset: i64,
    length: Option<i32>,
}

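/// Information about the file footer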
#[derive(Serialize, Debug)]
struct Footer {
    metadata_size: Option<usize>,
}

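/// Physical layout of an entire parquet file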
#[derive(Serialize, Debug)]
struct ParquetFile {
    row_groups: Vec<RowGroup>,
    footer: Footer,
}

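/// A row group and the column chunks it contains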
#[derive(Serialize, Debug)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
    row_count: i64,
}

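/// A column chunk, the auxiliary structures attached to it, and its pages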
#[derive(Serialize, Debug)]
struct ColumnChunk {
    path: String,
    has_offset_index: bool,
    has_column_index: bool,
    has_bloom_filter: bool,
    offset_index: Option<Index>,
    column_index: Option<Index>,
    bloom_filter: Option<Index>,
    pages: Vec<Page>,
}

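/// A single page decoded from a column chunk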
#[derive(Serialize, Debug)]
struct Page {
    compression: Option<&'static str>,
    encoding: &'static str,
    page_type: &'static str,
    offset: u64,
    compressed_bytes: i32,
    uncompressed_bytes: i32,
    header_bytes: i32,
    num_values: i32,
}

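/// Reads the file's footer metadata and then scans each column chunk,
/// decoding every page header to build a [`ParquetFile`] description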
#[allow(deprecated)]
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(reader)?;
    let metadata_size = metadata_reader.metadata_size();
    let metadata = metadata_reader.finish()?;
    let schema = metadata.file_metadata().schema_descr();

    let row_groups = (0..metadata.num_row_groups())
        .map(|row_group_idx| {
            let row_group = metadata.row_group(row_group_idx);
            let columns = row_group
                .columns()
                .iter()
                .zip(schema.columns())
                .map(|(column, column_schema)| {
                    let compression = compression(column.compression());
                    let mut pages = vec![];

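                    // Pages are stored contiguously, starting at the dictionary
                    // page if present, otherwise at the first data page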
                    let mut start = column
                        .dictionary_page_offset()
                        .unwrap_or_else(|| column.data_page_offset())
                        as u64;

                    let end = start + column.compressed_size() as u64;
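                    // Decode consecutive page headers until the chunk's
                    // compressed size is exhausted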
                    while start != end {
                        let (header_len, header) = read_page_header(reader, start)?;
                        if let Some(dictionary) = header.dictionary_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(dictionary.encoding.0),
                                page_type: "dictionary",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: dictionary.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v1",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header_v2 {
                            // V2 data pages may individually disable compression
                            let is_compressed = data_page.is_compressed.unwrap_or(true);

                            pages.push(Page {
                                compression: compression.filter(|_| is_compressed),
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v2",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        }
                        // Skip over the page header and compressed page body
                        start += header.compressed_page_size as u64 + header_len as u64;
                    }

                    Ok(ColumnChunk {
                        path: column_schema.path().parts().join("."),
                        has_offset_index: column.offset_index_offset().is_some(),
                        has_column_index: column.column_index_offset().is_some(),
                        has_bloom_filter: column.bloom_filter_offset().is_some(),
                        offset_index: column.offset_index_offset().map(|offset| Index {
                            offset,
                            length: column.offset_index_length(),
                        }),
                        column_index: column.column_index_offset().map(|offset| Index {
                            offset,
                            length: column.column_index_length(),
                        }),
                        bloom_filter: column.bloom_filter_offset().map(|offset| Index {
                            offset,
                            length: column.bloom_filter_length(),
                        }),
                        pages,
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(RowGroup {
                columns,
                row_count: row_group.num_rows(),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ParquetFile {
        row_groups,
        footer: Footer { metadata_size },
    })
}

/// Reads the page header at `offset` from `reader`, returning
/// both the `PageHeader` and its length in bytes
#[allow(deprecated)]
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
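    // Wraps the underlying reader to count the bytes consumed by the thrift
    // decoder, as the compact protocol does not report the header length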
    struct TrackedRead<R>(R, usize);

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.0.read(buf)?;
            self.1 += v;
            Ok(v)
        }
    }

    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok((tracked.1, header))
}

/// Returns a short name for the given compression codec, or `None` if the
/// data is uncompressed
fn compression(compression: Compression) -> Option<&'static str> {
    match compression {
        Compression::UNCOMPRESSED => None,
        Compression::SNAPPY => Some("snappy"),
        Compression::GZIP(_) => Some("gzip"),
        Compression::LZO => Some("lzo"),
        Compression::BROTLI(_) => Some("brotli"),
        Compression::LZ4 => Some("lz4"),
        Compression::ZSTD(_) => Some("zstd"),
        Compression::LZ4_RAW => Some("lz4_raw"),
    }
}

/// Returns a short name for the given raw thrift encoding value
fn encoding(encoding: i32) -> &'static str {
    match encoding {
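        // Values mirror the thrift `Encoding` enum; 1 is skipped as it was the
        // deprecated, never-used GROUP_VAR_INT encoding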
        0 => "plain",
        2 => "plain_dictionary",
        3 => "rle",
        #[allow(deprecated)]
        4 => "bit_packed",
        5 => "delta_binary_packed",
        6 => "delta_length_byte_array",
        7 => "delta_byte_array",
        8 => "rle_dictionary",
        9 => "byte_stream_split",
        _ => "unknown",
    }
}

#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
    #[clap(help("Path to a parquet file"))]
    file: String,
}

impl Args {
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        let layout = do_layout(&file)?;

        let out = std::io::stdout();
        let writer = out.lock();

        serde_json::to_writer_pretty(writer, &layout).unwrap();
        Ok(())
    }
}

fn main() -> Result<()> {
    Args::parse().run()
}