parquet_layout/parquet-layout.rs

//! Prints the physical layout of a parquet file as JSON

use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

use parquet::basic::{Compression, Encoding};
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

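/// JSON-serializable description of a parquet file's physical layout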
#[derive(Serialize, Debug)]
struct ParquetFile {
    row_groups: Vec<RowGroup>,
}

#[derive(Serialize, Debug)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
    row_count: i64,
}

#[derive(Serialize, Debug)]
struct ColumnChunk {
    path: String,
    has_offset_index: bool,
    has_column_index: bool,
    has_bloom_filter: bool,
    pages: Vec<Page>,
}

#[derive(Serialize, Debug)]
struct Page {
    compression: Option<&'static str>,
    encoding: &'static str,
    page_type: &'static str,
    offset: u64,
    compressed_bytes: i32,
    uncompressed_bytes: i32,
    header_bytes: i32,
    num_values: i32,
}

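/// Walks every page of every column chunk in `reader`, returning a
/// serializable description of the file's physical layout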
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
    let metadata = ParquetMetaDataReader::new().parse_and_finish(reader)?;
    let schema = metadata.file_metadata().schema_descr();

    let row_groups = (0..metadata.num_row_groups())
        .map(|row_group_idx| {
            let row_group = metadata.row_group(row_group_idx);
            let columns = row_group
                .columns()
                .iter()
                .zip(schema.columns())
                .map(|(column, column_schema)| {
                    let compression = compression(column.compression());
                    let mut pages = vec![];

                    // Pages start at the dictionary page if present, otherwise at
                    // the first data page
                    let mut start = column
                        .dictionary_page_offset()
                        .unwrap_or_else(|| column.data_page_offset())
                        as u64;

                    // Walk page headers until the end of the column chunk's compressed data
                    let end = start + column.compressed_size() as u64;
                    while start != end {
                        let (header_len, header) = read_page_header(reader, start)?;
                        if let Some(dictionary) = header.dictionary_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(dictionary.encoding),
                                page_type: "dictionary",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: dictionary.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(data_page.encoding),
                                page_type: "data_page_v1",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header_v2 {
                            let is_compressed = data_page.is_compressed.unwrap_or(true);

                            pages.push(Page {
                                compression: compression.filter(|_| is_compressed),
                                encoding: encoding(data_page.encoding),
                                page_type: "data_page_v2",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        }
                        // Advance past this page's header and compressed body
                        start += header.compressed_page_size as u64 + header_len as u64;
                    }

                    Ok(ColumnChunk {
                        path: column_schema.path().parts().join("."),
                        has_offset_index: column.offset_index_offset().is_some(),
                        has_column_index: column.column_index_offset().is_some(),
                        has_bloom_filter: column.bloom_filter_offset().is_some(),
                        pages,
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(RowGroup {
                columns,
                row_count: row_group.num_rows(),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ParquetFile { row_groups })
}

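/// Reads the Thrift-encoded `PageHeader` at `offset` from `reader`, returning
/// the number of bytes it occupies on disk together with the decoded header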
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
    // Wraps a `Read`, counting the bytes read through it
    struct TrackedRead<R>(R, usize);

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.0.read(buf)?;
            self.1 += v;
            Ok(v)
        }
    }

    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok((tracked.1, header))
}

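/// Returns a short name for `compression`, or `None` if the data is uncompressed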
fn compression(compression: Compression) -> Option<&'static str> {
    match compression {
        Compression::UNCOMPRESSED => None,
        Compression::SNAPPY => Some("snappy"),
        Compression::GZIP(_) => Some("gzip"),
        Compression::LZO => Some("lzo"),
        Compression::BROTLI(_) => Some("brotli"),
        Compression::LZ4 => Some("lz4"),
        Compression::ZSTD(_) => Some("zstd"),
        Compression::LZ4_RAW => Some("lz4_raw"),
    }
}

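/// Returns a short name for `encoding`, or `"unknown"` if it is not recognised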
fn encoding(encoding: parquet::format::Encoding) -> &'static str {
    match Encoding::try_from(encoding) {
        Ok(Encoding::PLAIN) => "plain",
        Ok(Encoding::PLAIN_DICTIONARY) => "plain_dictionary",
        Ok(Encoding::RLE) => "rle",
        #[allow(deprecated)]
        Ok(Encoding::BIT_PACKED) => "bit_packed",
        Ok(Encoding::DELTA_BINARY_PACKED) => "delta_binary_packed",
        Ok(Encoding::DELTA_LENGTH_BYTE_ARRAY) => "delta_length_byte_array",
        Ok(Encoding::DELTA_BYTE_ARRAY) => "delta_byte_array",
        Ok(Encoding::RLE_DICTIONARY) => "rle_dictionary",
        Ok(Encoding::BYTE_STREAM_SPLIT) => "byte_stream_split",
        Err(_) => "unknown",
    }
}

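/// Command line arguments for `parquet-layout`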
#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
    #[clap(help("Path to a parquet file"))]
    file: String,
}

impl Args {
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        let layout = do_layout(&file)?;

        let out = std::io::stdout();
        let writer = out.lock();

        serde_json::to_writer_pretty(writer, &layout).unwrap();
        Ok(())
    }
}

fn main() -> Result<()> {
    Args::parse().run()
}