Skip to main content

parquet_derive/
parquet_field.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
17
18use syn::ext::IdentExt;
19
20#[derive(Debug, PartialEq)]
21pub struct Field {
22    ident: syn::Ident,
23    ty: Type,
24    is_a_byte_buf: bool,
25    third_party_type: Option<ThirdPartyType>,
26}
27
/// Use third party libraries, detected
/// at compile time. These libraries will
/// be written to parquet as their preferred
/// physical type.
///
///   ChronoNaiveDateTime is written as i64
///   ChronoNaiveDate is written as i32
///   Uuid is written as a fixed length byte array of 16 bytes
///
/// Detection is purely name based (the last segment of the innermost
/// type path), so the variants carry no data and are cheap to copy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThirdPartyType {
    /// `chrono::NaiveDateTime`
    ChronoNaiveDateTime,
    /// `chrono::NaiveDate`
    ChronoNaiveDate,
    /// `uuid::Uuid`
    Uuid,
}
41
42impl Field {
43    pub fn from(f: &syn::Field) -> Self {
44        let ty = Type::from(f);
45        let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY;
46
47        let third_party_type = match &ty.last_part()[..] {
48            "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime),
49            "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate),
50            "Uuid" => Some(ThirdPartyType::Uuid),
51            _ => None,
52        };
53
54        Field {
55            ident: f
56                .ident
57                .clone()
58                .expect("Only structs with named fields are currently supported"),
59            ty,
60            is_a_byte_buf,
61            third_party_type,
62        }
63    }
64
65    /// Takes the parsed field of the struct and emits a valid
66    /// column writer snippet. Should match exactly what you
67    /// would write by hand.
68    ///
69    /// Can only generate writers for basic structs, for example:
70    ///
71    /// struct Record {
72    ///   a_bool: bool,
73    ///   maybe_a_bool: `Option<bool>`
74    /// }
75    ///
76    /// but not
77    ///
78    /// struct UnsupportedNestedRecord {
79    ///   a_property: bool,
80    ///   nested_record: Record
81    /// }
82    ///
83    /// because this parsing logic is not sophisticated enough for definition
84    /// levels beyond 2.
85    pub fn writer_snippet(&self) -> proc_macro2::TokenStream {
86        let ident = &self.ident;
87        let column_writer = self.ty.column_writer();
88
89        let vals_builder = match &self.ty {
90            Type::TypePath(_) => self.copied_direct_vals(),
91            Type::Option(first_type) => match **first_type {
92                Type::TypePath(_) => self.option_into_vals(),
93                Type::Reference(_, ref second_type) => match **second_type {
94                    Type::TypePath(_) => self.option_into_vals(),
95                    _ => unimplemented!("Unsupported type encountered"),
96                },
97                Type::Vec(ref first_type) => match **first_type {
98                    Type::TypePath(_) => self.option_into_vals(),
99                    _ => unimplemented!("Unsupported type encountered"),
100                },
101                ref f => unimplemented!("Unsupported: {:#?}", f),
102            },
103            Type::Reference(_, first_type) => match **first_type {
104                Type::TypePath(_) => self.copied_direct_vals(),
105                Type::Option(ref second_type) => match **second_type {
106                    Type::TypePath(_) => self.option_into_vals(),
107                    Type::Reference(_, ref second_type) => match **second_type {
108                        Type::TypePath(_) => self.option_into_vals(),
109                        Type::Slice(ref second_type) => match **second_type {
110                            Type::TypePath(_) => self.option_into_vals(),
111                            ref f => unimplemented!("Unsupported: {:#?}", f),
112                        },
113                        _ => unimplemented!("Unsupported type encountered"),
114                    },
115                    Type::Vec(ref first_type) => match **first_type {
116                        Type::TypePath(_) => self.option_into_vals(),
117                        _ => unimplemented!("Unsupported type encountered"),
118                    },
119                    ref f => unimplemented!("Unsupported: {:#?}", f),
120                },
121                Type::Slice(ref second_type) => match **second_type {
122                    Type::TypePath(_) => self.copied_direct_vals(),
123                    ref f => unimplemented!("Unsupported: {:#?}", f),
124                },
125                ref f => unimplemented!("Unsupported: {:#?}", f),
126            },
127            Type::Vec(first_type) => match **first_type {
128                Type::TypePath(_) => self.copied_direct_vals(),
129                ref f => unimplemented!("Unsupported: {:#?}", f),
130            },
131            f => unimplemented!("Unsupported: {:#?}", f),
132        };
133
134        let definition_levels = match &self.ty {
135            Type::TypePath(_) => None,
136            Type::Option(first_type) => match **first_type {
137                Type::TypePath(_) => Some(self.optional_definition_levels()),
138                Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
139                Type::Reference(_, ref second_type)
140                | Type::Vec(ref second_type)
141                | Type::Array(ref second_type, _)
142                | Type::Slice(ref second_type) => match **second_type {
143                    Type::TypePath(_) => Some(self.optional_definition_levels()),
144                    _ => unimplemented!("Unsupported nesting encountered"),
145                },
146            },
147            Type::Reference(_, first_type)
148            | Type::Vec(first_type)
149            | Type::Array(first_type, _)
150            | Type::Slice(first_type) => match **first_type {
151                Type::TypePath(_) => None,
152                Type::Vec(ref second_type)
153                | Type::Array(ref second_type, _)
154                | Type::Slice(ref second_type) => match **second_type {
155                    Type::TypePath(_) => None,
156                    Type::Reference(_, ref third_type) => match **third_type {
157                        Type::TypePath(_) => None,
158                        _ => unimplemented!("Unsupported definition encountered"),
159                    },
160                    _ => unimplemented!("Unsupported definition encountered"),
161                },
162                Type::Reference(_, ref second_type) | Type::Option(ref second_type) => {
163                    match **second_type {
164                        Type::TypePath(_) => Some(self.optional_definition_levels()),
165                        Type::Vec(ref third_type)
166                        | Type::Array(ref third_type, _)
167                        | Type::Slice(ref third_type) => match **third_type {
168                            Type::TypePath(_) => Some(self.optional_definition_levels()),
169                            Type::Reference(_, ref fourth_type) => match **fourth_type {
170                                Type::TypePath(_) => Some(self.optional_definition_levels()),
171                                _ => unimplemented!("Unsupported definition encountered"),
172                            },
173                            _ => unimplemented!("Unsupported definition encountered"),
174                        },
175                        Type::Reference(_, ref third_type) => match **third_type {
176                            Type::TypePath(_) => Some(self.optional_definition_levels()),
177                            Type::Slice(ref fourth_type) => match **fourth_type {
178                                Type::TypePath(_) => Some(self.optional_definition_levels()),
179                                _ => unimplemented!("Unsupported definition encountered"),
180                            },
181                            _ => unimplemented!("Unsupported definition encountered"),
182                        },
183                        _ => unimplemented!("Unsupported definition encountered"),
184                    }
185                }
186            },
187        };
188
189        // "vals" is the run of primitive data being written for the column
190        // "definition_levels" is a vector of bools which controls whether a value is missing or present
191        // this TokenStream is only one part of the code for writing a column and
192        // it relies on values calculated in prior code snippets, namely "definition_levels" and "vals_builder".
193        // All the context is put together in this functions final quote and
194        // this expression just switches between non-nullable and nullable write statements
195        let write_batch_expr = if definition_levels.is_some() {
196            quote! {
197                if let #column_writer(typed) = column_writer.untyped() {
198                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
199                } else {
200                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
201                }
202            }
203        } else {
204            quote! {
205                if let #column_writer(typed) = column_writer.untyped() {
206                    typed.write_batch(&vals[..], None, None)?;
207                } else {
208                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
209                }
210            }
211        };
212
213        quote! {
214            {
215                #definition_levels
216
217                #vals_builder
218
219                #write_batch_expr
220            }
221        }
222    }
223
224    /// Takes the parsed field of the struct and emits a valid
225    /// column reader snippet. Should match exactly what you
226    /// would write by hand.
227    ///
228    /// Can only generate writers for basic structs, for example:
229    ///
230    /// struct Record {
231    ///   a_bool: bool
232    /// }
233    ///
234    /// but not
235    ///
236    /// struct UnsupportedNestedRecord {
237    ///   a_property: bool,
238    ///   nested_record: Record
239    /// }
240    ///
241    /// because this parsing logic is not sophisticated enough for definition
242    /// levels beyond 2.
243    ///
244    /// `Option` types and references not supported, but the column itself can be nullable
245    /// (i.e., def_level==1), as long as the values are all valid.
246    pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
247        let ident = &self.ident;
248        let column_reader = self.ty.column_reader();
249
250        // generate the code to read the column into a vector `vals`
251        let write_batch_expr = quote! {
252            let mut vals = Vec::new();
253            if let #column_reader(mut typed) = column_reader {
254                let mut definition_levels = Vec::new();
255                let (total_num, valid_num, decoded_num) = typed.read_records(
256                    num_records, Some(&mut definition_levels), None, &mut vals)?;
257                if valid_num != decoded_num {
258                    panic!("Support only valid records, found {} null records in column type {}",
259                        decoded_num - valid_num, stringify!{#ident});
260                }
261            } else {
262                panic!("Schema and struct disagree on type for {}", stringify!{#ident});
263            }
264        };
265
266        // generate the code to convert each element of `vals` to the correct type and then write
267        // it to its field in the corresponding struct
268        let vals_writer = match &self.ty {
269            Type::TypePath(_) => self.copied_direct_fields(),
270            Type::Reference(_, first_type) => match **first_type {
271                Type::TypePath(_) => self.copied_direct_fields(),
272                Type::Slice(ref second_type) => match **second_type {
273                    Type::TypePath(_) => self.copied_direct_fields(),
274                    ref f => unimplemented!("Unsupported: {:#?}", f),
275                },
276                ref f => unimplemented!("Unsupported: {:#?}", f),
277            },
278            Type::Vec(first_type) => match **first_type {
279                Type::TypePath(_) => self.copied_direct_fields(),
280                ref f => unimplemented!("Unsupported: {:#?}", f),
281            },
282            f => unimplemented!("Unsupported: {:#?}", f),
283        };
284
285        quote! {
286            {
287                #write_batch_expr
288
289                #vals_writer
290            }
291        }
292    }
293
294    pub fn parquet_type(&self) -> proc_macro2::TokenStream {
295        // TODO: Support group types
296        // TODO: Add length if dealing with fixedlenbinary
297
298        // unraw the identifier, so a raw identifier like `r#type`
299        // becomes a column named `type` in the parquet schema
300        let field_name = self.ident.unraw().to_string();
301        let physical_type = match self.ty.physical_type() {
302            parquet::basic::Type::BOOLEAN => quote! {
303                ::parquet::basic::Type::BOOLEAN
304            },
305            parquet::basic::Type::INT32 => quote! {
306                ::parquet::basic::Type::INT32
307            },
308            parquet::basic::Type::INT64 => quote! {
309                ::parquet::basic::Type::INT64
310            },
311            parquet::basic::Type::INT96 => quote! {
312                ::parquet::basic::Type::INT96
313            },
314            parquet::basic::Type::FLOAT => quote! {
315                ::parquet::basic::Type::FLOAT
316            },
317            parquet::basic::Type::DOUBLE => quote! {
318                ::parquet::basic::Type::DOUBLE
319            },
320            parquet::basic::Type::BYTE_ARRAY => quote! {
321                ::parquet::basic::Type::BYTE_ARRAY
322            },
323            parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! {
324                ::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
325            },
326        };
327        let logical_type = self.ty.logical_type();
328        let repetition = self.ty.repetition();
329        let converted_type = self.ty.converted_type();
330        let length = self.ty.length();
331
332        let mut builder = quote! {
333            ParquetType::primitive_type_builder(#field_name, #physical_type)
334                .with_logical_type(#logical_type)
335                .with_repetition(#repetition)
336        };
337
338        if let Some(converted_type) = converted_type {
339            builder = quote! { #builder.with_converted_type(#converted_type) };
340        }
341
342        if let Some(length) = length {
343            builder = quote! { #builder.with_length(#length) };
344        }
345
346        quote! {  fields.push(#builder.build().unwrap().into()) }
347    }
348
349    fn option_into_vals(&self) -> proc_macro2::TokenStream {
350        let field_name = &self.ident;
351        let is_a_byte_buf = self.is_a_byte_buf;
352        let is_a_timestamp = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
353        let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
354        let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
355        let copy_to_vec = !matches!(
356            self.ty.physical_type(),
357            parquet::basic::Type::BYTE_ARRAY | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
358        );
359
360        let binding = if copy_to_vec {
361            quote! { let Some(inner) = rec.#field_name }
362        } else {
363            quote! { let Some(inner) = &rec.#field_name }
364        };
365
366        let some = if is_a_timestamp {
367            quote! { Some(inner.timestamp_millis()) }
368        } else if is_a_date {
369            quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)  }
370        } else if is_a_uuid {
371            quote! { Some((&inner.to_string()[..]).into()) }
372        } else if is_a_byte_buf {
373            quote! { Some((&inner[..]).into())}
374        } else {
375            // Type might need converting to a physical type
376            match self.ty.physical_type() {
377                parquet::basic::Type::INT32 => quote! { Some(inner as i32) },
378                parquet::basic::Type::INT64 => quote! { Some(inner as i64) },
379                _ => quote! { Some(inner) },
380            }
381        };
382
383        quote! {
384            let vals: Vec<_> = records.iter().filter_map(|rec| {
385                if #binding {
386                    #some
387                } else {
388                    None
389                }
390            }).collect();
391        }
392    }
393
394    // generates code to read `field_name` from each record into a vector `vals`
395    fn copied_direct_vals(&self) -> proc_macro2::TokenStream {
396        let field_name = &self.ident;
397
398        let access = match self.third_party_type {
399            Some(ThirdPartyType::ChronoNaiveDateTime) => {
400                quote! { rec.#field_name.timestamp_millis() }
401            }
402            Some(ThirdPartyType::ChronoNaiveDate) => {
403                quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
404            }
405            Some(ThirdPartyType::Uuid) => {
406                quote! { rec.#field_name.as_bytes().to_vec().into() }
407            }
408            _ => {
409                if self.is_a_byte_buf {
410                    quote! { (&rec.#field_name[..]).into() }
411                } else {
412                    // Type might need converting to a physical type
413                    match self.ty.physical_type() {
414                        parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 },
415                        parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 },
416                        _ => quote! { rec.#field_name },
417                    }
418                }
419            }
420        };
421
422        quote! {
423            let vals: Vec<_> = records.iter().map(|rec| #access).collect();
424        }
425    }
426
427    // generates code to read a vector `records` into `field_name` for each record
428    fn copied_direct_fields(&self) -> proc_macro2::TokenStream {
429        let field_name = &self.ident;
430
431        let value = match self.third_party_type {
432            Some(ThirdPartyType::ChronoNaiveDateTime) => {
433                quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() }
434            }
435            Some(ThirdPartyType::ChronoNaiveDate) => {
436                // NaiveDateTime::UNIX_EPOCH.num_days_from_ce() == 719163
437                quote! {
438                    ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap()
439                }
440            }
441            Some(ThirdPartyType::Uuid) => {
442                quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
443            }
444            _ => match &self.ty {
445                Type::TypePath(_) => match self.ty.last_part().as_str() {
446                    "String" => quote! { String::from(std::str::from_utf8(vals[i].data())
447                    .expect("invalid UTF-8 sequence")) },
448                    t => {
449                        let s: proc_macro2::TokenStream = t.parse().unwrap();
450                        quote! { vals[i] as #s }
451                    }
452                },
453                Type::Vec(_) => quote! { vals[i].data().to_vec() },
454                f => unimplemented!("Unsupported: {:#?}", f),
455            },
456        };
457
458        quote! {
459            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
460                r.#field_name = #value;
461            }
462        }
463    }
464
465    fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
466        let field_name = &self.ident;
467
468        quote! {
469            let definition_levels: Vec<i16> = self
470              .iter()
471              .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 })
472              .collect();
473        }
474    }
475}
476
477#[allow(clippy::enum_variant_names)]
478#[allow(clippy::large_enum_variant)]
479#[derive(Debug, PartialEq)]
480enum Type {
481    Array(Box<Type>, syn::Expr),
482    Option(Box<Type>),
483    Slice(Box<Type>),
484    Vec(Box<Type>),
485    TypePath(syn::Type),
486    Reference(Option<syn::Lifetime>, Box<Type>),
487}
488
489impl Type {
490    /// Takes a rust type and returns the appropriate
491    /// parquet-rs column writer
492    fn column_writer(&self) -> syn::TypePath {
493        use parquet::basic::Type as BasicType;
494
495        match self.physical_type() {
496            BasicType::BOOLEAN => {
497                syn::parse_quote!(ColumnWriter::BoolColumnWriter)
498            }
499            BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter),
500            BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter),
501            BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter),
502            BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter),
503            BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter),
504            BasicType::BYTE_ARRAY => {
505                syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
506            }
507            BasicType::FIXED_LEN_BYTE_ARRAY => {
508                syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter)
509            }
510        }
511    }
512
513    /// Takes a rust type and returns the appropriate
514    /// parquet-rs column reader
515    fn column_reader(&self) -> syn::TypePath {
516        use parquet::basic::Type as BasicType;
517
518        match self.physical_type() {
519            BasicType::BOOLEAN => {
520                syn::parse_quote!(ColumnReader::BoolColumnReader)
521            }
522            BasicType::INT32 => syn::parse_quote!(ColumnReader::Int32ColumnReader),
523            BasicType::INT64 => syn::parse_quote!(ColumnReader::Int64ColumnReader),
524            BasicType::INT96 => syn::parse_quote!(ColumnReader::Int96ColumnReader),
525            BasicType::FLOAT => syn::parse_quote!(ColumnReader::FloatColumnReader),
526            BasicType::DOUBLE => syn::parse_quote!(ColumnReader::DoubleColumnReader),
527            BasicType::BYTE_ARRAY => {
528                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
529            }
530            BasicType::FIXED_LEN_BYTE_ARRAY => {
531                syn::parse_quote!(ColumnReader::FixedLenByteArrayColumnReader)
532            }
533        }
534    }
535
536    /// Helper to simplify a nested field definition to its leaf type
537    ///
538    /// Ex:
539    ///   `Option<&String>` => Type::TypePath(String)
540    ///   `&Option<i32>` => Type::TypePath(i32)
541    ///   `Vec<Vec<u8>>` => Type::Vec(u8)
542    ///
543    /// Useful in determining the physical type of a field and the
544    /// definition levels.
545    fn leaf_type_recursive(&self) -> &Type {
546        Type::leaf_type_recursive_helper(self, None)
547    }
548
549    fn leaf_type_recursive_helper<'a>(ty: &'a Type, parent_ty: Option<&'a Type>) -> &'a Type {
550        match ty {
551            Type::TypePath(_) => parent_ty.unwrap_or(ty),
552            Type::Option(first_type)
553            | Type::Vec(first_type)
554            | Type::Array(first_type, _)
555            | Type::Slice(first_type)
556            | Type::Reference(_, first_type) => {
557                Type::leaf_type_recursive_helper(first_type, Some(ty))
558            }
559        }
560    }
561
562    /// Helper method to further unwrap leaf_type() to get inner-most
563    /// type information, useful for determining the physical type
564    /// and normalizing the type paths.
565    fn inner_type(&self) -> &syn::Type {
566        let leaf_type = self.leaf_type_recursive();
567
568        match leaf_type {
569            Type::TypePath(type_) => type_,
570            Type::Option(first_type)
571            | Type::Vec(first_type)
572            | Type::Array(first_type, _)
573            | Type::Slice(first_type)
574            | Type::Reference(_, first_type) => match **first_type {
575                Type::TypePath(ref type_) => type_,
576                _ => unimplemented!("leaf_type() should only return shallow types"),
577            },
578        }
579    }
580
581    /// Helper to normalize a type path by extracting the
582    /// most identifiable part
583    ///
584    /// Ex:
585    ///   std::string::String => String
586    ///   `Vec<u8>` => `Vec<u8>`
587    ///   chrono::NaiveDateTime => NaiveDateTime
588    ///
589    /// Does run the risk of mis-identifying a type if import
590    /// rename is in play. Please note procedural macros always
591    /// run before type resolution so this is a risk the user
592    /// takes on when renaming imports.
593    fn last_part(&self) -> String {
594        let inner_type = self.inner_type();
595        let inner_type_str = (quote! { #inner_type }).to_string();
596
597        inner_type_str
598            .split("::")
599            .last()
600            .unwrap()
601            .trim()
602            .to_string()
603    }
604
605    /// Converts rust types to parquet physical types.
606    ///
607    /// Ex:
608    ///   [u8; 10] => FIXED_LEN_BYTE_ARRAY
609    ///   `Vec<u8>`  => BYTE_ARRAY
610    ///   String => BYTE_ARRAY
611    ///   i32 => INT32
612    fn physical_type(&self) -> parquet::basic::Type {
613        use parquet::basic::Type as BasicType;
614
615        let last_part = self.last_part();
616        let leaf_type = self.leaf_type_recursive();
617
618        match leaf_type {
619            Type::Array(first_type, _length) => {
620                if let Type::TypePath(_) = **first_type {
621                    if last_part == "u8" {
622                        return BasicType::FIXED_LEN_BYTE_ARRAY;
623                    }
624                }
625            }
626            Type::Vec(first_type) | Type::Slice(first_type) => {
627                if let Type::TypePath(_) = **first_type {
628                    if last_part == "u8" {
629                        return BasicType::BYTE_ARRAY;
630                    }
631                }
632            }
633            _ => (),
634        }
635
636        match last_part.trim() {
637            "bool" => BasicType::BOOLEAN,
638            "u8" | "u16" | "u32" => BasicType::INT32,
639            "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32,
640            "u64" | "i64" | "NaiveDateTime" => BasicType::INT64,
641            "usize" | "isize" => {
642                if usize::BITS == 64 {
643                    BasicType::INT64
644                } else {
645                    BasicType::INT32
646                }
647            }
648            "f32" => BasicType::FLOAT,
649            "f64" => BasicType::DOUBLE,
650            "String" | "str" | "Arc < str >" => BasicType::BYTE_ARRAY,
651            "Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
652            f => unimplemented!("{} currently is not supported", f),
653        }
654    }
655
656    fn length(&self) -> Option<syn::Expr> {
657        let last_part = self.last_part();
658        let leaf_type = self.leaf_type_recursive();
659
660        // `[u8; N]` => Some(N)
661        if let Type::Array(first_type, length) = leaf_type {
662            if let Type::TypePath(_) = **first_type {
663                if last_part == "u8" {
664                    return Some(length.clone());
665                }
666            }
667        }
668
669        match last_part.trim() {
670            // Uuid => [u8; 16] => Some(16)
671            "Uuid" => Some(syn::parse_quote!(16)),
672            _ => None,
673        }
674    }
675
676    fn logical_type(&self) -> proc_macro2::TokenStream {
677        let last_part = self.last_part();
678        let leaf_type = self.leaf_type_recursive();
679
680        match leaf_type {
681            Type::Array(first_type, _length) => {
682                if let Type::TypePath(_) = **first_type {
683                    if last_part == "u8" {
684                        return quote! { None };
685                    }
686                }
687            }
688            Type::Vec(first_type) | Type::Slice(first_type) => {
689                if let Type::TypePath(_) = **first_type {
690                    if last_part == "u8" {
691                        return quote! { None };
692                    }
693                }
694            }
695            _ => (),
696        }
697
698        match last_part.trim() {
699            "bool" => quote! { None },
700            "u8" => quote! { Some(LogicalType::integer(8, false)) },
701            "u16" => quote! { Some(LogicalType::integer(16, false)) },
702            "u32" => quote! { Some(LogicalType::integer(32, false)) },
703            "u64" => quote! { Some(LogicalType::integer(64, false)) },
704            "i8" => quote! { Some(LogicalType::integer(8, true)) },
705            "i16" => quote! { Some(LogicalType::integer(16, true)) },
706            "i32" | "i64" => quote! { None },
707            "usize" => {
708                quote! { Some(LogicalType::integer(usize::BITS as i8, false)) }
709            }
710            "isize" => {
711                quote! { Some(LogicalType::integer(usize::BITS as i8, true)) }
712            }
713            "NaiveDate" => quote! { Some(LogicalType::Date) },
714            "NaiveDateTime" => quote! { None },
715            "f32" | "f64" => quote! { None },
716            "String" | "str" | "Arc < str >" => quote! { Some(LogicalType::String) },
717            "Uuid" => quote! { Some(LogicalType::Uuid) },
718            f => unimplemented!("{} currently is not supported", f),
719        }
720    }
721
722    fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
723        let last_part = self.last_part();
724
725        match last_part.trim() {
726            "NaiveDateTime" => Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }),
727            _ => None,
728        }
729    }
730
731    fn repetition(&self) -> proc_macro2::TokenStream {
732        match self {
733            Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL },
734            Type::Reference(_, ty) => ty.repetition(),
735            _ => quote! { ::parquet::basic::Repetition::REQUIRED },
736        }
737    }
738
739    /// Convert a parsed rust field AST in to a more easy to manipulate
740    /// parquet_derive::Field
741    fn from(f: &syn::Field) -> Self {
742        Type::from_type(f, &f.ty)
743    }
744
745    fn from_type(f: &syn::Field, ty: &syn::Type) -> Self {
746        match ty {
747            syn::Type::Path(p) => Type::from_type_path(f, p),
748            syn::Type::Reference(tr) => Type::from_type_reference(f, tr),
749            syn::Type::Array(ta) => Type::from_type_array(f, ta),
750            syn::Type::Slice(ts) => Type::from_type_slice(f, ts),
751            other => unimplemented!(
752                "Unable to derive {:?} - it is currently an unsupported type\n{:#?}",
753                f.ident.as_ref().unwrap(),
754                other
755            ),
756        }
757    }
758
759    fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self {
760        let last_segment = p.path.segments.last().unwrap();
761
762        let is_vec = last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site());
763        let is_option =
764            last_segment.ident == syn::Ident::new("Option", proc_macro2::Span::call_site());
765
766        if is_vec || is_option {
767            let generic_type = match &last_segment.arguments {
768                syn::PathArguments::AngleBracketed(angle_args) => {
769                    assert_eq!(angle_args.args.len(), 1);
770                    let first_arg = &angle_args.args[0];
771
772                    match first_arg {
773                        syn::GenericArgument::Type(typath) => typath.clone(),
774                        other => unimplemented!("Unsupported: {:#?}", other),
775                    }
776                }
777                other => unimplemented!("Unsupported: {:#?}", other),
778            };
779
780            if is_vec {
781                Type::Vec(Box::new(Type::from_type(f, &generic_type)))
782            } else {
783                Type::Option(Box::new(Type::from_type(f, &generic_type)))
784            }
785        } else {
786            Type::TypePath(syn::Type::Path(p.clone()))
787        }
788    }
789
790    fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self {
791        let lifetime = tr.lifetime.clone();
792        let inner_type = Type::from_type(f, tr.elem.as_ref());
793        Type::Reference(lifetime, Box::new(inner_type))
794    }
795
796    fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
797        let inner_type = Type::from_type(f, ta.elem.as_ref());
798        Type::Array(Box::new(inner_type), ta.len.clone())
799    }
800
801    fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
802        let inner_type = Type::from_type(f, ts.elem.as_ref());
803        Type::Slice(Box::new(inner_type))
804    }
805}
806
807#[cfg(test)]
808mod test {
809    use super::*;
810    use syn::{Data, DataStruct, DeriveInput};
811
812    fn extract_fields(input: proc_macro2::TokenStream) -> Vec<syn::Field> {
813        let input: DeriveInput = syn::parse2(input).unwrap();
814
815        let fields = match input.data {
816            Data::Struct(DataStruct { fields, .. }) => fields,
817            _ => panic!("Input must be a struct"),
818        };
819
820        fields.iter().map(|field| field.to_owned()).collect()
821    }
822
823    #[test]
824    fn test_generating_a_simple_writer_snippet() {
825        let snippet: proc_macro2::TokenStream = quote! {
826          struct ABoringStruct {
827            counter: usize,
828          }
829        };
830
831        let fields = extract_fields(snippet);
832        let counter = Field::from(&fields[0]);
833
834        let snippet = counter.writer_snippet().to_string();
835        assert_eq!(snippet,
836                   (quote!{
837                        {
838                            let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
839
840                            if let ColumnWriter::Int64ColumnWriter ( typed ) = column_writer.untyped() {
841                                typed . write_batch ( & vals [ .. ] , None , None ) ?;
842                            }  else {
843                                panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
844                            }
845                        }
846                   }).to_string()
847        )
848    }
849
850    #[test]
851    fn test_generating_a_simple_reader_snippet() {
852        let snippet: proc_macro2::TokenStream = quote! {
853          struct ABoringStruct {
854            counter: usize,
855          }
856        };
857
858        let fields = extract_fields(snippet);
859        let counter = Field::from(&fields[0]);
860
861        let snippet = counter.reader_snippet().to_string();
862        assert_eq!(
863            snippet,
864            (quote! {
865                 {
866                    let mut vals = Vec::new();
867                    if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
868                        let mut definition_levels = Vec::new();
869                        let (total_num, valid_num, decoded_num) = typed.read_records(
870                            num_records, Some(&mut definition_levels), None, &mut vals)?;
871                        if valid_num != decoded_num {
872                            panic!("Support only valid records, found {} null records in column type {}",
873                                decoded_num - valid_num, stringify!{counter});
874                        }
875                    } else {
876                        panic!("Schema and struct disagree on type for {}", stringify!{counter});
877                    }
878                    for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
879                        r.counter = vals[i] as usize;
880                    }
881                 }
882            })
883            .to_string()
884        )
885    }
886
887    #[test]
888    fn test_parquet_type_with_raw_identifier() {
889        let snippet: proc_macro2::TokenStream = quote! {
890          struct ABoringStruct {
891            r#type: i32,
892          }
893        };
894
895        let fields = extract_fields(snippet);
896        let r#type = Field::from(&fields[0]);
897
898        // the raw identifier `r#type` is named `type` in the parquet schema
899        let snippet = r#type.parquet_type().to_string();
900        assert!(
901            snippet.contains("primitive_type_builder (\"type\""),
902            "{snippet}"
903        );
904    }
905
906    #[test]
907    fn test_optional_to_writer_snippet() {
908        let struct_def: proc_macro2::TokenStream = quote! {
909          struct StringBorrower<'a> {
910            optional_str: Option<&'a str>,
911            optional_string: Option<&String>,
912            optional_dumb_int: Option<&i32>,
913          }
914        };
915
916        let fields = extract_fields(struct_def);
917
918        let optional = Field::from(&fields[0]);
919        let snippet = optional.writer_snippet();
920        assert_eq!(snippet.to_string(),
921          (quote! {
922          {
923                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
924
925                let vals: Vec <_> = records.iter().filter_map( |rec| {
926                    if let Some ( inner ) = &rec . optional_str {
927                        Some ( (&inner[..]).into() )
928                    } else {
929                        None
930                    }
931                }).collect();
932
933                if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
934                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
935                } else {
936                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
937                }
938           }
939            }
940          ).to_string());
941
942        let optional = Field::from(&fields[1]);
943        let snippet = optional.writer_snippet();
944        assert_eq!(snippet.to_string(),
945                   (quote!{
946                   {
947                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
948
949                        let vals: Vec <_> = records.iter().filter_map( |rec| {
950                            if let Some ( inner ) = &rec . optional_string {
951                                Some ( (&inner[..]).into() )
952                            } else {
953                                None
954                            }
955                        }).collect();
956
957                        if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
958                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
959                        } else {
960                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
961                        }
962                    }
963        }).to_string());
964
965        let optional = Field::from(&fields[2]);
966        let snippet = optional.writer_snippet();
967        assert_eq!(snippet.to_string(),
968                   (quote!{
969                    {
970                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
971
972                        let vals: Vec <_> = records.iter().filter_map( |rec| {
973                            if let Some ( inner ) = rec . optional_dumb_int {
974                                Some ( inner as i32 )
975                            } else {
976                                None
977                            }
978                        }).collect();
979
980                        if let ColumnWriter::Int32ColumnWriter ( typed ) = column_writer.untyped() {
981                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
982                        }  else {
983                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
984                        }
985                    }
986        }).to_string());
987    }
988
989    #[test]
990    fn test_converting_to_column_writer_type() {
991        let snippet: proc_macro2::TokenStream = quote! {
992          struct ABasicStruct {
993            yes_no: bool,
994            name: String,
995          }
996        };
997
998        let fields = extract_fields(snippet);
999        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1000
1001        let column_writers: Vec<_> = processed
1002            .iter()
1003            .map(|field| field.ty.column_writer())
1004            .collect();
1005
1006        assert_eq!(
1007            column_writers,
1008            vec![
1009                syn::parse_quote!(ColumnWriter::BoolColumnWriter),
1010                syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
1011            ]
1012        );
1013    }
1014
1015    #[test]
1016    fn test_converting_to_column_reader_type() {
1017        let snippet: proc_macro2::TokenStream = quote! {
1018          struct ABasicStruct {
1019            yes_no: bool,
1020            name: String,
1021          }
1022        };
1023
1024        let fields = extract_fields(snippet);
1025        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1026
1027        let column_readers: Vec<_> = processed
1028            .iter()
1029            .map(|field| field.ty.column_reader())
1030            .collect();
1031
1032        assert_eq!(
1033            column_readers,
1034            vec![
1035                syn::parse_quote!(ColumnReader::BoolColumnReader),
1036                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
1037            ]
1038        );
1039    }
1040
1041    #[test]
1042    fn convert_basic_struct() {
1043        let snippet: proc_macro2::TokenStream = quote! {
1044          struct ABasicStruct {
1045            yes_no: bool,
1046            name: String,
1047            length: usize
1048          }
1049        };
1050
1051        let fields = extract_fields(snippet);
1052        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1053        assert_eq!(processed.len(), 3);
1054
1055        assert_eq!(
1056            processed,
1057            vec![
1058                Field {
1059                    ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()),
1060                    ty: Type::TypePath(syn::parse_quote!(bool)),
1061                    is_a_byte_buf: false,
1062                    third_party_type: None,
1063                },
1064                Field {
1065                    ident: syn::Ident::new("name", proc_macro2::Span::call_site()),
1066                    ty: Type::TypePath(syn::parse_quote!(String)),
1067                    is_a_byte_buf: true,
1068                    third_party_type: None,
1069                },
1070                Field {
1071                    ident: syn::Ident::new("length", proc_macro2::Span::call_site()),
1072                    ty: Type::TypePath(syn::parse_quote!(usize)),
1073                    is_a_byte_buf: false,
1074                    third_party_type: None,
1075                }
1076            ]
1077        )
1078    }
1079
1080    #[test]
1081    fn test_get_inner_type() {
1082        let snippet: proc_macro2::TokenStream = quote! {
1083          struct LotsOfInnerTypes {
1084            a_vec: Vec<u8>,
1085            a_option: ::std::option::Option<bool>,
1086            a_silly_string: ::std::string::String,
1087            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1088          }
1089        };
1090
1091        let fields = extract_fields(snippet);
1092        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1093        let inner_types: Vec<_> = converted_fields
1094            .iter()
1095            .map(|field| field.inner_type())
1096            .collect();
1097        let inner_types_strs: Vec<_> = inner_types
1098            .iter()
1099            .map(|ty| (quote! { #ty }).to_string())
1100            .collect();
1101
1102        assert_eq!(
1103            inner_types_strs,
1104            vec![
1105                "u8",
1106                "bool",
1107                ":: std :: string :: String",
1108                ":: std :: result :: Result < () , () >"
1109            ]
1110        )
1111    }
1112
1113    #[test]
1114    fn test_physical_type() {
1115        use parquet::basic::Type as BasicType;
1116        let snippet: proc_macro2::TokenStream = quote! {
1117          struct LotsOfInnerTypes {
1118            a_buf: ::std::vec::Vec<u8>,
1119            a_number: i32,
1120            a_verbose_option: ::std::option::Option<bool>,
1121            a_silly_string: String,
1122            a_fix_byte_buf: [u8; 10],
1123            a_complex_option: ::std::option::Option<&Vec<u8>>,
1124            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1125            a_uuid: ::uuid::Uuid,
1126          }
1127        };
1128
1129        let fields = extract_fields(snippet);
1130        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1131        let physical_types: Vec<_> = converted_fields
1132            .iter()
1133            .map(|ty| ty.physical_type())
1134            .collect();
1135
1136        assert_eq!(
1137            physical_types,
1138            vec![
1139                BasicType::BYTE_ARRAY,
1140                BasicType::INT32,
1141                BasicType::BOOLEAN,
1142                BasicType::BYTE_ARRAY,
1143                BasicType::FIXED_LEN_BYTE_ARRAY,
1144                BasicType::BYTE_ARRAY,
1145                BasicType::INT32,
1146                BasicType::FIXED_LEN_BYTE_ARRAY,
1147            ]
1148        )
1149    }
1150
1151    #[test]
1152    fn test_type_length() {
1153        let snippet: proc_macro2::TokenStream = quote! {
1154          struct LotsOfInnerTypes {
1155            a_buf: ::std::vec::Vec<u8>,
1156            a_number: i32,
1157            a_verbose_option: ::std::option::Option<bool>,
1158            a_silly_string: String,
1159            a_fix_byte_buf: [u8; 10],
1160            a_complex_option: ::std::option::Option<&Vec<u8>>,
1161            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1162            a_uuid: ::uuid::Uuid,
1163          }
1164        };
1165
1166        let fields = extract_fields(snippet);
1167        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1168        let lengths: Vec<_> = converted_fields.iter().map(|ty| ty.length()).collect();
1169
1170        assert_eq!(
1171            lengths,
1172            vec![
1173                None,
1174                None,
1175                None,
1176                None,
1177                Some(syn::parse_quote!(10)),
1178                None,
1179                None,
1180                Some(syn::parse_quote!(16)),
1181            ]
1182        )
1183    }
1184
1185    #[test]
1186    fn test_convert_comprehensive_owned_struct() {
1187        let snippet: proc_macro2::TokenStream = quote! {
1188          struct VecHolder {
1189            a_vec: ::std::vec::Vec<u8>,
1190            a_option: ::std::option::Option<bool>,
1191            a_silly_string: ::std::string::String,
1192            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1193          }
1194        };
1195
1196        let fields = extract_fields(snippet);
1197        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1198
1199        assert_eq!(
1200            converted_fields,
1201            vec![
1202                Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))),
1203                Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))),
1204                Type::TypePath(syn::parse_quote!(::std::string::String)),
1205                Type::Option(Box::new(Type::TypePath(
1206                    syn::parse_quote!(::std::result::Result<(),()>)
1207                ))),
1208            ]
1209        );
1210    }
1211
1212    #[test]
1213    fn test_convert_borrowed_struct() {
1214        let snippet: proc_macro2::TokenStream = quote! {
1215          struct Borrower<'a> {
1216            a_str: &'a str,
1217            a_borrowed_option: &'a Option<bool>,
1218            so_many_borrows: &'a Option<&'a str>,
1219          }
1220        };
1221
1222        let fields = extract_fields(snippet);
1223        let types: Vec<_> = fields.iter().map(Type::from).collect();
1224
1225        assert_eq!(
1226            types,
1227            vec![
1228                Type::Reference(
1229                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1230                    Box::new(Type::TypePath(syn::parse_quote!(str)))
1231                ),
1232                Type::Reference(
1233                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1234                    Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!(
1235                        bool
1236                    )))))
1237                ),
1238                Type::Reference(
1239                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1240                    Box::new(Type::Option(Box::new(Type::Reference(
1241                        Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1242                        Box::new(Type::TypePath(syn::parse_quote!(str)))
1243                    ))))
1244                ),
1245            ]
1246        );
1247    }
1248
1249    #[test]
1250    fn test_chrono_timestamp_millis_write() {
1251        let snippet: proc_macro2::TokenStream = quote! {
1252          struct ATimestampStruct {
1253            henceforth: chrono::NaiveDateTime,
1254            maybe_happened: Option<&chrono::NaiveDateTime>,
1255          }
1256        };
1257
1258        let fields = extract_fields(snippet);
1259        let when = Field::from(&fields[0]);
1260        assert_eq!(when.writer_snippet().to_string(),(quote!{
1261            {
1262                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
1263                if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
1264                    typed.write_batch(&vals[..], None, None) ?;
1265                } else {
1266                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
1267                }
1268            }
1269        }).to_string());
1270
1271        let maybe_happened = Field::from(&fields[1]);
1272        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1273            {
1274                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
1275                let vals : Vec<_> = records.iter().filter_map(|rec| {
1276                    if let Some(inner) = rec.maybe_happened {
1277                        Some(inner.timestamp_millis())
1278                    } else {
1279                        None
1280                    }
1281                }).collect();
1282
1283                if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
1284                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1285                } else {
1286                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
1287                }
1288            }
1289        }).to_string());
1290    }
1291
1292    #[test]
1293    fn test_chrono_timestamp_millis_read() {
1294        let snippet: proc_macro2::TokenStream = quote! {
1295          struct ATimestampStruct {
1296            henceforth: chrono::NaiveDateTime,
1297          }
1298        };
1299
1300        let fields = extract_fields(snippet);
1301        let when = Field::from(&fields[0]);
1302        assert_eq!(when.reader_snippet().to_string(),(quote!{
1303            {
1304                let mut vals = Vec::new();
1305                if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
1306                    let mut definition_levels = Vec::new();
1307                    let (total_num, valid_num, decoded_num) = typed.read_records(
1308                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1309                    if valid_num != decoded_num {
1310                        panic!("Support only valid records, found {} null records in column type {}",
1311                            decoded_num - valid_num, stringify!{henceforth});
1312                    }
1313                } else {
1314                    panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
1315                }
1316                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1317                    r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap();
1318                }
1319            }
1320        }).to_string());
1321    }
1322
1323    #[test]
1324    fn test_chrono_date_write() {
1325        let snippet: proc_macro2::TokenStream = quote! {
1326          struct ATimestampStruct {
1327            henceforth: chrono::NaiveDate,
1328            maybe_happened: Option<&chrono::NaiveDate>,
1329          }
1330        };
1331
1332        let fields = extract_fields(snippet);
1333        let when = Field::from(&fields[0]);
1334        assert_eq!(when.writer_snippet().to_string(),(quote!{
1335            {
1336                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
1337                if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
1338                    typed.write_batch(&vals[..], None, None) ?;
1339                } else {
1340                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
1341                }
1342            }
1343        }).to_string());
1344
1345        let maybe_happened = Field::from(&fields[1]);
1346        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1347            {
1348                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
1349                let vals : Vec<_> = records.iter().filter_map(|rec| {
1350                    if let Some(inner) = rec.maybe_happened {
1351                        Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
1352                    } else {
1353                        None
1354                    }
1355                }).collect();
1356
1357                if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
1358                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1359                } else {
1360                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
1361                }
1362            }
1363        }).to_string());
1364    }
1365
1366    #[test]
1367    fn test_chrono_date_read() {
1368        let snippet: proc_macro2::TokenStream = quote! {
1369          struct ATimestampStruct {
1370            henceforth: chrono::NaiveDate,
1371          }
1372        };
1373
1374        let fields = extract_fields(snippet);
1375        let when = Field::from(&fields[0]);
1376        assert_eq!(when.reader_snippet().to_string(),(quote!{
1377            {
1378                let mut vals = Vec::new();
1379                if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
1380                    let mut definition_levels = Vec::new();
1381                    let (total_num, valid_num, decoded_num) = typed.read_records(
1382                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1383                    if valid_num != decoded_num {
1384                        panic!("Support only valid records, found {} null records in column type {}",
1385                            decoded_num - valid_num, stringify!{henceforth});
1386                    }
1387                } else {
1388                    panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
1389                }
1390                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1391                    r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap();
1392                }
1393            }
1394        }).to_string());
1395    }
1396
1397    #[test]
1398    fn test_uuid_write() {
1399        let snippet: proc_macro2::TokenStream = quote! {
1400          struct AUuidStruct {
1401            unique_id: uuid::Uuid,
1402            maybe_unique_id: Option<&uuid::Uuid>,
1403          }
1404        };
1405
1406        let fields = extract_fields(snippet);
1407        let when = Field::from(&fields[0]);
1408        assert_eq!(when.writer_snippet().to_string(),(quote!{
1409            {
1410                let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
1411                if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
1412                    typed.write_batch(&vals[..], None, None) ?;
1413                } else {
1414                    panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
1415                }
1416            }
1417        }).to_string());
1418
1419        let maybe_happened = Field::from(&fields[1]);
1420        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1421            {
1422                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
1423                let vals : Vec<_> = records.iter().filter_map(|rec| {
1424                    if let Some(inner) = &rec.maybe_unique_id {
1425                        Some((&inner.to_string()[..]).into())
1426                    } else {
1427                        None
1428                    }
1429                }).collect();
1430
1431                if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
1432                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1433                } else {
1434                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
1435                }
1436            }
1437        }).to_string());
1438    }
1439
1440    #[test]
1441    fn test_uuid_read() {
1442        let snippet: proc_macro2::TokenStream = quote! {
1443          struct AUuidStruct {
1444            unique_id: uuid::Uuid,
1445          }
1446        };
1447
1448        let fields = extract_fields(snippet);
1449        let when = Field::from(&fields[0]);
1450        assert_eq!(when.reader_snippet().to_string(),(quote!{
1451            {
1452                let mut vals = Vec::new();
1453                if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
1454                    let mut definition_levels = Vec::new();
1455                    let (total_num, valid_num, decoded_num) = typed.read_records(
1456                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1457                    if valid_num != decoded_num {
1458                        panic!("Support only valid records, found {} null records in column type {}",
1459                            decoded_num - valid_num, stringify!{unique_id});
1460                    }
1461                } else {
1462                    panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
1463                }
1464                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1465                    r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
1466                }
1467            }
1468        }).to_string());
1469    }
1470
1471    #[test]
1472    fn test_converted_type() {
1473        let snippet: proc_macro2::TokenStream = quote! {
1474          struct ATimeStruct {
1475            time: chrono::NaiveDateTime,
1476          }
1477        };
1478
1479        let fields = extract_fields(snippet);
1480
1481        let time = Field::from(&fields[0]);
1482
1483        let converted_type = time.ty.converted_type();
1484        assert_eq!(
1485            converted_type.unwrap().to_string(),
1486            quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }.to_string()
1487        );
1488    }
1489}