parquet_derive_test/
lib.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
// Logo and favicon used by rustdoc for this crate's generated documentation.
#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/parquet-format/25f05e73d8cd7f5c83532ce51cb4f4de8ba5f2a2/logo/parquet-logos_1.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/parquet-format/25f05e73d8cd7f5c83532ce51cb4f4de8ba5f2a2/logo/parquet-logos_1.svg"
)]
// Only when compiled with `--cfg docsrs`: enable the unstable `doc_cfg` feature.
#![cfg_attr(docsrs, feature(doc_cfg))]
// NOTE(review): no float literal visible in this file approximates a std constant
// (the tests use 3.5); confirm whether this crate-wide allow is still required.
#![allow(clippy::approx_constant)]
24
25use parquet_derive::{ParquetRecordReader, ParquetRecordWriter};
26use std::sync::Arc;
27
28#[derive(ParquetRecordWriter)]
29struct ACompleteRecord<'a> {
30    pub a_bool: bool,
31    pub a_str: &'a str,
32    pub a_string: String,
33    pub a_borrowed_string: &'a String,
34    pub a_arc_str: Arc<str>,
35    pub maybe_a_str: Option<&'a str>,
36    pub maybe_a_string: Option<String>,
37    pub maybe_a_arc_str: Option<Arc<str>>,
38    pub i16: i16,
39    pub i32: i32,
40    pub u64: u64,
41    pub maybe_u8: Option<u8>,
42    pub maybe_i16: Option<i16>,
43    pub maybe_u32: Option<u32>,
44    pub maybe_usize: Option<usize>,
45    pub isize: isize,
46    pub float: f32,
47    pub double: f64,
48    pub maybe_float: Option<f32>,
49    pub maybe_double: Option<f64>,
50    pub borrowed_maybe_a_string: &'a Option<String>,
51    pub borrowed_maybe_a_str: &'a Option<&'a str>,
52    pub now: chrono::NaiveDateTime,
53    pub uuid: uuid::Uuid,
54    pub byte_vec: Vec<u8>,
55    pub maybe_byte_vec: Option<Vec<u8>>,
56    pub borrowed_byte_vec: &'a [u8],
57    pub borrowed_maybe_byte_vec: &'a Option<Vec<u8>>,
58    pub borrowed_maybe_borrowed_byte_vec: &'a Option<&'a [u8]>,
59}
60
61#[derive(PartialEq, ParquetRecordWriter, ParquetRecordReader, Debug)]
62struct APartiallyCompleteRecord {
63    pub bool: bool,
64    pub string: String,
65    pub i16: i16,
66    pub i32: i32,
67    pub u64: u64,
68    pub isize: isize,
69    pub float: f32,
70    pub double: f64,
71    pub now: chrono::NaiveDateTime,
72    pub date: chrono::NaiveDate,
73    pub uuid: uuid::Uuid,
74    pub byte_vec: Vec<u8>,
75}
76
77// This struct has OPTIONAL columns
78// If these fields are guaranteed to be valid
79// we can load this struct into APartiallyCompleteRecord
80#[derive(PartialEq, ParquetRecordWriter, Debug)]
81struct APartiallyOptionalRecord {
82    pub bool: bool,
83    pub string: String,
84    pub i16: Option<i16>,
85    pub i32: Option<i32>,
86    pub u64: Option<u64>,
87    pub isize: isize,
88    pub float: f32,
89    pub double: f64,
90    pub now: chrono::NaiveDateTime,
91    pub date: chrono::NaiveDate,
92    pub uuid: uuid::Uuid,
93    pub byte_vec: Vec<u8>,
94}
95
96// This struct removes several fields from the "APartiallyCompleteRecord",
97// and it shuffles the fields.
98// we should still be able to load it from APartiallyCompleteRecord parquet file
99#[derive(PartialEq, ParquetRecordReader, Debug)]
100struct APrunedRecord {
101    pub bool: bool,
102    pub string: String,
103    pub byte_vec: Vec<u8>,
104    pub float: f32,
105    pub double: f64,
106    pub i16: i16,
107    pub i32: i32,
108    pub u64: u64,
109    pub isize: isize,
110}
111
112#[cfg(test)]
113mod tests {
114    use super::*;
115
116    use chrono::SubsecRound;
117    use std::{env, fs, io::Write, sync::Arc};
118
119    use parquet::{
120        file::writer::SerializedFileWriter,
121        record::{RecordReader, RecordWriter},
122        schema::parser::parse_message_type,
123    };
124
125    #[test]
126    fn test_parquet_derive_hello() {
127        let file = get_temp_file("test_parquet_derive_hello", &[]);
128
129        // The schema is not required, but this tests that the generated
130        // schema agrees with what one would write by hand.
131        let schema_str = "message rust_schema {
132            REQUIRED boolean         a_bool;
133            REQUIRED BINARY          a_str (STRING);
134            REQUIRED BINARY          a_string (STRING);
135            REQUIRED BINARY          a_borrowed_string (STRING);
136            REQUIRED BINARY          a_arc_str (STRING);
137            OPTIONAL BINARY          maybe_a_str (STRING);
138            OPTIONAL BINARY          maybe_a_string (STRING);
139            OPTIONAL BINARY          maybe_a_arc_str (STRING);
140            REQUIRED INT32           i16 (INTEGER(16,true));
141            REQUIRED INT32           i32;
142            REQUIRED INT64           u64 (INTEGER(64,false));
143            OPTIONAL INT32           maybe_u8 (INTEGER(8,false));
144            OPTIONAL INT32           maybe_i16 (INTEGER(16,true));
145            OPTIONAL INT32           maybe_u32 (INTEGER(32,false));
146            OPTIONAL INT64           maybe_usize (INTEGER(64,false));
147            REQUIRED INT64           isize (INTEGER(64,true));
148            REQUIRED FLOAT           float;
149            REQUIRED DOUBLE          double;
150            OPTIONAL FLOAT           maybe_float;
151            OPTIONAL DOUBLE          maybe_double;
152            OPTIONAL BINARY          borrowed_maybe_a_string (STRING);
153            OPTIONAL BINARY          borrowed_maybe_a_str (STRING);
154            REQUIRED INT64           now (TIMESTAMP_MILLIS);
155            REQUIRED FIXED_LEN_BYTE_ARRAY (16) uuid (UUID);
156            REQUIRED BINARY          byte_vec;
157            OPTIONAL BINARY          maybe_byte_vec;
158            REQUIRED BINARY          borrowed_byte_vec;
159            OPTIONAL BINARY          borrowed_maybe_byte_vec;
160            OPTIONAL BINARY          borrowed_maybe_borrowed_byte_vec;
161        }";
162
163        let schema = Arc::new(parse_message_type(schema_str).unwrap());
164
165        let a_str = "hello mother".to_owned();
166        let a_borrowed_string = "cool news".to_owned();
167        let a_arc_str: Arc<str> = "hello arc".into();
168        let maybe_a_string = Some("it's true, I'm a string".to_owned());
169        let maybe_a_str = Some(&a_str[..]);
170        let maybe_a_arc_str = Some(a_arc_str.clone());
171        let borrowed_byte_vec = vec![0x68, 0x69, 0x70];
172        let borrowed_maybe_byte_vec = Some(vec![0x71, 0x72]);
173        let borrowed_maybe_borrowed_byte_vec = Some(&borrowed_byte_vec[..]);
174
175        let drs: Vec<ACompleteRecord> = vec![ACompleteRecord {
176            a_bool: true,
177            a_str: &a_str[..],
178            a_string: "hello father".into(),
179            a_borrowed_string: &a_borrowed_string,
180            a_arc_str,
181            maybe_a_str: Some(&a_str[..]),
182            maybe_a_string: Some(a_str.clone()),
183            maybe_a_arc_str,
184            i16: -45,
185            i32: 456,
186            u64: 4563424,
187            maybe_u8: None,
188            maybe_i16: Some(3),
189            maybe_u32: None,
190            maybe_usize: Some(4456),
191            isize: -365,
192            float: 3.5,
193            double: f64::NAN,
194            maybe_float: None,
195            maybe_double: Some(f64::MAX),
196            borrowed_maybe_a_string: &maybe_a_string,
197            borrowed_maybe_a_str: &maybe_a_str,
198            now: chrono::Utc::now().naive_local(),
199            uuid: uuid::Uuid::new_v4(),
200            byte_vec: vec![0x65, 0x66, 0x67],
201            maybe_byte_vec: Some(vec![0x88, 0x89, 0x90]),
202            borrowed_byte_vec: &borrowed_byte_vec,
203            borrowed_maybe_byte_vec: &borrowed_maybe_byte_vec,
204            borrowed_maybe_borrowed_byte_vec: &borrowed_maybe_borrowed_byte_vec,
205        }];
206
207        let generated_schema = drs.as_slice().schema().unwrap();
208
209        assert_eq!(&schema, &generated_schema);
210
211        let props = Default::default();
212        let mut writer = SerializedFileWriter::new(file, generated_schema, props).unwrap();
213
214        let mut row_group = writer.next_row_group().unwrap();
215        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
216        row_group.close().unwrap();
217        writer.close().unwrap();
218    }
219
220    #[test]
221    fn test_parquet_derive_read_write_combined() {
222        let file = get_temp_file("test_parquet_derive_combined", &[]);
223
224        let mut drs: Vec<APartiallyCompleteRecord> = vec![APartiallyCompleteRecord {
225            bool: true,
226            string: "a string".into(),
227            i16: -45,
228            i32: 456,
229            u64: 4563424,
230            isize: -365,
231            float: 3.5,
232            double: f64::NAN,
233            now: chrono::Utc::now().naive_local(),
234            date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
235            uuid: uuid::Uuid::new_v4(),
236            byte_vec: vec![0x65, 0x66, 0x67],
237        }];
238
239        let mut out: Vec<APartiallyCompleteRecord> = Vec::new();
240
241        use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};
242
243        let generated_schema = drs.as_slice().schema().unwrap();
244
245        let props = Default::default();
246        let mut writer =
247            SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();
248
249        let mut row_group = writer.next_row_group().unwrap();
250        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
251        row_group.close().unwrap();
252        writer.close().unwrap();
253
254        let reader = SerializedFileReader::new(file).unwrap();
255
256        let mut row_group = reader.get_row_group(0).unwrap();
257        out.read_from_row_group(&mut *row_group, 1).unwrap();
258
259        // correct for rounding error when writing milliseconds
260
261        drs[0].now = drs[0].now.trunc_subsecs(3);
262
263        assert!(out[0].double.is_nan()); // these three lines are necessary because NAN != NAN
264        out[0].double = 0.;
265        drs[0].double = 0.;
266
267        assert_eq!(drs[0], out[0]);
268    }
269
270    #[test]
271    fn test_parquet_derive_read_optional_but_valid_column() {
272        let file = get_temp_file("test_parquet_derive_read_optional", &[]);
273        let drs = vec![APartiallyOptionalRecord {
274            bool: true,
275            string: "a string".into(),
276            i16: Some(-45),
277            i32: Some(456),
278            u64: Some(4563424),
279            isize: -365,
280            float: 3.5,
281            double: f64::NAN,
282            now: chrono::Utc::now().naive_local(),
283            date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
284            uuid: uuid::Uuid::new_v4(),
285            byte_vec: vec![0x65, 0x66, 0x67],
286        }];
287
288        let generated_schema = drs.as_slice().schema().unwrap();
289
290        let props = Default::default();
291        let mut writer =
292            SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();
293
294        let mut row_group = writer.next_row_group().unwrap();
295        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
296        row_group.close().unwrap();
297        writer.close().unwrap();
298
299        use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};
300        let reader = SerializedFileReader::new(file).unwrap();
301        let mut out: Vec<APartiallyCompleteRecord> = Vec::new();
302
303        let mut row_group = reader.get_row_group(0).unwrap();
304        out.read_from_row_group(&mut *row_group, 1).unwrap();
305
306        assert_eq!(drs[0].i16.unwrap(), out[0].i16);
307        assert_eq!(drs[0].i32.unwrap(), out[0].i32);
308        assert_eq!(drs[0].u64.unwrap(), out[0].u64);
309    }
310
311    #[test]
312    fn test_parquet_derive_read_pruned_and_shuffled_columns() {
313        let file = get_temp_file("test_parquet_derive_read_pruned", &[]);
314        let drs = vec![APartiallyCompleteRecord {
315            bool: true,
316            string: "a string".into(),
317            i16: -45,
318            i32: 456,
319            u64: 4563424,
320            isize: -365,
321            float: 3.5,
322            double: f64::NAN,
323            now: chrono::Utc::now().naive_local(),
324            date: chrono::naive::NaiveDate::from_ymd_opt(2015, 3, 14).unwrap(),
325            uuid: uuid::Uuid::new_v4(),
326            byte_vec: vec![0x65, 0x66, 0x67],
327        }];
328
329        let generated_schema = drs.as_slice().schema().unwrap();
330
331        let props = Default::default();
332        let mut writer =
333            SerializedFileWriter::new(file.try_clone().unwrap(), generated_schema, props).unwrap();
334
335        let mut row_group = writer.next_row_group().unwrap();
336        drs.as_slice().write_to_row_group(&mut row_group).unwrap();
337        row_group.close().unwrap();
338        writer.close().unwrap();
339
340        use parquet::file::{reader::FileReader, serialized_reader::SerializedFileReader};
341        let reader = SerializedFileReader::new(file).unwrap();
342        let mut out: Vec<APrunedRecord> = Vec::new();
343
344        let mut row_group = reader.get_row_group(0).unwrap();
345        out.read_from_row_group(&mut *row_group, 1).unwrap();
346
347        assert_eq!(drs[0].bool, out[0].bool);
348        assert_eq!(drs[0].string, out[0].string);
349        assert_eq!(drs[0].byte_vec, out[0].byte_vec);
350        assert_eq!(drs[0].float, out[0].float);
351        assert!(drs[0].double.is_nan());
352        assert!(out[0].double.is_nan());
353        assert_eq!(drs[0].i16, out[0].i16);
354        assert_eq!(drs[0].i32, out[0].i32);
355        assert_eq!(drs[0].u64, out[0].u64);
356        assert_eq!(drs[0].isize, out[0].isize);
357    }
358
359    #[test]
360    fn test_aliased_result() {
361        // Issue 7547, Where aliasing the `Result` led to
362        // a collision with the macro internals of derive ParquetRecordReader
363
364        mod aliased_result {
365            use parquet_derive::{ParquetRecordReader, ParquetRecordWriter};
366
367            // This is the normal pattern that raised this issue
368            // pub type Result = std::result::Result<(), Box<dyn std::error::Error>>;
369
370            // not an actual result type
371            // Used here only to make the harder.
372            pub type Result = ();
373
374            #[derive(ParquetRecordReader, ParquetRecordWriter, Debug)]
375            pub struct ARecord {
376                pub bool: bool,
377                pub string: String,
378            }
379
380            impl ARecord {
381                pub fn do_nothing(&self) -> Result {}
382                pub fn validate(&self) -> std::result::Result<(), Box<dyn std::error::Error>> {
383                    Ok(())
384                }
385            }
386        }
387
388        use aliased_result::ARecord;
389        let foo = ARecord {
390            bool: true,
391            string: "test".to_string(),
392        };
393        foo.do_nothing();
394        assert!(foo.validate().is_ok());
395    }
396
397    /// Returns file handle for a temp file in 'target' directory with a provided content
398    pub fn get_temp_file(file_name: &str, content: &[u8]) -> fs::File {
399        // build tmp path to a file in "target/debug/testdata"
400        let mut path_buf = env::current_dir().unwrap();
401        path_buf.push("target");
402        path_buf.push("debug");
403        path_buf.push("testdata");
404        fs::create_dir_all(&path_buf).unwrap();
405        path_buf.push(file_name);
406
407        // write file content
408        let mut tmp_file = fs::File::create(path_buf.as_path()).unwrap();
409        tmp_file.write_all(content).unwrap();
410        tmp_file.sync_all().unwrap();
411
412        // return file handle for both read and write
413        let file = fs::OpenOptions::new()
414            .read(true)
415            .write(true)
416            .open(path_buf.as_path());
417        assert!(file.is_ok());
418        file.unwrap()
419    }
420}