parquet/variant.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! ⚠️ Experimental Support for reading and writing [`Variant`]s to / from Parquet files ⚠️
//!
//! This is a 🚧 Work In Progress
//!
//! Note: Requires the `variant_experimental` feature of the `parquet` crate to be enabled.
//!
//! # Features
//! * Representation of [`Variant`] and [`VariantArray`] for working with
//!   Variant values (see [`parquet_variant`] for more details)
//! * Kernels for working with arrays of Variant values,
//!   such as conversion between `Variant` and JSON, and shredding/unshredding
//!   (see [`parquet_variant_compute`] for more details)
//!
//! # Example: Writing a Parquet file with a Variant column
//! ```rust
//! # use parquet::variant::{VariantArray, VariantType, VariantArrayBuilder, VariantBuilderExt};
//! # use std::sync::Arc;
//! # use arrow_array::{Array, ArrayRef, RecordBatch};
//! # use arrow_schema::{DataType, Field, Schema};
//! # use parquet::arrow::ArrowWriter;
//! # fn main() -> Result<(), parquet::errors::ParquetError> {
//! // Use the VariantArrayBuilder to build a VariantArray
//! let mut builder = VariantArrayBuilder::new(3);
//! builder.new_object().with_field("name", "Alice").finish(); // row 1: {"name": "Alice"}
//! builder.append_value("such wow"); // row 2: "such wow" (a string)
//! let array = builder.build();
//!
//! // Since VariantArray is an ExtensionType, it needs to be converted
//! // to an ArrayRef and Field with the appropriate metadata
//! // before it can be written to a Parquet file
//! let field = array.field("data");
//! let array = ArrayRef::from(array);
//! // create a RecordBatch with the VariantArray
//! let schema = Schema::new(vec![field]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![array])?;
//!
//! // Now you can write the RecordBatch to the Parquet file, as normal
//! let file = std::fs::File::create("variant.parquet")?;
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
//! writer.write(&batch)?;
//! writer.close()?;
//!
//! # std::fs::remove_file("variant.parquet")?;
//! # Ok(())
//! # }
//! ```
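//!
//! The `Field` returned by [`VariantArray::field`] carries the Arrow extension
//! metadata that marks the column as a Variant. A minimal sketch of checking
//! that metadata directly, mirroring the assertions in the tests at the end of
//! this file:
//!
//! ```rust
//! # use parquet::variant::{VariantArrayBuilder, VariantBuilderExt, VariantType};
//! # let mut builder = VariantArrayBuilder::new(1);
//! # builder.append_value("such wow");
//! # let array = builder.build();
//! let field = array.field("data");
//! // the extension name identifies the column as a Variant
//! assert_eq!(
//!     field.metadata().get("ARROW:extension:name").map(|s| s.as_str()),
//!     Some("arrow.parquet.variant")
//! );
//! // equivalently, resolve it via the VariantType extension type
//! assert!(field.try_extension_type::<VariantType>().is_ok());
//! ```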
//!
//! # Example: Writing JSON into a Parquet file with a Variant column
//! ```rust
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, RecordBatch, StringArray};
//! # use arrow_schema::Schema;
//! # use parquet::variant::{json_to_variant, VariantArray};
//! # use parquet::arrow::ArrowWriter;
//! # fn main() -> Result<(), parquet::errors::ParquetError> {
//! // Create an array of JSON strings, simulating a column of JSON data
//! let input_array: ArrayRef = Arc::new(StringArray::from(vec![
//!     Some(r#"{"name": "Alice", "age": 30}"#),
//!     Some(r#"{"name": "Bob", "age": 25, "address": {"city": "New York"}}"#),
//!     None,
//!     Some("{}"),
//! ]));
//!
//! // Convert the JSON strings to a VariantArray
//! let array: VariantArray = json_to_variant(&input_array)?;
//! // create a RecordBatch with the VariantArray
//! let schema = Schema::new(vec![array.field("data")]);
//! let batch = RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)])?;
//!
//! // write the RecordBatch to a Parquet file as normal
//! let file = std::fs::File::create("variant-json.parquet")?;
//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
//! writer.write(&batch)?;
//! writer.close()?;
//! # std::fs::remove_file("variant-json.parquet")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Reading a Parquet file with a Variant column
//!
//! Use the [`VariantType`] extension type to find the Variant column:
//!
//! ```
//! # use std::sync::Arc;
//! # use std::path::PathBuf;
//! # use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader};
//! # use parquet::variant::{Variant, VariantArray, VariantType};
//! # use parquet::arrow::arrow_reader::ArrowReaderBuilder;
//! # fn main() -> Result<(), parquet::errors::ParquetError> {
//! # use arrow_array::StructArray;
//! # fn file_path() -> PathBuf { // return a testing file path
//! #     PathBuf::from(arrow::util::test_util::parquet_test_data())
//! #         .join("..")
//! #         .join("shredded_variant")
//! #         .join("case-075.parquet")
//! # }
//! // Read the Parquet file using the standard Arrow Parquet reader.
//! // Note this file has 2 columns: "id", "var", and the "var" column
//! // contains a variant that looks like this:
//! // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
//! let file = std::fs::File::open(file_path())?;
//! let mut reader = ArrowReaderBuilder::try_new(file)?.build()?;
//!
//! // You can check if a column contains a Variant using
//! // the VariantType extension type
//! let schema = reader.schema();
//! let field = schema.field_with_name("var")?;
//! assert!(field.try_extension_type::<VariantType>().is_ok());
//!
//! // The reader will yield RecordBatches with a StructArray;
//! // to convert them to a VariantArray, use VariantArray::try_new
//! let batch = reader.next().unwrap().unwrap();
//!
//! let col = batch.column_by_name("var").unwrap();
//! let var_array = VariantArray::try_new(col)?;
//! assert_eq!(var_array.len(), 1);
//! let var_value: Variant = var_array.value(0);
//! assert_eq!(var_value, Variant::from("iceberg")); // the value in case-075.parquet
//! # Ok(())
//! # }
//! ```
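//!
//! # Example: Accessing values in a [`VariantArray`]
//!
//! Individual rows of a [`VariantArray`] can be inspected with its `len`,
//! `is_valid` and `value` methods. A minimal sketch, using only APIs shown
//! in the examples above and the tests below:
//!
//! ```rust
//! # use parquet::variant::{Variant, VariantArray, VariantArrayBuilder, VariantBuilderExt};
//! let mut builder = VariantArrayBuilder::new(2);
//! builder.append_value("such wow"); // row 0: "such wow" (a string)
//! builder.append_null(); // row 1: null
//! let var_array: VariantArray = builder.build();
//!
//! // loop over each row, checking for nulls before extracting the Variant
//! for i in 0..var_array.len() {
//!     if var_array.is_valid(i) {
//!         assert_eq!(var_array.value(i), Variant::from("such wow"));
//!     } else {
//!         assert_eq!(i, 1); // only row 1 is null
//!     }
//! }
//! ```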
pub use parquet_variant::*;
pub use parquet_variant_compute::*;

#[cfg(test)]
mod tests {
    use crate::arrow::ArrowWriter;
    use crate::arrow::arrow_reader::ArrowReaderBuilder;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
    use crate::file::reader::ChunkReader;
    use arrow::util::test_util::parquet_test_data;
    use arrow_array::{ArrayRef, RecordBatch};
    use arrow_schema::Schema;
    use bytes::Bytes;
    use parquet_variant::{Variant, VariantBuilderExt};
    use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType};
    use std::path::PathBuf;
    use std::sync::Arc;

    #[test]
    fn roundtrip_basic() {
        roundtrip(variant_array());
    }

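    /// Round trips a [`VariantArray`] created from JSON strings via
    /// `json_to_variant`: a supplementary sketch along the same lines as
    /// `roundtrip_basic`, assuming the JSON conversion kernel round trips
    /// losslessly through Parquet
    #[test]
    fn roundtrip_json() {
        let input: ArrayRef = Arc::new(arrow_array::StringArray::from(vec![
            Some(r#"{"name": "Alice"}"#),
            None,
        ]));
        let array = parquet_variant_compute::json_to_variant(&input)
            .expect("json conversion should succeed");
        roundtrip(array);
    }
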
    /// Ensure a file with Variant LogicalType, written by another writer in
    /// parquet-testing, can be read as a VariantArray
    #[test]
    fn read_logical_type() {
        // Note: case-075 has 2 columns ("id", "var")
        // The variant looks like this:
        // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
        let batch = read_shredded_variant_test_case("case-075.parquet");

        assert_variant_metadata(&batch, "var");
        let var_column = batch.column_by_name("var").expect("expected var column");
        let var_array = VariantArray::try_new(var_column)
            .expect("expected var column to be a VariantArray");

        // verify the value
        assert_eq!(var_array.len(), 1);
        assert!(var_array.is_valid(0));
        let var_value = var_array.value(0);
        assert_eq!(var_value, Variant::from("iceberg"));
    }

    /// Writes a variant to a parquet file and ensures the parquet logical type
    /// annotation is correct
    #[test]
    fn write_logical_type() {
        let array = variant_array();
        let batch = variant_array_to_batch(array);
        let buffer = write_to_buffer(&batch);

        // read the parquet file's metadata and verify the logical type
        let metadata = read_metadata(&Bytes::from(buffer));
        let schema = metadata.file_metadata().schema_descr();
        let fields = schema.root_schema().get_fields();
        assert_eq!(fields.len(), 1);
        let field = &fields[0];
        assert_eq!(field.name(), "data");
        // data should have been written with the Variant logical type
        assert_eq!(
            field.get_basic_info().logical_type(),
            Some(crate::basic::LogicalType::Variant {
                specification_version: None
            })
        );
    }

    /// Return a VariantArray with 3 rows:
    ///
    /// 1. `{"name": "Alice"}`
    /// 2. `"such wow"` (a string)
    /// 3. `null`
    fn variant_array() -> VariantArray {
        let mut builder = VariantArrayBuilder::new(3);
        // row 1: {"name": "Alice"}
        builder.new_object().with_field("name", "Alice").finish();
        // row 2: "such wow" (a string)
        builder.append_value("such wow");
        // row 3: null
        builder.append_null();
        builder.build()
    }

    /// Writes a VariantArray to a parquet file and reads it back, verifying that
    /// the data is the same
    fn roundtrip(array: VariantArray) {
        let source_batch = variant_array_to_batch(array);
        assert_variant_metadata(&source_batch, "data");

        let buffer = write_to_buffer(&source_batch);
        let result_batch = read_to_batch(Bytes::from(buffer));
        assert_variant_metadata(&result_batch, "data");
        assert_eq!(result_batch, source_batch); // NB this also checks the schemas
    }

    /// Creates a RecordBatch with a single column "data" from a VariantArray
    fn variant_array_to_batch(array: VariantArray) -> RecordBatch {
        let field = array.field("data");
        let schema = Schema::new(vec![field]);
        RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)]).unwrap()
    }

    /// Writes a RecordBatch to an in-memory buffer and returns the buffer
    fn write_to_buffer(batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = vec![];
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();
        buffer
    }

    /// Reads the Parquet metadata
    fn read_metadata<T: ChunkReader + 'static>(input: &T) -> ParquetMetaData {
        let mut reader = ParquetMetaDataReader::new();
        reader.try_parse(input).unwrap();
        reader.finish().unwrap()
    }

    /// Reads a RecordBatch from a reader (e.g. Vec or File)
    fn read_to_batch<T: ChunkReader + 'static>(reader: T) -> RecordBatch {
        let reader = ArrowReaderBuilder::try_new(reader)
            .unwrap()
            .build()
            .unwrap();
        let mut batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 1);
        batches.swap_remove(0)
    }

    /// Verifies the variant metadata is present in the schema for the specified
    /// field name.
    fn assert_variant_metadata(batch: &RecordBatch, field_name: &str) {
        let schema = batch.schema();
        let field = schema
            .field_with_name(field_name)
            .expect("could not find expected field");

        // explicitly check the metadata so it is clear in the tests what the
        // names are
        let metadata_value = field
            .metadata()
            .get("ARROW:extension:name")
            .expect("metadata does not exist");

        assert_eq!(metadata_value, "arrow.parquet.variant");

        // verify that `VariantType` also correctly finds the metadata
        field
            .try_extension_type::<VariantType>()
            .expect("VariantExtensionType should be readable");
    }

    /// Read the specified test case filename from parquet-testing
    /// See parquet-testing/shredded_variant/cases.json for more details
    fn read_shredded_variant_test_case(name: &str) -> RecordBatch {
        let case_file = PathBuf::from(parquet_test_data())
            .join("..") // go up from data/ to parquet-testing/
            .join("shredded_variant")
            .join(name);
        let case_file = std::fs::File::open(case_file).unwrap();
        read_to_batch(case_file)
    }
}