// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! ⚠️ Experimental support for reading and writing [`Variant`]s to and from Parquet files ⚠️
//!
//! This is a 🚧 Work In Progress
//!
//! Note: Requires the `variant_experimental` feature of the `parquet` crate to be enabled.
//!
//! # Features
//! * Representation of [`Variant`] and [`VariantArray`] for working with
//!   Variant values (see [`parquet_variant`] for more details)
//! * Kernels for working with arrays of Variant values
//!   such as conversion between `Variant` and JSON, and shredding/unshredding
//!   (see [`parquet_variant_compute`] for more details)
//! # Example: Writing a Parquet file with a Variant column
//! ```rust
//! # use parquet::variant::{VariantArray, VariantType, VariantArrayBuilder, VariantBuilderExt};
//! # use std::sync::Arc;
//! # use arrow_array::{Array, ArrayRef, RecordBatch};
//! # use arrow_schema::{DataType, Field, Schema};
//! # use parquet::arrow::ArrowWriter;
//! # fn main() -> Result<(), parquet::errors::ParquetError> {
//!  // Use the VariantArrayBuilder to build a VariantArray
//!  let mut builder = VariantArrayBuilder::new(3);
//!  builder.new_object().with_field("name", "Alice").finish(); // row 1: {"name": "Alice"}
//!  builder.append_value("such wow"); // row 2: "such wow" (a string)
//!  let array = builder.build();
//!
//!  // Since VariantArray is an ExtensionType, it needs to be converted
//!  // to an ArrayRef and Field with the appropriate metadata
//!  // before it can be written to a Parquet file
//!  let field = array.field("data");
//!  let array = ArrayRef::from(array);
//!  // create a RecordBatch with the VariantArray
//!  let schema = Schema::new(vec![field]);
//!  let batch = RecordBatch::try_new(Arc::new(schema), vec![array])?;
//!
//!  // Now you can write the RecordBatch to the Parquet file, as normal
//!  let file = std::fs::File::create("variant.parquet")?;
//!  let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
//!  writer.write(&batch)?;
//!  writer.close()?;
//!
//! # std::fs::remove_file("variant.parquet")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Writing JSON into a Parquet file with a Variant column
//! ```rust
//! # use std::sync::Arc;
//! # use arrow_array::{ArrayRef, RecordBatch, StringArray};
//! # use arrow_schema::Schema;
//! # use parquet::variant::{json_to_variant, VariantArray};
//! # use parquet::arrow::ArrowWriter;
//! # fn main() -> Result<(), parquet::errors::ParquetError> {
//!  // Create an array of JSON strings, simulating a column of JSON data
//!  let input_array: ArrayRef = Arc::new(StringArray::from(vec![
//!   Some(r#"{"name": "Alice", "age": 30}"#),
//!   Some(r#"{"name": "Bob", "age": 25, "address": {"city": "New York"}}"#),
//!   None,
//!   Some("{}"),
//!  ]));
//!
//!  // Convert the JSON strings to a VariantArray
//!  let array: VariantArray = json_to_variant(&input_array)?;
//!  // create a RecordBatch with the VariantArray
//!  let schema = Schema::new(vec![array.field("data")]);
//!  let batch = RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)])?;
//!
//!  // write the RecordBatch to a Parquet file as normal
//!  let file = std::fs::File::create("variant-json.parquet")?;
//!  let mut writer = ArrowWriter::try_new(file, batch.schema(), None)?;
//!  writer.write(&batch)?;
//!  writer.close()?;
//! # std::fs::remove_file("variant-json.parquet")?;
//! # Ok(())
//! # }
//! ```
//!
//! # Example: Reading a Parquet file with a Variant column
//!
//! Use the [`VariantType`] extension type to find the Variant column:
//!
//! ```
//! # use std::sync::Arc;
//! # use std::path::PathBuf;
//! # use arrow_array::{ArrayRef, RecordBatch, RecordBatchReader};
//! # use parquet::variant::{Variant, VariantArray, VariantType};
//! # use parquet::arrow::arrow_reader::ArrowReaderBuilder;
//! # fn main() -> Result<(), parquet::errors::ParquetError> {
//! # use arrow_array::StructArray;
//! # fn file_path() -> PathBuf { // return a testing file path
//! #    PathBuf::from(arrow::util::test_util::parquet_test_data())
//! #   .join("..")
//! #   .join("shredded_variant")
//! #   .join("case-075.parquet")
//! # }
//! // Read the Parquet file using the standard Arrow Parquet reader.
//! // Note this file has 2 columns, "id" and "var"; the "var" column
//! // contains a variant that looks like this:
//! // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
//! let file = std::fs::File::open(file_path())?;
//! let mut reader = ArrowReaderBuilder::try_new(file)?.build()?;
//!
//! // You can check if a column contains a Variant using
//! // the VariantType extension type
//! let schema = reader.schema();
//! let field = schema.field_with_name("var")?;
//! assert!(field.try_extension_type::<VariantType>().is_ok());
//!
//! // The reader will yield RecordBatches with a StructArray;
//! // to convert them to VariantArray, use VariantArray::try_new
//! let batch = reader.next().unwrap().unwrap();
//!
//! let col = batch.column_by_name("var").unwrap();
//! let var_array = VariantArray::try_new(col)?;
//! assert_eq!(var_array.len(), 1);
//! let var_value: Variant = var_array.value(0);
//! assert_eq!(var_value, Variant::from("iceberg")); // the value in case-075.parquet
//! # Ok(())
//! # }
//! ```
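//!
//! # Example: Reading values from a VariantArray
//!
//! Once you have a [`VariantArray`], `VariantArray::value` returns each row as
//! a [`Variant`]. Here is a minimal sketch of inspecting those values (it
//! assumes object rows are represented by the `Variant::Object` enum variant;
//! see [`parquet_variant`] for the full `Variant` API):
//!
//! ```rust
//! # use parquet::variant::{Variant, VariantArrayBuilder, VariantBuilderExt};
//! // Build a two-row VariantArray: {"name": "Alice"} and "such wow"
//! let mut builder = VariantArrayBuilder::new(2);
//! builder.new_object().with_field("name", "Alice").finish();
//! builder.append_value("such wow");
//! let array = builder.build();
//!
//! // Object rows are returned as Variant::Object
//! assert!(matches!(array.value(0), Variant::Object(_)));
//! // Primitive rows can be compared directly with Variant::from
//! assert_eq!(array.value(1), Variant::from("such wow"));
//! ```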
pub use parquet_variant::*;
pub use parquet_variant_compute::*;

#[cfg(test)]
mod tests {
    use crate::arrow::arrow_reader::ArrowReaderBuilder;
    use crate::arrow::ArrowWriter;
    use crate::file::metadata::{ParquetMetaData, ParquetMetaDataReader};
    use crate::file::reader::ChunkReader;
    use arrow::util::test_util::parquet_test_data;
    use arrow_array::{ArrayRef, RecordBatch};
    use arrow_schema::Schema;
    use bytes::Bytes;
    use parquet_variant::{Variant, VariantBuilderExt};
    use parquet_variant_compute::{VariantArray, VariantArrayBuilder, VariantType};
    use std::path::PathBuf;
    use std::sync::Arc;

    #[test]
    fn roundtrip_basic() {
        roundtrip(variant_array());
    }

    /// Ensure a file with Variant LogicalType, written by another writer in
    /// parquet-testing, can be read as a VariantArray
    #[test]
    fn read_logical_type() {
        // Note: case-075 has 2 columns ("id", "var")
        // The variant looks like this:
        // "Variant(metadata=VariantMetadata(dict={}), value=Variant(type=STRING, value=iceberg))"
        let batch = read_shredded_variant_test_case("case-075.parquet");

        assert_variant_metadata(&batch, "var");
        let var_column = batch.column_by_name("var").expect("expected var column");
        let var_array =
            VariantArray::try_new(&var_column).expect("expected var column to be a VariantArray");

        // verify the value
        assert_eq!(var_array.len(), 1);
        assert!(var_array.is_valid(0));
        let var_value = var_array.value(0);
        assert_eq!(var_value, Variant::from("iceberg"));
    }
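
    /// Round-trips a VariantArray created from JSON strings through a Parquet
    /// file. A minimal sketch built on the helpers below; it assumes
    /// `json_to_variant` (from `parquet_variant_compute`, shown in the module
    /// docs) accepts a StringArray-backed ArrayRef.
    #[test]
    fn roundtrip_json() {
        let input: ArrayRef = Arc::new(arrow_array::StringArray::from(vec![
            Some(r#"{"name": "Alice", "age": 30}"#),
            None,
            Some("{}"),
        ]));
        let array = parquet_variant_compute::json_to_variant(&input).unwrap();
        roundtrip(array);
    }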

    /// Writes a variant to a parquet file and ensures the parquet logical type
    /// annotation is correct
    #[test]
    fn write_logical_type() {
        let array = variant_array();
        let batch = variant_array_to_batch(array);
        let buffer = write_to_buffer(&batch);

        // read the parquet file's metadata and verify the logical type
        let metadata = read_metadata(&Bytes::from(buffer));
        let schema = metadata.file_metadata().schema_descr();
        let fields = schema.root_schema().get_fields();
        assert_eq!(fields.len(), 1);
        let field = &fields[0];
        assert_eq!(field.name(), "data");
        // data should have been written with the Variant logical type
        assert_eq!(
            field.get_basic_info().logical_type(),
            Some(crate::basic::LogicalType::Variant)
        );
    }

    /// Return a VariantArray with 3 rows:
    ///
    /// 1. `{"name": "Alice"}`
    /// 2. `"such wow"` (a string)
    /// 3. `null`
    fn variant_array() -> VariantArray {
        let mut builder = VariantArrayBuilder::new(3);
        // row 1: {"name": "Alice"}
        builder.new_object().with_field("name", "Alice").finish();
        // row 2: "such wow" (a string)
        builder.append_value("such wow");
        // row 3: null
        builder.append_null();
        builder.build()
    }

    /// Writes a VariantArray to a parquet file and reads it back, verifying that
    /// the data is the same
    fn roundtrip(array: VariantArray) {
        let source_batch = variant_array_to_batch(array);
        assert_variant_metadata(&source_batch, "data");

        let buffer = write_to_buffer(&source_batch);
        let result_batch = read_to_batch(Bytes::from(buffer));
        assert_variant_metadata(&result_batch, "data");
        assert_eq!(result_batch, source_batch); // NB this also checks the schemas
    }

    /// creates a RecordBatch with a single column "data" from a VariantArray
    fn variant_array_to_batch(array: VariantArray) -> RecordBatch {
        let field = array.field("data");
        let schema = Schema::new(vec![field]);
        RecordBatch::try_new(Arc::new(schema), vec![ArrayRef::from(array)]).unwrap()
    }

    /// writes a RecordBatch to a memory buffer and returns the buffer
    fn write_to_buffer(batch: &RecordBatch) -> Vec<u8> {
        let mut buffer = vec![];
        let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
        writer.write(batch).unwrap();
        writer.close().unwrap();
        buffer
    }

    /// Reads the Parquet metadata
    fn read_metadata<T: ChunkReader + 'static>(input: &T) -> ParquetMetaData {
        let mut reader = ParquetMetaDataReader::new();
        reader.try_parse(input).unwrap();
        reader.finish().unwrap()
    }

    /// Reads a RecordBatch from a reader (e.g. Vec or File)
    fn read_to_batch<T: ChunkReader + 'static>(reader: T) -> RecordBatch {
        let reader = ArrowReaderBuilder::try_new(reader)
            .unwrap()
            .build()
            .unwrap();
        let mut batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>().unwrap();
        assert_eq!(batches.len(), 1);
        batches.swap_remove(0)
    }

    /// Verifies the variant metadata is present in the schema for the specified
    /// field name.
    fn assert_variant_metadata(batch: &RecordBatch, field_name: &str) {
        let schema = batch.schema();
        let field = schema
            .field_with_name(field_name)
            .expect("could not find expected field");

        // explicitly check the metadata so it is clear in the tests what the
        // names are
        let metadata_value = field
            .metadata()
            .get("ARROW:extension:name")
            .expect("metadata does not exist");

        assert_eq!(metadata_value, "arrow.parquet.variant");

        // verify that `VariantType` also correctly finds the metadata
        field
            .try_extension_type::<VariantType>()
            .expect("VariantExtensionType should be readable");
    }

    /// Read the specified test case filename from parquet-testing
    /// See parquet-testing/shredded_variant/cases.json for more details
    fn read_shredded_variant_test_case(name: &str) -> RecordBatch {
        let case_file = PathBuf::from(parquet_test_data())
            .join("..") // go up from data/ to parquet-testing/
            .join("shredded_variant")
            .join(name);
        let case_file = std::fs::File::open(case_file).unwrap();
        read_to_batch(case_file)
    }
}