use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

use parquet::basic::Compression;
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
#[allow(deprecated)]
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

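/// The byte offset and optional length of an auxiliary structure, such as a
/// page index or bloom filter, within the file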
#[derive(Serialize, Debug)]
struct Index {
    offset: i64,
    length: Option<i32>,
}

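/// Details recovered from the file footer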
#[derive(Serialize, Debug)]
struct Footer {
    metadata_size: Option<usize>,
}

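/// The physical layout of a parquet file, as serialized to JSON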
#[derive(Serialize, Debug)]
struct ParquetFile {
    row_groups: Vec<RowGroup>,
    footer: Footer,
}

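/// The layout of a single row group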
#[derive(Serialize, Debug)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
    row_count: i64,
}

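/// The layout of a single column chunk, including which auxiliary structures
/// are present and the pages it contains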
#[derive(Serialize, Debug)]
struct ColumnChunk {
    path: String,
    has_offset_index: bool,
    has_column_index: bool,
    has_bloom_filter: bool,
    offset_index: Option<Index>,
    column_index: Option<Index>,
    bloom_filter: Option<Index>,
    pages: Vec<Page>,
}

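/// The layout of a single page within a column chunk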
#[derive(Serialize, Debug)]
struct Page {
    compression: Option<&'static str>,
    encoding: &'static str,
    page_type: &'static str,
    offset: u64,
    compressed_bytes: i32,
    uncompressed_bytes: i32,
    header_bytes: i32,
    num_values: i32,
}

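/// Computes the layout of the parquet file in `reader`, first parsing the
/// footer metadata and then walking each column chunk page by page, decoding
/// successive thrift page headers to discover where each page starts and ends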
#[allow(deprecated)]
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
    let mut metadata_reader = ParquetMetaDataReader::new();
    metadata_reader.try_parse(reader)?;
    let metadata_size = metadata_reader.metadata_size();
    let metadata = metadata_reader.finish()?;
    let schema = metadata.file_metadata().schema_descr();

    let row_groups = (0..metadata.num_row_groups())
        .map(|row_group_idx| {
            let row_group = metadata.row_group(row_group_idx);
            let columns = row_group
                .columns()
                .iter()
                .zip(schema.columns())
                .map(|(column, column_schema)| {
                    let compression = compression(column.compression());
                    let mut pages = vec![];

                    // The chunk's pages begin at the dictionary page, if any,
                    // otherwise at the first data page
                    let mut start = column
                        .dictionary_page_offset()
                        .unwrap_or_else(|| column.data_page_offset())
                        as u64;

                    // Walk the chunk page by page: each iteration decodes one
                    // page header and then skips over the compressed page body
                    let end = start + column.compressed_size() as u64;
                    while start != end {
                        let (header_len, header) = read_page_header(reader, start)?;
                        if let Some(dictionary) = header.dictionary_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(dictionary.encoding.0),
                                page_type: "dictionary",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: dictionary.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v1",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header_v2 {
                            // V2 data pages may opt out of compression entirely
                            let is_compressed = data_page.is_compressed.unwrap_or(true);

                            pages.push(Page {
                                compression: compression.filter(|_| is_compressed),
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v2",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        }
                        start += header.compressed_page_size as u64 + header_len as u64;
                    }

                    Ok(ColumnChunk {
                        path: column_schema.path().parts().join("."),
                        has_offset_index: column.offset_index_offset().is_some(),
                        has_column_index: column.column_index_offset().is_some(),
                        has_bloom_filter: column.bloom_filter_offset().is_some(),
                        offset_index: column.offset_index_offset().map(|offset| Index {
                            offset,
                            length: column.offset_index_length(),
                        }),
                        column_index: column.column_index_offset().map(|offset| Index {
                            offset,
                            length: column.column_index_length(),
                        }),
                        bloom_filter: column.bloom_filter_offset().map(|offset| Index {
                            offset,
                            length: column.bloom_filter_length(),
                        }),
                        pages,
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(RowGroup {
                columns,
                row_count: row_group.num_rows(),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ParquetFile {
        row_groups,
        footer: Footer { metadata_size },
    })
}

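/// Decodes the thrift-encoded [`PageHeader`] at `offset` in `reader`,
/// returning the number of bytes occupied by the encoded header along with
/// the header itself; the length is needed to locate the start of the page
/// body and the beginning of the next page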
#[allow(deprecated)]
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
    /// Wraps a [`Read`], counting the bytes read through it
    struct TrackedRead<R>(R, usize);

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.0.read(buf)?;
            self.1 += v;
            Ok(v)
        }
    }

    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok((tracked.1, header))
}

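/// Returns a string description of `compression`, or `None` for
/// uncompressed data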
fn compression(compression: Compression) -> Option<&'static str> {
    match compression {
        Compression::UNCOMPRESSED => None,
        Compression::SNAPPY => Some("snappy"),
        Compression::GZIP(_) => Some("gzip"),
        Compression::LZO => Some("lzo"),
        Compression::BROTLI(_) => Some("brotli"),
        Compression::LZ4 => Some("lz4"),
        Compression::ZSTD(_) => Some("zstd"),
        Compression::LZ4_RAW => Some("lz4_raw"),
    }
}

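/// Returns a string description of a parquet encoding given its numeric
/// thrift enum value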
fn encoding(encoding: i32) -> &'static str {
    match encoding {
        0 => "plain",
        2 => "plain_dictionary",
        3 => "rle",
        #[allow(deprecated)]
        4 => "bit_packed",
        5 => "delta_binary_packed",
        6 => "delta_length_byte_array",
        7 => "delta_byte_array",
        8 => "rle_dictionary",
        9 => "byte_stream_split",
        _ => "unknown",
    }
}

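/// Command line arguments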
#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
    #[clap(help("Path to a parquet file"))]
    file: String,
}

impl Args {
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        let layout = do_layout(&file)?;

        let out = std::io::stdout();
        let writer = out.lock();

        serde_json::to_writer_pretty(writer, &layout).unwrap();
        Ok(())
    }
}

fn main() -> Result<()> {
    Args::parse().run()
}
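
// Example invocation, assuming the binary is built as `parquet-layout`
// (the actual name depends on this crate's Cargo.toml):
//
//     $ parquet-layout file.parquet
//
// This prints a JSON document describing the row groups, column chunks,
// and individual pages of `file.parquet`.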