arrow_integration_testing/lib.rs

//! Common code used by the Arrow integration test binaries: helpers for
//! reading integration-test JSON files, Flight scenarios, and C Data
//! Interface exports.

#![allow(unused_crate_dependencies)]
#![warn(missing_docs)]
use serde_json::Value;

use arrow::array::{Array, StructArray};
use arrow::datatypes::{DataType, Field, Fields, Schema};
use arrow::error::{ArrowError, Result};
use arrow::ffi::{from_ffi_and_data_type, FFI_ArrowArray, FFI_ArrowSchema};
use arrow::record_batch::RecordBatch;
use arrow::util::test_util::arrow_test_data;
use arrow_integration_test::*;
use std::collections::HashMap;
use std::ffi::{c_char, c_int, CStr, CString};
use std::fs::File;
use std::io::BufReader;
use std::iter::zip;
use std::ptr;
use std::sync::Arc;

/// Username exchanged by the Flight auth integration scenarios
pub const AUTH_USERNAME: &str = "arrow";
/// Password exchanged by the Flight auth integration scenarios
pub const AUTH_PASSWORD: &str = "flight";

/// Scenarios run by the Flight integration test client
pub mod flight_client_scenarios;
/// Scenarios run by the Flight integration test server
pub mod flight_server_scenarios;

/// An Arrow integration-test JSON file, holding the parsed schema and
/// dictionary batches plus the raw JSON that record batches are decoded from
pub struct ArrowFile {
    /// The schema declared by the file
    pub schema: Schema,
    // dictionary batches, keyed by dictionary id
    dictionaries: HashMap<i64, ArrowJsonDictionaryBatch>,
    // the raw JSON value; record batches are decoded from its "batches" key
    arrow_json: Value,
}
57
58impl ArrowFile {
59 pub fn read_batch(&self, batch_num: usize) -> Result<RecordBatch> {
61 let b = self.arrow_json["batches"].get(batch_num).unwrap();
62 let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
63 record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
64 }
65
66 pub fn read_batches(&self) -> Result<Vec<RecordBatch>> {
68 self.arrow_json["batches"]
69 .as_array()
70 .unwrap()
71 .iter()
72 .map(|b| {
73 let json_batch: ArrowJsonBatch = serde_json::from_value(b.clone()).unwrap();
74 record_batch_from_json(&self.schema, json_batch, Some(&self.dictionaries))
75 })
76 .collect()
77 }
78}

/// Canonicalizes the names inside Map fields: the Arrow spec recommends
/// "entries"/"key"/"value" for a map's child fields but does not enforce
/// them, so schemas are normalized to those names before comparison
pub fn canonicalize_schema(schema: &Schema) -> Schema {
    let fields = schema
        .fields()
        .iter()
        .map(|field| match field.data_type() {
            DataType::Map(child_field, sorted) => match child_field.data_type() {
                DataType::Struct(fields) if fields.len() == 2 => {
                    let first_field = &fields[0];
                    let key_field =
                        Arc::new(Field::new("key", first_field.data_type().clone(), false));
                    let second_field = &fields[1];
                    let value_field = Arc::new(Field::new(
                        "value",
                        second_field.data_type().clone(),
                        second_field.is_nullable(),
                    ));

                    let fields = Fields::from([key_field, value_field]);
                    let struct_type = DataType::Struct(fields);
                    let child_field = Field::new("entries", struct_type, false);

                    Arc::new(Field::new(
                        field.name().as_str(),
                        DataType::Map(Arc::new(child_field), *sorted),
                        field.is_nullable(),
                    ))
                }
                _ => panic!("The child field of a Map type should be a Struct with 2 fields."),
            },
            _ => field.clone(),
        })
        .collect::<Fields>();

    Schema::new(fields).with_metadata(schema.metadata().clone())
}
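
// Hedged example of what canonicalization does (the field names "my_entries",
// "keys" and "values" below are arbitrary, non-canonical choices made up for
// illustration):
//
//     use arrow::datatypes::{DataType, Field, Fields, Schema};
//     use std::sync::Arc;
//
//     let entries = Field::new(
//         "my_entries",
//         DataType::Struct(Fields::from(vec![
//             Field::new("keys", DataType::Utf8, false),
//             Field::new("values", DataType::Int32, true),
//         ])),
//         false,
//     );
//     let schema = Schema::new(vec![Field::new(
//         "m",
//         DataType::Map(Arc::new(entries), false),
//         true,
//     )]);
//     // The canonical form renames the children to "entries", "key" and
//     // "value", so two Map schemas differing only in those names compare
//     // equal after canonicalization.
//     let canonical = canonicalize_schema(&schema);
//     assert_eq!(canonical, canonicalize_schema(&canonical));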

/// Parses an Arrow integration-test JSON file into an [`ArrowFile`]
pub fn open_json_file(json_name: &str) -> Result<ArrowFile> {
    let json_file = File::open(json_name)?;
    let reader = BufReader::new(json_file);
    let arrow_json: Value = serde_json::from_reader(reader).unwrap();
    let schema = schema_from_json(&arrow_json["schema"])?;
    // read dictionary batches, if any are present
    let mut dictionaries = HashMap::new();
    if let Some(dicts) = arrow_json.get("dictionaries") {
        for d in dicts
            .as_array()
            .expect("Unable to get dictionaries as array")
        {
            let json_dict: ArrowJsonDictionaryBatch =
                serde_json::from_value(d.clone()).expect("Unable to get dictionary from JSON");
            dictionaries.insert(json_dict.id, json_dict);
        }
    }
    Ok(ArrowFile {
        schema,
        dictionaries,
        arrow_json,
    })
}
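
// Hedged usage sketch ("generated_primitive.json" is a hypothetical file
// name, shown only to illustrate how `open_json_file` pairs with the
// `ArrowFile` accessors above):
//
//     let file = open_json_file("generated_primitive.json")?;
//     println!("schema: {}", file.schema);
//     for batch in file.read_batches()? {
//         println!("read a batch with {} rows", batch.num_rows());
//     }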

/// Reads the gzipped "golden" JSON file for an IPC integration case from the
/// arrow-testing data directory located via `arrow_test_data()`, at
/// `arrow-ipc-stream/integration/{version}/{path}.json.gz`
pub fn read_gzip_json(version: &str, path: &str) -> ArrowJson {
    use flate2::read::GzDecoder;
    use std::io::Read;

    let testdata = arrow_test_data();
    let file = File::open(format!(
        "{testdata}/arrow-ipc-stream/integration/{version}/{path}.json.gz"
    ))
    .unwrap();
    let mut gz = GzDecoder::new(&file);
    let mut s = String::new();
    gz.read_to_string(&mut s).unwrap();
    // convert the decompressed text to Arrow JSON
    let arrow_json: ArrowJson = serde_json::from_str(&s).unwrap();
    arrow_json
}
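
// Hedged usage sketch: the arguments mirror the directory layout under
// `arrow-ipc-stream/integration/` in the arrow-testing data; the exact case
// name below is illustrative and may not exist in every checkout.
//
//     let golden = read_gzip_json("1.0.0-littleendian", "generated_primitive");
//     assert!(!golden.batches.is_empty());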

/// Exports the schema of the named integration JSON file through the C Data
/// Interface. `c_json_name` must point to a valid NUL-terminated string.
fn cdata_integration_export_schema_from_json(
    c_json_name: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let f = open_json_file(json_name.to_str()?)?;
    let c_schema = FFI_ArrowSchema::try_from(&f.schema)?;
    // `ptr::write` does not drop any previous contents of `out`; ownership of
    // the schema is transferred to the caller
    unsafe { ptr::write(out, c_schema) };
    Ok(())
}

/// Exports record batch `batch_num` of the named integration JSON file
/// through the C Data Interface, as a struct array whose children are the
/// batch's columns
fn cdata_integration_export_batch_from_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    out: *mut FFI_ArrowArray,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let b = open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
    let a = StructArray::from(b).into_data();
    let c_array = FFI_ArrowArray::new(&a);
    // ownership of the array is transferred to the caller via `out`
    unsafe { ptr::write(out, c_array) };
    Ok(())
}

fn cdata_integration_import_schema_and_compare_to_json(
    c_json_name: *const c_char,
    c_schema: *mut FFI_ArrowSchema,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let json_schema = open_json_file(json_name.to_str()?)?.schema;

    // take ownership of the FFI schema from the caller and convert it
    let imported_schema = unsafe { FFI_ArrowSchema::from_raw(c_schema) };
    let imported_schema = Schema::try_from(&imported_schema)?;

    // compare canonicalized schemas so that non-canonical Map child names do
    // not cause spurious mismatches
    if canonicalize_schema(&json_schema) != canonicalize_schema(&imported_schema) {
        return Err(ArrowError::ComputeError(format!(
            "Schemas do not match.\n- JSON: {:?}\n- Imported: {:?}",
            json_schema, imported_schema
        )));
    }
    Ok(())
}

fn compare_batches(a: &RecordBatch, b: &RecordBatch) -> Result<()> {
    if a.num_columns() != b.num_columns() {
        return Err(ArrowError::InvalidArgumentError(
            "batches do not have the same number of columns".to_string(),
        ));
    }
    for (a_column, b_column) in zip(a.columns(), b.columns()) {
        if a_column != b_column {
            return Err(ArrowError::InvalidArgumentError(
                "batch columns are not the same".to_string(),
            ));
        }
    }
    Ok(())
}

fn cdata_integration_import_batch_and_compare_to_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    c_array: *mut FFI_ArrowArray,
) -> Result<()> {
    let json_name = unsafe { CStr::from_ptr(c_json_name) };
    let json_batch =
        open_json_file(json_name.to_str()?)?.read_batch(batch_num.try_into().unwrap())?;
    let schema = json_batch.schema();

    // batches are exchanged as struct arrays, so import with a Struct type
    // built from the expected schema
    let data_type_for_import = DataType::Struct(schema.fields.clone());
    let imported_array = unsafe { FFI_ArrowArray::from_raw(c_array) };
    let imported_array = unsafe { from_ffi_and_data_type(imported_array, data_type_for_import) }?;
    imported_array.validate_full()?;
    let imported_batch = RecordBatch::from(StructArray::from(imported_array));

    compare_batches(&json_batch, &imported_batch)
}

// Converts an `Err` into a heap-allocated C string (to be released with
// `arrow_rs_free_error`) and an `Ok` into a null pointer
fn result_to_c_error<T, E: std::fmt::Display>(result: &std::result::Result<T, E>) -> *mut c_char {
    match result {
        Ok(_) => ptr::null_mut(),
        Err(e) => CString::new(format!("{}", e)).unwrap().into_raw(),
    }
}

/// Frees a `char*` error string returned by one of the
/// `arrow_rs_cdata_integration_*` functions below.
///
/// # Safety
///
/// `c_error` must be null or a pointer previously returned from
/// [`result_to_c_error`], and it must not be freed more than once.
#[no_mangle]
pub unsafe extern "C" fn arrow_rs_free_error(c_error: *mut c_char) {
    if !c_error.is_null() {
        drop(unsafe { CString::from_raw(c_error) });
    }
}

/// Exports the schema of the JSON integration file named by `c_json_name` to
/// `out`; returns null on success or an error string otherwise
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_export_schema_from_json(
    c_json_name: *const c_char,
    out: *mut FFI_ArrowSchema,
) -> *mut c_char {
    let r = cdata_integration_export_schema_from_json(c_json_name, out);
    result_to_c_error(&r)
}

/// Imports `c_schema` and compares it to the schema of the named JSON
/// integration file; returns null on success or an error string otherwise
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_import_schema_and_compare_to_json(
    c_json_name: *const c_char,
    c_schema: *mut FFI_ArrowSchema,
) -> *mut c_char {
    let r = cdata_integration_import_schema_and_compare_to_json(c_json_name, c_schema);
    result_to_c_error(&r)
}

/// Exports record batch `batch_num` of the named JSON integration file to
/// `out`; returns null on success or an error string otherwise
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_export_batch_from_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    out: *mut FFI_ArrowArray,
) -> *mut c_char {
    let r = cdata_integration_export_batch_from_json(c_json_name, batch_num, out);
    result_to_c_error(&r)
}

/// Imports `c_array` and compares it to batch `batch_num` of the named JSON
/// integration file; returns null on success or an error string otherwise
#[no_mangle]
pub extern "C" fn arrow_rs_cdata_integration_import_batch_and_compare_to_json(
    c_json_name: *const c_char,
    batch_num: c_int,
    c_array: *mut FFI_ArrowArray,
) -> *mut c_char {
    let r = cdata_integration_import_batch_and_compare_to_json(c_json_name, batch_num, c_array);
    result_to_c_error(&r)
}
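
// Hedged sketch of the calling convention shared by the exports above: each
// `arrow_rs_cdata_integration_*` function returns a null pointer on success,
// or a heap-allocated error string that the caller must release with
// `arrow_rs_free_error`. Shown here from Rust for illustration (real callers
// are C/C++ integration harnesses); the JSON file name is hypothetical.
//
//     use std::ffi::{CStr, CString};
//
//     let name = CString::new("generated_primitive.json").unwrap();
//     let mut schema = FFI_ArrowSchema::empty();
//     let err =
//         arrow_rs_cdata_integration_export_schema_from_json(name.as_ptr(), &mut schema);
//     if !err.is_null() {
//         let msg = unsafe { CStr::from_ptr(err) }.to_string_lossy().into_owned();
//         unsafe { arrow_rs_free_error(err) };
//         eprintln!("schema export failed: {msg}");
//     }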