// parquet_layout/parquet-layout.rs
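//! Prints the physical layout of a Parquet file as JSON: every row group,
//! each column chunk within it, and each page within each chunk.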
use std::fs::File;
use std::io::Read;

use clap::Parser;
use parquet::file::metadata::ParquetMetaDataReader;
use serde::Serialize;
use thrift::protocol::TCompactInputProtocol;

use parquet::basic::Compression;
use parquet::errors::Result;
use parquet::file::reader::ChunkReader;
#[allow(deprecated)]
use parquet::format::PageHeader;
use parquet::thrift::TSerializable;

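/// Top-level description of a file: one entry per row group.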
#[derive(Serialize, Debug)]
struct ParquetFile {
    row_groups: Vec<RowGroup>,
}

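/// A row group: the column chunks it contains and how many rows it holds.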
#[derive(Serialize, Debug)]
struct RowGroup {
    columns: Vec<ColumnChunk>,
    row_count: i64,
}

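/// A column chunk: its dotted column path, which optional metadata
/// structures (offset index, column index, bloom filter) are present,
/// and the pages it contains.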
#[derive(Serialize, Debug)]
struct ColumnChunk {
    path: String,
    has_offset_index: bool,
    has_column_index: bool,
    has_bloom_filter: bool,
    pages: Vec<Page>,
}

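/// A single page: its type, encoding, compression, position in the file,
/// and sizes as recorded in its header.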
#[derive(Serialize, Debug)]
struct Page {
    compression: Option<&'static str>,
    encoding: &'static str,
    page_type: &'static str,
    offset: u64,
    compressed_bytes: i32,
    uncompressed_bytes: i32,
    header_bytes: i32,
    num_values: i32,
}

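/// Reads the footer metadata, then walks every page of every column chunk
/// to build the [`ParquetFile`] description.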
#[allow(deprecated)]
fn do_layout<C: ChunkReader>(reader: &C) -> Result<ParquetFile> {
    let metadata = ParquetMetaDataReader::new().parse_and_finish(reader)?;
    let schema = metadata.file_metadata().schema_descr();

    let row_groups = (0..metadata.num_row_groups())
        .map(|row_group_idx| {
            let row_group = metadata.row_group(row_group_idx);
            let columns = row_group
                .columns()
                .iter()
                .zip(schema.columns())
                .map(|(column, column_schema)| {
                    let compression = compression(column.compression());
                    let mut pages = vec![];

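                    // A chunk starts at its dictionary page when one exists;
                    // otherwise it starts at the first data page.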
                    let mut start = column
                        .dictionary_page_offset()
                        .unwrap_or_else(|| column.data_page_offset())
                        as u64;

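                    // Pages are stored back to back, so walk the chunk by
                    // decoding each page header and skipping its compressed
                    // body until the chunk's compressed bytes are exhausted.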
                    let end = start + column.compressed_size() as u64;
                    while start != end {
                        let (header_len, header) = read_page_header(reader, start)?;
                        if let Some(dictionary) = header.dictionary_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(dictionary.encoding.0),
                                page_type: "dictionary",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: dictionary.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header {
                            pages.push(Page {
                                compression,
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v1",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        } else if let Some(data_page) = header.data_page_header_v2 {
                            let is_compressed = data_page.is_compressed.unwrap_or(true);

                            pages.push(Page {
                                compression: compression.filter(|_| is_compressed),
                                encoding: encoding(data_page.encoding.0),
                                page_type: "data_page_v2",
                                offset: start,
                                compressed_bytes: header.compressed_page_size,
                                uncompressed_bytes: header.uncompressed_page_size,
                                header_bytes: header_len as _,
                                num_values: data_page.num_values,
                            })
                        }
                        start += header.compressed_page_size as u64 + header_len as u64;
                    }

                    Ok(ColumnChunk {
                        path: column_schema.path().parts().join("."),
                        has_offset_index: column.offset_index_offset().is_some(),
                        has_column_index: column.column_index_offset().is_some(),
                        has_bloom_filter: column.bloom_filter_offset().is_some(),
                        pages,
                    })
                })
                .collect::<Result<Vec<_>>>()?;

            Ok(RowGroup {
                columns,
                row_count: row_group.num_rows(),
            })
        })
        .collect::<Result<Vec<_>>>()?;

    Ok(ParquetFile { row_groups })
}

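/// Decodes the Thrift-encoded page header at `offset`, returning the header
/// together with its encoded length in bytes.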
#[allow(deprecated)]
fn read_page_header<C: ChunkReader>(reader: &C, offset: u64) -> Result<(usize, PageHeader)> {
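    // Wrapper that counts the bytes read through it, since the Thrift
    // decoder does not report how many bytes the header occupied.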
    struct TrackedRead<R>(R, usize);

    impl<R: Read> Read for TrackedRead<R> {
        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
            let v = self.0.read(buf)?;
            self.1 += v;
            Ok(v)
        }
    }

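    // Page headers are Thrift compact-encoded, so decode through the
    // byte-counting wrapper to learn the header's on-disk size.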
    let input = reader.get_read(offset)?;
    let mut tracked = TrackedRead(input, 0);
    let mut prot = TCompactInputProtocol::new(&mut tracked);
    let header = PageHeader::read_from_in_protocol(&mut prot)?;
    Ok((tracked.1, header))
}

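/// Returns a short string name for a codec, or `None` for uncompressed data.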
fn compression(compression: Compression) -> Option<&'static str> {
    match compression {
        Compression::UNCOMPRESSED => None,
        Compression::SNAPPY => Some("snappy"),
        Compression::GZIP(_) => Some("gzip"),
        Compression::LZO => Some("lzo"),
        Compression::BROTLI(_) => Some("brotli"),
        Compression::LZ4 => Some("lz4"),
        Compression::ZSTD(_) => Some("zstd"),
        Compression::LZ4_RAW => Some("lz4_raw"),
    }
}

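/// Maps a raw Thrift encoding ID to its name (see the Parquet spec's
/// `Encoding` enum); unrecognized IDs become "unknown".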
fn encoding(encoding: i32) -> &'static str {
    match encoding {
        0 => "plain",
        2 => "plain_dictionary",
        3 => "rle",
        #[allow(deprecated)]
        4 => "bit_packed",
        5 => "delta_binary_packed",
        6 => "delta_length_byte_array",
        7 => "delta_byte_array",
        8 => "rle_dictionary",
        9 => "byte_stream_split",
        _ => "unknown",
    }
}

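/// Command-line arguments: the only input is the path of the file to inspect.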
#[derive(Debug, Parser)]
#[clap(author, version, about("Prints the physical layout of a parquet file"), long_about = None)]
struct Args {
    #[clap(help("Path to a parquet file"))]
    file: String,
}

impl Args {
    fn run(&self) -> Result<()> {
        let file = File::open(&self.file)?;
        let layout = do_layout(&file)?;

        let out = std::io::stdout();
        let writer = out.lock();

        serde_json::to_writer_pretty(writer, &layout).unwrap();
        Ok(())
    }
}

fn main() -> Result<()> {
    Args::parse().run()
}
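
// Example invocation (file name is illustrative):
//
//   parquet-layout data.parquet
//
// which prints pretty-printed JSON shaped like (values are illustrative):
//
//   {
//     "row_groups": [
//       {
//         "columns": [
//           {
//             "path": "id",
//             "has_offset_index": true,
//             "has_column_index": true,
//             "has_bloom_filter": false,
//             "pages": [
//               { "compression": "snappy", "encoding": "plain",
//                 "page_type": "data_page_v1", ... }
//             ]
//           }
//         ],
//         "row_count": 1000
//       }
//     ]
//   }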