parquet_variant_compute/
to_json.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Module for transforming a batch of Variants represented as
19//! STRUCT<metadata: BINARY, value: BINARY> into a batch of JSON strings.
20
21use arrow::array::{Array, ArrayRef, BinaryArray, BooleanBufferBuilder, StringArray, StructArray};
22use arrow::buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
23use arrow::datatypes::DataType;
24use arrow_schema::ArrowError;
25use parquet_variant::Variant;
26use parquet_variant_json::variant_to_json;
27
28/// Transform a batch of Variant represented as STRUCT<metadata: BINARY, value: BINARY> to a batch
29/// of JSON strings where nulls are preserved. The JSON strings in the input must be valid.
30pub fn batch_variant_to_json_string(input: &ArrayRef) -> Result<StringArray, ArrowError> {
31    let struct_array = input
32        .as_any()
33        .downcast_ref::<StructArray>()
34        .ok_or_else(|| ArrowError::CastError("Expected StructArray as input".into()))?;
35
36    // Validate field types
37    let data_type = struct_array.data_type();
38    match data_type {
39        DataType::Struct(inner_fields) => {
40            if inner_fields.len() != 2
41                || inner_fields[0].data_type() != &DataType::Binary
42                || inner_fields[1].data_type() != &DataType::Binary
43            {
44                return Err(ArrowError::CastError(
45                    "Expected struct with two binary fields".into(),
46                ));
47            }
48        }
49        _ => {
50            return Err(ArrowError::CastError(
51                "Expected StructArray with known fields".into(),
52            ))
53        }
54    }
55
56    let metadata_array = struct_array
57        .column(0)
58        .as_any()
59        .downcast_ref::<BinaryArray>()
60        .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'metadata'".into()))?;
61
62    let value_array = struct_array
63        .column(1)
64        .as_any()
65        .downcast_ref::<BinaryArray>()
66        .ok_or_else(|| ArrowError::CastError("Expected BinaryArray for 'value'".into()))?;
67
68    // Zero-copy builder
69    // The size per JSON string is assumed to be 128 bytes. If this holds true, resizing could be
70    // minimized for performance.
71    let mut json_buffer: Vec<u8> = Vec::with_capacity(struct_array.len() * 128);
72    let mut offsets: Vec<i32> = Vec::with_capacity(struct_array.len() + 1);
73    let mut validity = BooleanBufferBuilder::new(struct_array.len());
74    let mut current_offset: i32 = 0;
75    offsets.push(current_offset);
76
77    for i in 0..struct_array.len() {
78        if struct_array.is_null(i) {
79            validity.append(false);
80            offsets.push(current_offset);
81        } else {
82            let metadata = metadata_array.value(i);
83            let value = value_array.value(i);
84            let variant = Variant::new(metadata, value);
85            let start_len = json_buffer.len();
86            variant_to_json(&mut json_buffer, &variant)?;
87            let written = (json_buffer.len() - start_len) as i32;
88            current_offset += written;
89            offsets.push(current_offset);
90            validity.append(true);
91        }
92    }
93
94    let offsets_buffer = OffsetBuffer::new(ScalarBuffer::from(offsets));
95    let value_buffer = Buffer::from_vec(json_buffer);
96    let null_buffer = NullBuffer::new(validity.finish());
97
98    Ok(StringArray::new(
99        offsets_buffer,
100        value_buffer,
101        Some(null_buffer),
102    ))
103}
104
#[cfg(test)]
mod test {
    use crate::batch_variant_to_json_string;
    use arrow::array::{Array, ArrayRef, BinaryBuilder, BooleanBufferBuilder, StructArray};
    use arrow::buffer::NullBuffer;
    use arrow::datatypes::DataType;
    use arrow::datatypes::Field;
    use arrow_schema::Fields;
    use std::sync::Arc;

    /// Feeds five rows (three encoded variants, two nulls) through
    /// `batch_variant_to_json_string` and compares the produced strings and nulls.
    #[test]
    fn test_batch_variant_to_json_string() {
        // One entry per row: `Some((metadata bytes, value bytes))`, or `None` for a null row.
        let rows: Vec<Option<(Vec<u8>, Vec<u8>)>> = vec![
            Some((vec![1, 0, 0], vec![12, 0])),
            None,
            Some((vec![1, 1, 0, 1, 97], vec![2, 1, 0, 0, 2, 12, 32])),
            Some((vec![1, 0, 0], vec![0])),
            None,
        ];

        let mut metadata_builder = BinaryBuilder::new();
        let mut value_builder = BinaryBuilder::new();
        for row in &rows {
            match row {
                Some((metadata, value)) => {
                    metadata_builder.append_value(metadata);
                    value_builder.append_value(value);
                }
                None => {
                    metadata_builder.append_null();
                    value_builder.append_null();
                }
            }
        }

        let metadata_array = Arc::new(metadata_builder.finish()) as ArrayRef;
        let value_array = Arc::new(value_builder.finish()) as ArrayRef;

        // A struct row is valid only when both of its children are valid.
        let row_count = value_array.len();
        let mut validity = BooleanBufferBuilder::new(row_count);
        for i in 0..row_count {
            validity.append(value_array.is_valid(i) && metadata_array.is_valid(i));
        }
        let struct_nulls = NullBuffer::new(validity.finish());

        let fields: Fields = vec![
            Field::new("metadata", DataType::Binary, true),
            Field::new("value", DataType::Binary, true),
        ]
        .into();

        // The struct-level null bitmap is passed explicitly rather than left as `None`.
        let struct_array = StructArray::new(
            fields,
            vec![metadata_array, value_array],
            Some(struct_nulls),
        );
        let input = Arc::new(struct_array) as ArrayRef;

        let result = batch_variant_to_json_string(&input).unwrap();

        let actual: Vec<Option<&str>> = (0..result.len())
            .map(|i| (!result.is_null(i)).then(|| result.value(i)))
            .collect();

        // Expected output: ["0", null, "{\"a\":32}", "null", null]
        let expected = vec![Some("0"), None, Some("{\"a\":32}"), Some("null"), None];
        assert_eq!(actual, expected);
    }
}