// parquet/variant.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! ⚠️ Experimental Support for reading and writing [`Variant`]s to / from Parquet files ⚠️
19//!
20//! This is a 🚧 Work In Progress
21//!
22//! Note: Requires the `variant_experimental` feature of the `parquet` crate to be enabled.
23//!
24//! # Features
25//! * Representation of [`Variant`], and [`VariantArray`] for working with
26//! Variant values (see [`parquet_variant`] for more details)
27//! * Kernels for working with arrays of Variant values
28//! such as conversion between `Variant` and JSON, and shredding/unshredding
29//! (see [`parquet_variant_compute`] for more details)
30//!
31//! # Example: Writing a Parquet file with Variant column
32//! ```rust
33//! # use parquet::variant::{VariantArray, VariantType, VariantArrayBuilder, VariantBuilderExt};
34//! # use std::sync::Arc;
35//! # use arrow_array::{Array, ArrayRef, RecordBatch};
36//! # use arrow_schema::{DataType, Field, Schema};
37//! # use parquet::arrow::ArrowWriter;
38//! # fn main() -> Result<(), parquet::errors::ParquetError> {
39//! // Use the VariantArrayBuilder to build a VariantArray
40//! let mut builder = VariantArrayBuilder::new(3);
41//! builder.new_object().with_field("name", "Alice").finish(); // row 1: {"name": "Alice"}
42//! builder.append_value("such wow"); // row 2: "such wow" (a string)
43//! let array = builder.build();
44//!
45//! // Since VariantArray is an ExtensionType, it needs to be converted
46//! // to an ArrayRef and Field with the appropriate metadata
47//! // before it can be written to a Parquet file
48//! let field = array.field("data");
49//! let array = ArrayRef::from(array);
50//! // create a RecordBatch with the VariantArray
51//! let schema = Schema::new(vec![field]);
52//! let batch = RecordBatch::try_new(Arc::new(schema), vec![array])?;
53//!
54//! // Now you can write the RecordBatch to the Parquet file, as normal
55//! let file = std::fs::File::create("variant.parquet")?;
56//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
57//! writer.write(&batch)?;
58//! writer.close()?;
59//!
60//! # std::fs::remove_file("variant.parquet")?;
61//! # Ok(())
62//! # }
63//! ```
64//!
65//! # Example: Writing JSON into a Parquet file with Variant column
66//! ```rust
67//! # use std::sync::Arc;
68//! # use arrow_array::{ArrayRef, RecordBatch, StringArray};
69//! # use arrow_schema::Schema;
70//! # use parquet::variant::{json_to_variant, VariantArray};
71//! # use parquet::arrow::ArrowWriter;
72//! # fn main() -> Result<(), parquet::errors::ParquetError> {
73//! // Create an array of JSON strings, simulating a column of JSON data
74//! let input_array: ArrayRef = Arc::new(StringArray::from(vec![
75//! Some(r#"{"name": "Alice", "age": 30}"#),
76//! Some(r#"{"name": "Bob", "age": 25, "address": {"city": "New York"}}"#),
77//! None,
78//! Some("{}"),
79//! ]));
80//!
81//! // Convert the JSON strings to a VariantArray
82//! let array: VariantArray = json_to_variant(&input_array)?;
83//! // create a RecordBatch with the VariantArray
84//! let schema = Schema::new(vec![array.field("data")]);
85//! let batch = RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)])?;
86//!
87//! // write the RecordBatch to a Parquet file as normal
88//! let file = std::fs::File::create("variant-json.parquet")?;
89//! let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
90//! writer.write(&batch)?;
91//! writer.close()?;
92//! # std::fs::remove_file("variant-json.parquet")?;
93//! # Ok(())
94//! # }
95//! ```
96//!
97//! # Example: Reading a Parquet file with Variant column
98//!
99//! Use the [`VariantType`] extension type to find the Variant column:
100//!
101//! ```
102//! # use std::sync::Arc;
103//! # use std::path::PathBuf;
104//! # use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader};
105//! # use parquet::variant::{Variant, VariantArray, VariantType};
106//! # use parquet::arrow::arrow_reader::ArrowReaderBuilder;
107//! # fn main() -> Result<(), parquet::errors::ParquetError> {
108//! # use arrow_array::StructArray;
109//! # fn file_path() -> PathBuf { // return a testing file path
110//! # PathBuf::from(arrow::util::test_util::parquet_test_data())
111//! # .join("..")
112//! # .join("shredded_variant")
113//! # .join("case-075.parquet")
114//! # }
115//! // Read the Parquet file using standard Arrow Parquet reader.
116//! // Note this file has 2 columns: "id", "var", and the "var" column
//! // contains a variant that looks like this:
//! // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
119//! let file = std::fs::File::open(file_path())?;
120//! let mut reader = ArrowReaderBuilder::try_new(file)?.build()?;
121//!
122//! // You can check if a column contains a Variant using
123//! // the VariantType extension type
124//! let schema = reader.schema();
125//! let field = schema.field_with_name("var")?;
126//! assert!(field.try_extension_type::<VariantType>().is_ok());
127//!
128//! // The reader will yield RecordBatches with a StructArray
129//! // to convert them to VariantArray, use VariantArray::try_new
130//! let batch = reader.next().unwrap().unwrap();
131//!
132//! let col = batch.column_by_name("var").unwrap();
133//! let var_array = VariantArray::try_new(col)?;
134//! assert_eq!(var_array.len(), 1);
135//! let var_value: Variant = var_array.value(0);
136//! assert_eq!(var_value, Variant::from("iceberg")); // the value in case-075.parquet
137//! # Ok(())
138//! # }
139//! ```
140pub use parquet_variant::*;
141pub use parquet_variant_compute::*;
142
#[cfg(test)]
mod tests {
    use crate::arrow::arrow_reader::ArrowReaderBuilder;
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
    use crate::file::reader::ChunkReader;
    use arrow::util::test_util::parquet_test_data;
    use arrow_array::{ArrayRef, RecordBatch};
    use arrow_schema::Schema;
    use bytes::Bytes;
    use parquet_variant::{Variant, VariantBuilderExt};
    use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType};
    use std::path::PathBuf;
    use std::sync::Arc;

    #[test]
    fn roundtrip_basic() {
        roundtrip(variant_array());
    }

    /// A file annotated with the Variant LogicalType by another writer
    /// (from parquet-testing) must be readable as a [`VariantArray`].
    #[test]
    fn read_logical_type() {
        // case-075 has two columns ("id", "var"); the variant value is:
        // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
        let record_batch = read_shredded_variant_test_case("case-075.parquet");

        assert_variant_metadata(&record_batch, "var");
        let column = record_batch
            .column_by_name("var")
            .expect("expected var column");
        let variants =
            VariantArray::try_new(&column).expect("expected var column to be a VariantArray");

        // check the single value stored in the column
        assert_eq!(variants.len(), 1);
        assert!(variants.is_valid(0));
        assert_eq!(variants.value(0), Variant::from("iceberg"));
    }

    /// Writing a variant column must annotate the parquet schema with the
    /// Variant logical type.
    #[test]
    fn write_logical_type() {
        let batch = variant_array_to_batch(variant_array());
        let bytes = Bytes::from(write_to_buffer(&batch));

        // inspect the schema recorded in the written file's footer
        let metadata = read_metadata(&bytes);
        let root = metadata.file_metadata().schema_descr().root_schema();
        let fields = root.get_fields();
        assert_eq!(fields.len(), 1);

        let data_field = &fields[0];
        assert_eq!(data_field.name(), "data");
        // the column must carry the Variant logical type annotation
        let logical_type = data_field.get_basic_info().logical_type();
        assert_eq!(logical_type, Some(crate::basic::LogicalType::Variant));
    }

    /// Builds a three row [`VariantArray`]:
    ///
    /// 1. `{"name": "Alice"}`
    /// 2. `"such wow"` (a string)
    /// 3. `null`
    fn variant_array() -> VariantArray {
        let mut b = VariantArrayBuilder::new(3);
        b.new_object().with_field("name", "Alice").finish(); // {"name": "Alice"}
        b.append_value("such wow"); // a plain string value
        b.append_null(); // null row
        b.build()
    }

    /// Round-trips `array` through an in-memory parquet file and asserts the
    /// data comes back unchanged.
    fn roundtrip(array: VariantArray) {
        let written = variant_array_to_batch(array);
        assert_variant_metadata(&written, "data");

        let read_back = read_to_batch(Bytes::from(write_to_buffer(&written)));
        assert_variant_metadata(&read_back, "data");
        // comparing RecordBatches also compares their schemas
        assert_eq!(read_back, written);
    }

    /// Wraps `array` in a [`RecordBatch`] with one column named "data".
    fn variant_array_to_batch(array: VariantArray) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![array.field("data")]));
        RecordBatch::try_new(schema, vec![ArrayRef::from(array)]).unwrap()
    }

    /// Serializes `batch` to parquet bytes held entirely in memory.
    fn write_to_buffer(batch: &RecordBatch) -> Vec<u8> {
        let mut out = vec![];
        let mut writer = ArrowWriter::try_new(&mut out, batch.schema(), None).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();
        out
    }

    /// Parses the parquet footer metadata from `input`.
    fn read_metadata<T: ChunkReader + 'static>(input: &T) -> ParquetMetaData {
        let mut md_reader = ParquetMetaDataReader::new();
        md_reader.try_parse(input).unwrap();
        md_reader.finish().unwrap()
    }

    /// Reads exactly one [`RecordBatch`] from `reader` (e.g. Bytes or File);
    /// panics if the file yields any other number of batches.
    fn read_to_batch<T: ChunkReader + 'static>(reader: T) -> RecordBatch {
        let parquet_reader = ArrowReaderBuilder::try_new(reader)
            .unwrap()
            .build()
            .unwrap();
        let mut batches = parquet_reader
            .collect::<Result<Vec<RecordBatch>, _>>()
            .unwrap();
        assert_eq!(batches.len(), 1);
        batches.swap_remove(0)
    }

    /// Asserts that `field_name` in `batch`'s schema carries the variant
    /// extension metadata, both as a raw key/value pair and via [`VariantType`].
    fn assert_variant_metadata(batch: &RecordBatch, field_name: &str) {
        let schema = batch.schema();
        let field = schema
            .field_with_name(field_name)
            .expect("could not find expected field");

        // spell out the raw metadata key/value so the expected names are
        // visible directly in the test
        let ext_name = field
            .metadata()
            .get("ARROW:extension:name")
            .expect("metadata does not exist");
        assert_eq!(ext_name, "arrow.parquet.variant");

        // the extension-type lookup must agree with the raw metadata
        field
            .try_extension_type::<VariantType>()
            .expect("VariantExtensionType should be readable");
    }

    /// Opens the named test case from parquet-testing's shredded_variant
    /// directory (see its cases.json for details) and reads it into a batch.
    fn read_shredded_variant_test_case(name: &str) -> RecordBatch {
        let path = PathBuf::from(parquet_test_data())
            .join("..") // go up from data/ to parquet-testing/
            .join("shredded_variant")
            .join(name);
        read_to_batch(std::fs::File::open(path).unwrap())
    }
}
301}