arrow_integration_testing/
lib.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Common code used in the integration test binaries

// The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets
#![allow(unused_crate_dependencies)]
#![warn(missing_docs)]
use serde_json::Value;

use arrow::array::{Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::error::{ArrowError, Result};
use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow::record_batch::RecordBatch;
use arrow::util::test_util::arrow_test_data;
use arrow_integration_test::*;
use std::collections::HashMap;
use std::ffi::{c_char, c_int, CStr, CString};
use std::fs::File;
use std::io::BufReader;
use std::iter::zip;
use std::ptr;
use std::sync::Arc;

/// The expected username for the basic auth integration test.
pub const AUTH_USERNAME: &str = "arrow";
/// The expected password for the basic auth integration test.
pub const AUTH_PASSWORD: &str = "flight";

pub mod flight_client_scenarios;
pub mod flight_server_scenarios;

/// An Arrow file in JSON format
pub struct ArrowFile {
    /// The schema of the file
    pub schema: Schema,
    // we can evolve this into a concrete Arrow type
    // this is temporarily not being read from
    dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
    arrow_json: Value,
}

impl ArrowFile {
    /// Read a single [RecordBatch] from the file
    pub fn read_batch(&self, batch_num: usize) -> Result<RecordBatch> {
        let b = self.arrow_json["batches"].get(batch_num).unwrap();
        let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
        record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
    }

    /// Read all [RecordBatch]es from the file
    pub fn read_batches(&self) -> Result<Vec<RecordBatch>> {
        self.arrow_json["batches"]
            .as_array()
            .unwrap()
            .iter()
            .map(|b| {
                let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
                record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
            })
            .collect()
    }
}

/// Canonicalize the names of map fields in a schema
///
/// Renames each map's entries struct to `"entries"` and its children to
/// `"key"` and `"value"`, so that schemas produced by different Arrow
/// implementations compare equal regardless of the names they chose.
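///
/// For example (a sketch; the field names `my_entries`, `keys`, `values`
/// and `m` below are hypothetical):
///
/// ```ignore
/// let entries = Field::new(
///     "my_entries",
///     DataType::Struct(Fields::from(vec![
///         Field::new("keys", DataType::Utf8, false),
///         Field::new("values", DataType::Int32, true),
///     ])),
///     false,
/// );
/// let schema = Schema::new(vec![Field::new(
///     "m",
///     DataType::Map(Arc::new(entries), false),
///     true,
/// )]);
/// // The entries struct is now named "entries", with children "key" and "value"
/// let canonical = canonicalize_schema(&schema);
/// ```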
pub fn canonicalize_schema(schema: &Schema) -> Schema {
    let fields = schema
        .fields()
        .iter()
        .map(|field| match field.data_type() {
            DataType::Map(child_field, sorted) => match child_field.data_type() {
                DataType::Struct(fields) if fields.len() == 2 => {
                    let first_field = &fields[0];
                    let key_field =
                        Arc::new(Field::new("key", first_field.data_type().clone(), false));
                    let second_field = &fields[1];
                    let value_field = Arc::new(Field::new(
                        "value",
                        second_field.data_type().clone(),
                        second_field.is_nullable(),
                    ));

                    let fields = Fields::from([key_field, value_field]);
                    let struct_type = DataType::Struct(fields);
                    let child_field = Field::new("entries", struct_type, false);

                    Arc::new(Field::new(
                        field.name().as_str(),
                        DataType::Map(Arc::new(child_field), *sorted),
                        field.is_nullable(),
                    ))
                }
                _ => panic!("The child field of Map type should be Struct type with 2 fields."),
            },
            _ => field.clone(),
        })
        .collect::<Fields>();

    Schema::new(fields).with_metadata(schema.metadata().clone())
}

/// Read an Arrow file in JSON format
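///
/// For example (a sketch; `generated_primitive.json` is a hypothetical
/// integration-test file name):
///
/// ```ignore
/// let file = open_json_file("generated_primitive.json")?;
/// for batch in file.read_batches()? {
///     println!("read a batch with {} rows", batch.num_rows());
/// }
/// ```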
pub fn open_json_file(json_name: &str) -> Result<ArrowFile> {
    let json_file = File::open(json_name)?;
    let reader = BufReader::new(json_file);
    let arrow_json: Value = serde_json::from_reader(reader).unwrap();
    let schema = schema_from_json(&arrow_json["schema"])?;
    // read dictionaries
    let mut dictionaries = HashMap::new();
    if let Some(dicts) = arrow_json.get("dictionaries") {
        for d in dicts
            .as_array()
            .expect("Unable to get dictionaries as array")
        {
            let json_dict: ArrowJsonDictionaryBatch =
                serde_json::from_value(d.clone()).expect("Unable to get dictionary from JSON");
            // TODO: convert to a concrete Arrow type
            dictionaries.insert(json_dict.id, json_dict);
        }
    }
    Ok(ArrowFile {
        schema,
        dictionaries,
        arrow_json,
    })
}

/// Read a gzipped JSON test file
///
/// For example, given the input:
/// * version = `0.17.1`
/// * path = `generated_union`
///
/// this returns the contents of
/// `arrow-ipc-stream/integration/0.17.1/generated_union.json.gz`
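///
/// In code (a sketch using the inputs above):
///
/// ```ignore
/// let arrow_json = read_gzip_json("0.17.1", "generated_union");
/// ```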
pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
    use flate2::read::GzDecoder;
    use std::io::Read;

    let testdata = arrow_test_data();
    let file = File::open(format!(
        "{testdata}/arrow-ipc-stream/integration/{version}/{path}.json.gz"
    ))
    .unwrap();
    let mut gz = GzDecoder::new(&file);
    let mut s = String::new();
    gz.read_to_string(&mut s).unwrap();
    // convert to Arrow JSON
    let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
    arrow_json
}

/// C Data Integration entrypoint to export the schema from a JSON file
fn cdata_integration_export_schema_from_json(
    c_json_name: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let f = open_json_file(json_name.to_str()?)?;
    let c_schema = FFI_ArrowSchema::try_from(&f.schema)?;
    // Move exported schema into output struct
    unsafe { ptr::write(out, c_schema) };
    Ok(())
}

/// C Data Integration entrypoint to export a batch from a JSON file
fn cdata_integration_export_batch_from_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    out: *mut FFI_ArrowArray,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let b = open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
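    // A RecordBatch crosses the C Data Interface as an equivalent StructArray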
    let a = StructArray::from(b).into_data();
    let c_array = FFI_ArrowArray::new(&a);
    // Move exported array into output struct
    unsafe { ptr::write(out, c_array) };
    Ok(())
}

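/// C Data Integration entrypoint to import a schema and compare it
/// against the schema read from a JSON file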
fn cdata_integration_import_schema_and_compare_to_json(
    c_json_name: *const c_char,
    c_schema: *mut FFI_ArrowSchema,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let json_schema = open_json_file(json_name.to_str()?)?.schema;

    // The source ArrowSchema will be released when this is dropped
    let imported_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema) };
    let imported_schema = Schema::try_from(&imported_schema)?;

    // compare schemas
    if canonicalize_schema(&json_schema) != canonicalize_schema(&imported_schema) {
        return Err(ArrowError::ComputeError(format!(
            "Schemas do not match.\n- JSON: {json_schema:?}\n- Imported: {imported_schema:?}"
        )));
    }
    Ok(())
}

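/// Compare two [RecordBatch]es column by column, returning an error on
/// the first mismatch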
fn compare_batches(a: &RecordBatch, b: &RecordBatch) -> Result<()> {
    if a.num_columns() != b.num_columns() {
        return Err(ArrowError::InvalidArgumentError(
            "batches do not have the same number of columns".to_string(),
        ));
    }
    for (a_column, b_column) in zip(a.columns(), b.columns()) {
        if a_column != b_column {
            return Err(ArrowError::InvalidArgumentError(
                "batch columns are not the same".to_string(),
            ));
        }
    }
    Ok(())
}

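/// C Data Integration entrypoint to import a batch and compare it
/// against the corresponding batch read from a JSON file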
fn cdata_integration_import_batch_and_compare_to_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    c_array: *mut FFI_ArrowArray,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let json_batch =
        open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
    let schema = json_batch.schema();

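    // The batch arrives over FFI as a StructArray, so import it with a
    // Struct data type built from the JSON schema's fields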
    let data_type_for_import = DataType::Struct(schema.fields.clone());
    let imported_array = unsafe { FFI_ArrowArray::from_raw(c_array) };
    let imported_array = unsafe { from_ffi_and_data_type(imported_array, data_type_for_import) }?;
    imported_array.validate_full()?;
    let imported_batch = RecordBatch::from(StructArray::from(imported_array));

    compare_batches(&json_batch, &imported_batch)
}

// If the result is an error, export its display string as a C string; otherwise return NULL.
// The caller is responsible for releasing the string with arrow_rs_free_error().
fn result_to_c_error<T, E: std::fmt::Display>(result: &std::result::Result<T, E>) -> *mut c_char {
    match result {
        Ok(_) => ptr::null_mut(),
        Err(e) => CString::new(format!("{e}")).unwrap().into_raw(),
    }
}

/// Release an error string exported by result_to_c_error()
///
/// # Safety
///
/// The pointer is assumed to have been obtained using CString::into_raw.
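///
/// For example (a caller-side sketch from Rust; `c_path` and `out` are
/// hypothetical, and real callers are typically C or C++):
///
/// ```ignore
/// let err = arrow_rs_cdata_integration_export_schema_from_json(c_path, out);
/// if !err.is_null() {
///     let msg = unsafe { CStr::from_ptr(err) }.to_string_lossy().into_owned();
///     unsafe { arrow_rs_free_error(err) };
///     eprintln!("export failed: {msg}");
/// }
/// ```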
#[no_mangle]
pub unsafe extern "C" fn arrow_rs_free_error(c_error: *mut c_char) {
    if !c_error.is_null() {
        drop(unsafe { CString::from_raw(c_error) });
    }
}

/// A C-ABI for exporting an Arrow schema from a JSON file
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_export_schema_from_json(
    c_json_name: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> *mut c_char {
    let r = cdata_integration_export_schema_from_json(c_json_name, out);
    result_to_c_error(&r)
}

/// A C-ABI to compare an Arrow schema against a JSON file
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_import_schema_and_compare_to_json(
    c_json_name: *const c_char,
    c_schema: *mut FFI_ArrowSchema,
) -> *mut c_char {
    let r = cdata_integration_import_schema_and_compare_to_json(c_json_name, c_schema);
    result_to_c_error(&r)
}

/// A C-ABI for exporting a RecordBatch from a JSON file
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_export_batch_from_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    out: *mut FFI_ArrowArray,
) -> *mut c_char {
    let r = cdata_integration_export_batch_from_json(c_json_name, batch_num, out);
    result_to_c_error(&r)
}

/// A C-ABI to compare a RecordBatch against a JSON file
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_import_batch_and_compare_to_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    c_array: *mut FFI_ArrowArray,
) -> *mut c_char {
    let r = cdata_integration_import_batch_and_compare_to_json(c_json_name, batch_num, c_array);
    result_to_c_error(&r)
}