Skip to main content

parquet_derive/
parquet_field.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/// A single named struct field, parsed into the pieces needed to
/// generate parquet reader and writer code for it.
#[derive(Debug, PartialEq)]
pub struct Field {
    /// Name of the field as written in the struct definition.
    ident: syn::Ident,
    /// Parsed shape of the field's Rust type.
    ty: Type,
    /// True when the field's parquet physical type is `BYTE_ARRAY`.
    is_a_byte_buf: bool,
    /// Set when the innermost type name is `NaiveDateTime`, `NaiveDate` or `Uuid`.
    third_party_type: Option<ThirdPartyType>,
}
25
/// Third party library types, detected at compile
/// time from the last segment of the field's type
/// path. These types are written to parquet as
/// their preferred physical type.
///
///   ChronoNaiveDateTime is written as i64
///   ChronoNaiveDate is written as i32
///   Uuid is mapped to a FIXED_LEN_BYTE_ARRAY of length 16
#[derive(Debug, PartialEq)]
enum ThirdPartyType {
    ChronoNaiveDateTime,
    ChronoNaiveDate,
    Uuid,
}
39
40impl Field {
41    pub fn from(f: &syn::Field) -> Self {
42        let ty = Type::from(f);
43        let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY;
44
45        let third_party_type = match &ty.last_part()[..] {
46            "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime),
47            "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate),
48            "Uuid" => Some(ThirdPartyType::Uuid),
49            _ => None,
50        };
51
52        Field {
53            ident: f
54                .ident
55                .clone()
56                .expect("Only structs with named fields are currently supported"),
57            ty,
58            is_a_byte_buf,
59            third_party_type,
60        }
61    }
62
    /// Takes the parsed field of the struct and emits a valid
    /// column writer snippet. Should match exactly what you
    /// would write by hand.
    ///
    /// Can only generate writers for basic structs, for example:
    ///
    /// struct Record {
    ///   a_bool: bool,
    ///   maybe_a_bool: `Option<bool>`
    /// }
    ///
    /// but not
    ///
    /// struct UnsupportedNestedRecord {
    ///   a_property: bool,
    ///   nested_record: Record
    /// }
    ///
    /// because this parsing logic is not sophisticated enough for definition
    /// levels beyond 2.
    ///
    /// The emitted block refers to `column_writer` directly, and to `records`
    /// (and `self`, for definition levels) through the helper snippets it
    /// embeds, so those names must be in scope where the snippet is spliced in.
    ///
    /// # Panics
    ///
    /// Panics via `unimplemented!` when the field's type shape is not one of
    /// the combinations matched below.
    pub fn writer_snippet(&self) -> proc_macro2::TokenStream {
        let ident = &self.ident;
        let column_writer = self.ty.column_writer();

        // Pick the snippet that builds `vals`: direct access for required
        // fields, `filter_map` over the `Option` for optional ones.
        let vals_builder = match &self.ty {
            Type::TypePath(_) => self.copied_direct_vals(),
            Type::Option(first_type) => match **first_type {
                Type::TypePath(_) => self.option_into_vals(),
                Type::Reference(_, ref second_type) => match **second_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    _ => unimplemented!("Unsupported type encountered"),
                },
                Type::Vec(ref first_type) => match **first_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    _ => unimplemented!("Unsupported type encountered"),
                },
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            Type::Reference(_, first_type) => match **first_type {
                Type::TypePath(_) => self.copied_direct_vals(),
                Type::Option(ref second_type) => match **second_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    Type::Reference(_, ref second_type) => match **second_type {
                        Type::TypePath(_) => self.option_into_vals(),
                        Type::Slice(ref second_type) => match **second_type {
                            Type::TypePath(_) => self.option_into_vals(),
                            ref f => unimplemented!("Unsupported: {:#?}", f),
                        },
                        _ => unimplemented!("Unsupported type encountered"),
                    },
                    Type::Vec(ref first_type) => match **first_type {
                        Type::TypePath(_) => self.option_into_vals(),
                        _ => unimplemented!("Unsupported type encountered"),
                    },
                    ref f => unimplemented!("Unsupported: {:#?}", f),
                },
                Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => self.copied_direct_vals(),
                    ref f => unimplemented!("Unsupported: {:#?}", f),
                },
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            Type::Vec(first_type) => match **first_type {
                Type::TypePath(_) => self.copied_direct_vals(),
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            f => unimplemented!("Unsupported: {:#?}", f),
        };

        // `Some(..)` holds the snippet computing definition levels when an
        // `Option` appears in a supported position; `None` means the column
        // is written without definition levels.
        let definition_levels = match &self.ty {
            Type::TypePath(_) => None,
            Type::Option(first_type) => match **first_type {
                Type::TypePath(_) => Some(self.optional_definition_levels()),
                Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
                Type::Reference(_, ref second_type)
                | Type::Vec(ref second_type)
                | Type::Array(ref second_type, _)
                | Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => Some(self.optional_definition_levels()),
                    _ => unimplemented!("Unsupported nesting encountered"),
                },
            },
            Type::Reference(_, first_type)
            | Type::Vec(first_type)
            | Type::Array(first_type, _)
            | Type::Slice(first_type) => match **first_type {
                Type::TypePath(_) => None,
                Type::Vec(ref second_type)
                | Type::Array(ref second_type, _)
                | Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => None,
                    Type::Reference(_, ref third_type) => match **third_type {
                        Type::TypePath(_) => None,
                        _ => unimplemented!("Unsupported definition encountered"),
                    },
                    _ => unimplemented!("Unsupported definition encountered"),
                },
                Type::Reference(_, ref second_type) | Type::Option(ref second_type) => {
                    match **second_type {
                        Type::TypePath(_) => Some(self.optional_definition_levels()),
                        Type::Vec(ref third_type)
                        | Type::Array(ref third_type, _)
                        | Type::Slice(ref third_type) => match **third_type {
                            Type::TypePath(_) => Some(self.optional_definition_levels()),
                            Type::Reference(_, ref fourth_type) => match **fourth_type {
                                Type::TypePath(_) => Some(self.optional_definition_levels()),
                                _ => unimplemented!("Unsupported definition encountered"),
                            },
                            _ => unimplemented!("Unsupported definition encountered"),
                        },
                        Type::Reference(_, ref third_type) => match **third_type {
                            Type::TypePath(_) => Some(self.optional_definition_levels()),
                            Type::Slice(ref fourth_type) => match **fourth_type {
                                Type::TypePath(_) => Some(self.optional_definition_levels()),
                                _ => unimplemented!("Unsupported definition encountered"),
                            },
                            _ => unimplemented!("Unsupported definition encountered"),
                        },
                        _ => unimplemented!("Unsupported definition encountered"),
                    }
                }
            },
        };

        // "vals" is the run of primitive data being written for the column.
        // "definition_levels" is a vector of `i16` levels (1 when the value is present,
        // 0 when it is missing) which controls whether a value is missing or present.
        // This TokenStream is only one part of the code for writing a column and
        // it relies on values calculated in prior code snippets, namely "definition_levels"
        // and "vals_builder". All the context is put together in this function's final quote
        // and this expression just switches between non-nullable and nullable write statements.
        let write_batch_expr = if definition_levels.is_some() {
            quote! {
                if let #column_writer(typed) = column_writer.untyped() {
                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
                } else {
                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
                }
            }
        } else {
            quote! {
                if let #column_writer(typed) = column_writer.untyped() {
                    typed.write_batch(&vals[..], None, None)?;
                } else {
                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
                }
            }
        };

        // Interpolating an `Option<TokenStream>` emits nothing for `None`, so the
        // definition-level statement only appears for nullable columns.
        quote! {
            {
                #definition_levels

                #vals_builder

                #write_batch_expr
            }
        }
    }
221
222    /// Takes the parsed field of the struct and emits a valid
223    /// column reader snippet. Should match exactly what you
224    /// would write by hand.
225    ///
226    /// Can only generate writers for basic structs, for example:
227    ///
228    /// struct Record {
229    ///   a_bool: bool
230    /// }
231    ///
232    /// but not
233    ///
234    /// struct UnsupportedNestedRecord {
235    ///   a_property: bool,
236    ///   nested_record: Record
237    /// }
238    ///
239    /// because this parsing logic is not sophisticated enough for definition
240    /// levels beyond 2.
241    ///
242    /// `Option` types and references not supported, but the column itself can be nullable
243    /// (i.e., def_level==1), as long as the values are all valid.
244    pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
245        let ident = &self.ident;
246        let column_reader = self.ty.column_reader();
247
248        // generate the code to read the column into a vector `vals`
249        let write_batch_expr = quote! {
250            let mut vals = Vec::new();
251            if let #column_reader(mut typed) = column_reader {
252                let mut definition_levels = Vec::new();
253                let (total_num, valid_num, decoded_num) = typed.read_records(
254                    num_records, Some(&mut definition_levels), None, &mut vals)?;
255                if valid_num != decoded_num {
256                    panic!("Support only valid records, found {} null records in column type {}",
257                        decoded_num - valid_num, stringify!{#ident});
258                }
259            } else {
260                panic!("Schema and struct disagree on type for {}", stringify!{#ident});
261            }
262        };
263
264        // generate the code to convert each element of `vals` to the correct type and then write
265        // it to its field in the corresponding struct
266        let vals_writer = match &self.ty {
267            Type::TypePath(_) => self.copied_direct_fields(),
268            Type::Reference(_, first_type) => match **first_type {
269                Type::TypePath(_) => self.copied_direct_fields(),
270                Type::Slice(ref second_type) => match **second_type {
271                    Type::TypePath(_) => self.copied_direct_fields(),
272                    ref f => unimplemented!("Unsupported: {:#?}", f),
273                },
274                ref f => unimplemented!("Unsupported: {:#?}", f),
275            },
276            Type::Vec(first_type) => match **first_type {
277                Type::TypePath(_) => self.copied_direct_fields(),
278                ref f => unimplemented!("Unsupported: {:#?}", f),
279            },
280            f => unimplemented!("Unsupported: {:#?}", f),
281        };
282
283        quote! {
284            {
285                #write_batch_expr
286
287                #vals_writer
288            }
289        }
290    }
291
292    pub fn parquet_type(&self) -> proc_macro2::TokenStream {
293        // TODO: Support group types
294        // TODO: Add length if dealing with fixedlenbinary
295
296        let field_name = &self.ident.to_string();
297        let physical_type = match self.ty.physical_type() {
298            parquet::basic::Type::BOOLEAN => quote! {
299                ::parquet::basic::Type::BOOLEAN
300            },
301            parquet::basic::Type::INT32 => quote! {
302                ::parquet::basic::Type::INT32
303            },
304            parquet::basic::Type::INT64 => quote! {
305                ::parquet::basic::Type::INT64
306            },
307            parquet::basic::Type::INT96 => quote! {
308                ::parquet::basic::Type::INT96
309            },
310            parquet::basic::Type::FLOAT => quote! {
311                ::parquet::basic::Type::FLOAT
312            },
313            parquet::basic::Type::DOUBLE => quote! {
314                ::parquet::basic::Type::DOUBLE
315            },
316            parquet::basic::Type::BYTE_ARRAY => quote! {
317                ::parquet::basic::Type::BYTE_ARRAY
318            },
319            parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! {
320                ::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
321            },
322        };
323        let logical_type = self.ty.logical_type();
324        let repetition = self.ty.repetition();
325        let converted_type = self.ty.converted_type();
326        let length = self.ty.length();
327
328        let mut builder = quote! {
329            ParquetType::primitive_type_builder(#field_name, #physical_type)
330                .with_logical_type(#logical_type)
331                .with_repetition(#repetition)
332        };
333
334        if let Some(converted_type) = converted_type {
335            builder = quote! { #builder.with_converted_type(#converted_type) };
336        }
337
338        if let Some(length) = length {
339            builder = quote! { #builder.with_length(#length) };
340        }
341
342        quote! {  fields.push(#builder.build().unwrap().into()) }
343    }
344
345    fn option_into_vals(&self) -> proc_macro2::TokenStream {
346        let field_name = &self.ident;
347        let is_a_byte_buf = self.is_a_byte_buf;
348        let is_a_timestamp = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
349        let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
350        let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
351        let copy_to_vec = !matches!(
352            self.ty.physical_type(),
353            parquet::basic::Type::BYTE_ARRAY | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
354        );
355
356        let binding = if copy_to_vec {
357            quote! { let Some(inner) = rec.#field_name }
358        } else {
359            quote! { let Some(inner) = &rec.#field_name }
360        };
361
362        let some = if is_a_timestamp {
363            quote! { Some(inner.timestamp_millis()) }
364        } else if is_a_date {
365            quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)  }
366        } else if is_a_uuid {
367            quote! { Some((&inner.to_string()[..]).into()) }
368        } else if is_a_byte_buf {
369            quote! { Some((&inner[..]).into())}
370        } else {
371            // Type might need converting to a physical type
372            match self.ty.physical_type() {
373                parquet::basic::Type::INT32 => quote! { Some(inner as i32) },
374                parquet::basic::Type::INT64 => quote! { Some(inner as i64) },
375                _ => quote! { Some(inner) },
376            }
377        };
378
379        quote! {
380            let vals: Vec<_> = records.iter().filter_map(|rec| {
381                if #binding {
382                    #some
383                } else {
384                    None
385                }
386            }).collect();
387        }
388    }
389
390    // generates code to read `field_name` from each record into a vector `vals`
391    fn copied_direct_vals(&self) -> proc_macro2::TokenStream {
392        let field_name = &self.ident;
393
394        let access = match self.third_party_type {
395            Some(ThirdPartyType::ChronoNaiveDateTime) => {
396                quote! { rec.#field_name.timestamp_millis() }
397            }
398            Some(ThirdPartyType::ChronoNaiveDate) => {
399                quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
400            }
401            Some(ThirdPartyType::Uuid) => {
402                quote! { rec.#field_name.as_bytes().to_vec().into() }
403            }
404            _ => {
405                if self.is_a_byte_buf {
406                    quote! { (&rec.#field_name[..]).into() }
407                } else {
408                    // Type might need converting to a physical type
409                    match self.ty.physical_type() {
410                        parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 },
411                        parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 },
412                        _ => quote! { rec.#field_name },
413                    }
414                }
415            }
416        };
417
418        quote! {
419            let vals: Vec<_> = records.iter().map(|rec| #access).collect();
420        }
421    }
422
423    // generates code to read a vector `records` into `field_name` for each record
424    fn copied_direct_fields(&self) -> proc_macro2::TokenStream {
425        let field_name = &self.ident;
426
427        let value = match self.third_party_type {
428            Some(ThirdPartyType::ChronoNaiveDateTime) => {
429                quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() }
430            }
431            Some(ThirdPartyType::ChronoNaiveDate) => {
432                // NaiveDateTime::UNIX_EPOCH.num_days_from_ce() == 719163
433                quote! {
434                    ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap()
435                }
436            }
437            Some(ThirdPartyType::Uuid) => {
438                quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
439            }
440            _ => match &self.ty {
441                Type::TypePath(_) => match self.ty.last_part().as_str() {
442                    "String" => quote! { String::from(std::str::from_utf8(vals[i].data())
443                    .expect("invalid UTF-8 sequence")) },
444                    t => {
445                        let s: proc_macro2::TokenStream = t.parse().unwrap();
446                        quote! { vals[i] as #s }
447                    }
448                },
449                Type::Vec(_) => quote! { vals[i].data().to_vec() },
450                f => unimplemented!("Unsupported: {:#?}", f),
451            },
452        };
453
454        quote! {
455            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
456                r.#field_name = #value;
457            }
458        }
459    }
460
461    fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
462        let field_name = &self.ident;
463
464        quote! {
465            let definition_levels: Vec<i16> = self
466              .iter()
467              .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 })
468              .collect();
469        }
470    }
471}
472
/// Simplified, recursive view of a field's Rust type, keeping only the
/// wrappers the derive logic distinguishes.
// NOTE(review): the `large_enum_variant` allowance is presumably needed because
// `syn::Type` and `syn::Expr` are stored inline; confirm before removing.
#[allow(clippy::enum_variant_names)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq)]
enum Type {
    /// `[T; N]`: the element type and the length expression.
    Array(Box<Type>, syn::Expr),
    /// `Option<T>`: the wrapped type.
    Option(Box<Type>),
    /// `[T]`: the element type.
    Slice(Box<Type>),
    /// `Vec<T>`: the element type.
    Vec(Box<Type>),
    /// Any other path type, kept as the original `syn::Type`.
    TypePath(syn::Type),
    /// `&T` or `&'a T`: the optional lifetime and the referenced type.
    Reference(Option<syn::Lifetime>, Box<Type>),
}
484
485impl Type {
486    /// Takes a rust type and returns the appropriate
487    /// parquet-rs column writer
488    fn column_writer(&self) -> syn::TypePath {
489        use parquet::basic::Type as BasicType;
490
491        match self.physical_type() {
492            BasicType::BOOLEAN => {
493                syn::parse_quote!(ColumnWriter::BoolColumnWriter)
494            }
495            BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter),
496            BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter),
497            BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter),
498            BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter),
499            BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter),
500            BasicType::BYTE_ARRAY => {
501                syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
502            }
503            BasicType::FIXED_LEN_BYTE_ARRAY => {
504                syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter)
505            }
506        }
507    }
508
509    /// Takes a rust type and returns the appropriate
510    /// parquet-rs column reader
511    fn column_reader(&self) -> syn::TypePath {
512        use parquet::basic::Type as BasicType;
513
514        match self.physical_type() {
515            BasicType::BOOLEAN => {
516                syn::parse_quote!(ColumnReader::BoolColumnReader)
517            }
518            BasicType::INT32 => syn::parse_quote!(ColumnReader::Int32ColumnReader),
519            BasicType::INT64 => syn::parse_quote!(ColumnReader::Int64ColumnReader),
520            BasicType::INT96 => syn::parse_quote!(ColumnReader::Int96ColumnReader),
521            BasicType::FLOAT => syn::parse_quote!(ColumnReader::FloatColumnReader),
522            BasicType::DOUBLE => syn::parse_quote!(ColumnReader::DoubleColumnReader),
523            BasicType::BYTE_ARRAY => {
524                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
525            }
526            BasicType::FIXED_LEN_BYTE_ARRAY => {
527                syn::parse_quote!(ColumnReader::FixedLenByteArrayColumnReader)
528            }
529        }
530    }
531
532    /// Helper to simplify a nested field definition to its leaf type
533    ///
534    /// Ex:
535    ///   `Option<&String>` => Type::TypePath(String)
536    ///   `&Option<i32>` => Type::TypePath(i32)
537    ///   `Vec<Vec<u8>>` => Type::Vec(u8)
538    ///
539    /// Useful in determining the physical type of a field and the
540    /// definition levels.
541    fn leaf_type_recursive(&self) -> &Type {
542        Type::leaf_type_recursive_helper(self, None)
543    }
544
545    fn leaf_type_recursive_helper<'a>(ty: &'a Type, parent_ty: Option<&'a Type>) -> &'a Type {
546        match ty {
547            Type::TypePath(_) => parent_ty.unwrap_or(ty),
548            Type::Option(first_type)
549            | Type::Vec(first_type)
550            | Type::Array(first_type, _)
551            | Type::Slice(first_type)
552            | Type::Reference(_, first_type) => {
553                Type::leaf_type_recursive_helper(first_type, Some(ty))
554            }
555        }
556    }
557
558    /// Helper method to further unwrap leaf_type() to get inner-most
559    /// type information, useful for determining the physical type
560    /// and normalizing the type paths.
561    fn inner_type(&self) -> &syn::Type {
562        let leaf_type = self.leaf_type_recursive();
563
564        match leaf_type {
565            Type::TypePath(type_) => type_,
566            Type::Option(first_type)
567            | Type::Vec(first_type)
568            | Type::Array(first_type, _)
569            | Type::Slice(first_type)
570            | Type::Reference(_, first_type) => match **first_type {
571                Type::TypePath(ref type_) => type_,
572                _ => unimplemented!("leaf_type() should only return shallow types"),
573            },
574        }
575    }
576
577    /// Helper to normalize a type path by extracting the
578    /// most identifiable part
579    ///
580    /// Ex:
581    ///   std::string::String => String
582    ///   `Vec<u8>` => `Vec<u8>`
583    ///   chrono::NaiveDateTime => NaiveDateTime
584    ///
585    /// Does run the risk of mis-identifying a type if import
586    /// rename is in play. Please note procedural macros always
587    /// run before type resolution so this is a risk the user
588    /// takes on when renaming imports.
589    fn last_part(&self) -> String {
590        let inner_type = self.inner_type();
591        let inner_type_str = (quote! { #inner_type }).to_string();
592
593        inner_type_str
594            .split("::")
595            .last()
596            .unwrap()
597            .trim()
598            .to_string()
599    }
600
601    /// Converts rust types to parquet physical types.
602    ///
603    /// Ex:
604    ///   [u8; 10] => FIXED_LEN_BYTE_ARRAY
605    ///   `Vec<u8>`  => BYTE_ARRAY
606    ///   String => BYTE_ARRAY
607    ///   i32 => INT32
608    fn physical_type(&self) -> parquet::basic::Type {
609        use parquet::basic::Type as BasicType;
610
611        let last_part = self.last_part();
612        let leaf_type = self.leaf_type_recursive();
613
614        match leaf_type {
615            Type::Array(first_type, _length) => {
616                if let Type::TypePath(_) = **first_type {
617                    if last_part == "u8" {
618                        return BasicType::FIXED_LEN_BYTE_ARRAY;
619                    }
620                }
621            }
622            Type::Vec(first_type) | Type::Slice(first_type) => {
623                if let Type::TypePath(_) = **first_type {
624                    if last_part == "u8" {
625                        return BasicType::BYTE_ARRAY;
626                    }
627                }
628            }
629            _ => (),
630        }
631
632        match last_part.trim() {
633            "bool" => BasicType::BOOLEAN,
634            "u8" | "u16" | "u32" => BasicType::INT32,
635            "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32,
636            "u64" | "i64" | "NaiveDateTime" => BasicType::INT64,
637            "usize" | "isize" => {
638                if usize::BITS == 64 {
639                    BasicType::INT64
640                } else {
641                    BasicType::INT32
642                }
643            }
644            "f32" => BasicType::FLOAT,
645            "f64" => BasicType::DOUBLE,
646            "String" | "str" | "Arc < str >" => BasicType::BYTE_ARRAY,
647            "Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
648            f => unimplemented!("{} currently is not supported", f),
649        }
650    }
651
652    fn length(&self) -> Option<syn::Expr> {
653        let last_part = self.last_part();
654        let leaf_type = self.leaf_type_recursive();
655
656        // `[u8; N]` => Some(N)
657        if let Type::Array(first_type, length) = leaf_type {
658            if let Type::TypePath(_) = **first_type {
659                if last_part == "u8" {
660                    return Some(length.clone());
661                }
662            }
663        }
664
665        match last_part.trim() {
666            // Uuid => [u8; 16] => Some(16)
667            "Uuid" => Some(syn::parse_quote!(16)),
668            _ => None,
669        }
670    }
671
672    fn logical_type(&self) -> proc_macro2::TokenStream {
673        let last_part = self.last_part();
674        let leaf_type = self.leaf_type_recursive();
675
676        match leaf_type {
677            Type::Array(first_type, _length) => {
678                if let Type::TypePath(_) = **first_type {
679                    if last_part == "u8" {
680                        return quote! { None };
681                    }
682                }
683            }
684            Type::Vec(first_type) | Type::Slice(first_type) => {
685                if let Type::TypePath(_) = **first_type {
686                    if last_part == "u8" {
687                        return quote! { None };
688                    }
689                }
690            }
691            _ => (),
692        }
693
694        match last_part.trim() {
695            "bool" => quote! { None },
696            "u8" => quote! { Some(LogicalType::integer(8, false)) },
697            "u16" => quote! { Some(LogicalType::integer(16, false)) },
698            "u32" => quote! { Some(LogicalType::integer(32, false)) },
699            "u64" => quote! { Some(LogicalType::integer(64, false)) },
700            "i8" => quote! { Some(LogicalType::integer(8, true)) },
701            "i16" => quote! { Some(LogicalType::integer(16, true)) },
702            "i32" | "i64" => quote! { None },
703            "usize" => {
704                quote! { Some(LogicalType::integer(usize::BITS as i8, false)) }
705            }
706            "isize" => {
707                quote! { Some(LogicalType::integer(usize::BITS as i8, true)) }
708            }
709            "NaiveDate" => quote! { Some(LogicalType::Date) },
710            "NaiveDateTime" => quote! { None },
711            "f32" | "f64" => quote! { None },
712            "String" | "str" | "Arc < str >" => quote! { Some(LogicalType::String) },
713            "Uuid" => quote! { Some(LogicalType::Uuid) },
714            f => unimplemented!("{} currently is not supported", f),
715        }
716    }
717
718    fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
719        let last_part = self.last_part();
720
721        match last_part.trim() {
722            "NaiveDateTime" => Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }),
723            _ => None,
724        }
725    }
726
727    fn repetition(&self) -> proc_macro2::TokenStream {
728        match self {
729            Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL },
730            Type::Reference(_, ty) => ty.repetition(),
731            _ => quote! { ::parquet::basic::Repetition::REQUIRED },
732        }
733    }
734
735    /// Convert a parsed rust field AST in to a more easy to manipulate
736    /// parquet_derive::Field
737    fn from(f: &syn::Field) -> Self {
738        Type::from_type(f, &f.ty)
739    }
740
741    fn from_type(f: &syn::Field, ty: &syn::Type) -> Self {
742        match ty {
743            syn::Type::Path(p) => Type::from_type_path(f, p),
744            syn::Type::Reference(tr) => Type::from_type_reference(f, tr),
745            syn::Type::Array(ta) => Type::from_type_array(f, ta),
746            syn::Type::Slice(ts) => Type::from_type_slice(f, ts),
747            other => unimplemented!(
748                "Unable to derive {:?} - it is currently an unsupported type\n{:#?}",
749                f.ident.as_ref().unwrap(),
750                other
751            ),
752        }
753    }
754
755    fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self {
756        let last_segment = p.path.segments.last().unwrap();
757
758        let is_vec = last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site());
759        let is_option =
760            last_segment.ident == syn::Ident::new("Option", proc_macro2::Span::call_site());
761
762        if is_vec || is_option {
763            let generic_type = match &last_segment.arguments {
764                syn::PathArguments::AngleBracketed(angle_args) => {
765                    assert_eq!(angle_args.args.len(), 1);
766                    let first_arg = &angle_args.args[0];
767
768                    match first_arg {
769                        syn::GenericArgument::Type(typath) => typath.clone(),
770                        other => unimplemented!("Unsupported: {:#?}", other),
771                    }
772                }
773                other => unimplemented!("Unsupported: {:#?}", other),
774            };
775
776            if is_vec {
777                Type::Vec(Box::new(Type::from_type(f, &generic_type)))
778            } else {
779                Type::Option(Box::new(Type::from_type(f, &generic_type)))
780            }
781        } else {
782            Type::TypePath(syn::Type::Path(p.clone()))
783        }
784    }
785
786    fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self {
787        let lifetime = tr.lifetime.clone();
788        let inner_type = Type::from_type(f, tr.elem.as_ref());
789        Type::Reference(lifetime, Box::new(inner_type))
790    }
791
792    fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
793        let inner_type = Type::from_type(f, ta.elem.as_ref());
794        Type::Array(Box::new(inner_type), ta.len.clone())
795    }
796
797    fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
798        let inner_type = Type::from_type(f, ts.elem.as_ref());
799        Type::Slice(Box::new(inner_type))
800    }
801}
802
803#[cfg(test)]
804mod test {
805    use super::*;
806    use syn::{Data, DataStruct, DeriveInput};
807
808    fn extract_fields(input: proc_macro2::TokenStream) -> Vec<syn::Field> {
809        let input: DeriveInput = syn::parse2(input).unwrap();
810
811        let fields = match input.data {
812            Data::Struct(DataStruct { fields, .. }) => fields,
813            _ => panic!("Input must be a struct"),
814        };
815
816        fields.iter().map(|field| field.to_owned()).collect()
817    }
818
819    #[test]
820    fn test_generating_a_simple_writer_snippet() {
821        let snippet: proc_macro2::TokenStream = quote! {
822          struct ABoringStruct {
823            counter: usize,
824          }
825        };
826
827        let fields = extract_fields(snippet);
828        let counter = Field::from(&fields[0]);
829
830        let snippet = counter.writer_snippet().to_string();
831        assert_eq!(snippet,
832                   (quote!{
833                        {
834                            let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
835
836                            if let ColumnWriter::Int64ColumnWriter ( typed ) = column_writer.untyped() {
837                                typed . write_batch ( & vals [ .. ] , None , None ) ?;
838                            }  else {
839                                panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
840                            }
841                        }
842                   }).to_string()
843        )
844    }
845
846    #[test]
847    fn test_generating_a_simple_reader_snippet() {
848        let snippet: proc_macro2::TokenStream = quote! {
849          struct ABoringStruct {
850            counter: usize,
851          }
852        };
853
854        let fields = extract_fields(snippet);
855        let counter = Field::from(&fields[0]);
856
857        let snippet = counter.reader_snippet().to_string();
858        assert_eq!(
859            snippet,
860            (quote! {
861                 {
862                    let mut vals = Vec::new();
863                    if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
864                        let mut definition_levels = Vec::new();
865                        let (total_num, valid_num, decoded_num) = typed.read_records(
866                            num_records, Some(&mut definition_levels), None, &mut vals)?;
867                        if valid_num != decoded_num {
868                            panic!("Support only valid records, found {} null records in column type {}",
869                                decoded_num - valid_num, stringify!{counter});
870                        }
871                    } else {
872                        panic!("Schema and struct disagree on type for {}", stringify!{counter});
873                    }
874                    for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
875                        r.counter = vals[i] as usize;
876                    }
877                 }
878            })
879            .to_string()
880        )
881    }
882
883    #[test]
884    fn test_optional_to_writer_snippet() {
885        let struct_def: proc_macro2::TokenStream = quote! {
886          struct StringBorrower<'a> {
887            optional_str: Option<&'a str>,
888            optional_string: Option<&String>,
889            optional_dumb_int: Option<&i32>,
890          }
891        };
892
893        let fields = extract_fields(struct_def);
894
895        let optional = Field::from(&fields[0]);
896        let snippet = optional.writer_snippet();
897        assert_eq!(snippet.to_string(),
898          (quote! {
899          {
900                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
901
902                let vals: Vec <_> = records.iter().filter_map( |rec| {
903                    if let Some ( inner ) = &rec . optional_str {
904                        Some ( (&inner[..]).into() )
905                    } else {
906                        None
907                    }
908                }).collect();
909
910                if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
911                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
912                } else {
913                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
914                }
915           }
916            }
917          ).to_string());
918
919        let optional = Field::from(&fields[1]);
920        let snippet = optional.writer_snippet();
921        assert_eq!(snippet.to_string(),
922                   (quote!{
923                   {
924                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
925
926                        let vals: Vec <_> = records.iter().filter_map( |rec| {
927                            if let Some ( inner ) = &rec . optional_string {
928                                Some ( (&inner[..]).into() )
929                            } else {
930                                None
931                            }
932                        }).collect();
933
934                        if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
935                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
936                        } else {
937                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
938                        }
939                    }
940        }).to_string());
941
942        let optional = Field::from(&fields[2]);
943        let snippet = optional.writer_snippet();
944        assert_eq!(snippet.to_string(),
945                   (quote!{
946                    {
947                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
948
949                        let vals: Vec <_> = records.iter().filter_map( |rec| {
950                            if let Some ( inner ) = rec . optional_dumb_int {
951                                Some ( inner as i32 )
952                            } else {
953                                None
954                            }
955                        }).collect();
956
957                        if let ColumnWriter::Int32ColumnWriter ( typed ) = column_writer.untyped() {
958                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
959                        }  else {
960                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
961                        }
962                    }
963        }).to_string());
964    }
965
966    #[test]
967    fn test_converting_to_column_writer_type() {
968        let snippet: proc_macro2::TokenStream = quote! {
969          struct ABasicStruct {
970            yes_no: bool,
971            name: String,
972          }
973        };
974
975        let fields = extract_fields(snippet);
976        let processed: Vec<_> = fields.iter().map(Field::from).collect();
977
978        let column_writers: Vec<_> = processed
979            .iter()
980            .map(|field| field.ty.column_writer())
981            .collect();
982
983        assert_eq!(
984            column_writers,
985            vec![
986                syn::parse_quote!(ColumnWriter::BoolColumnWriter),
987                syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
988            ]
989        );
990    }
991
992    #[test]
993    fn test_converting_to_column_reader_type() {
994        let snippet: proc_macro2::TokenStream = quote! {
995          struct ABasicStruct {
996            yes_no: bool,
997            name: String,
998          }
999        };
1000
1001        let fields = extract_fields(snippet);
1002        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1003
1004        let column_readers: Vec<_> = processed
1005            .iter()
1006            .map(|field| field.ty.column_reader())
1007            .collect();
1008
1009        assert_eq!(
1010            column_readers,
1011            vec![
1012                syn::parse_quote!(ColumnReader::BoolColumnReader),
1013                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
1014            ]
1015        );
1016    }
1017
1018    #[test]
1019    fn convert_basic_struct() {
1020        let snippet: proc_macro2::TokenStream = quote! {
1021          struct ABasicStruct {
1022            yes_no: bool,
1023            name: String,
1024            length: usize
1025          }
1026        };
1027
1028        let fields = extract_fields(snippet);
1029        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1030        assert_eq!(processed.len(), 3);
1031
1032        assert_eq!(
1033            processed,
1034            vec![
1035                Field {
1036                    ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()),
1037                    ty: Type::TypePath(syn::parse_quote!(bool)),
1038                    is_a_byte_buf: false,
1039                    third_party_type: None,
1040                },
1041                Field {
1042                    ident: syn::Ident::new("name", proc_macro2::Span::call_site()),
1043                    ty: Type::TypePath(syn::parse_quote!(String)),
1044                    is_a_byte_buf: true,
1045                    third_party_type: None,
1046                },
1047                Field {
1048                    ident: syn::Ident::new("length", proc_macro2::Span::call_site()),
1049                    ty: Type::TypePath(syn::parse_quote!(usize)),
1050                    is_a_byte_buf: false,
1051                    third_party_type: None,
1052                }
1053            ]
1054        )
1055    }
1056
1057    #[test]
1058    fn test_get_inner_type() {
1059        let snippet: proc_macro2::TokenStream = quote! {
1060          struct LotsOfInnerTypes {
1061            a_vec: Vec<u8>,
1062            a_option: ::std::option::Option<bool>,
1063            a_silly_string: ::std::string::String,
1064            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1065          }
1066        };
1067
1068        let fields = extract_fields(snippet);
1069        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1070        let inner_types: Vec<_> = converted_fields
1071            .iter()
1072            .map(|field| field.inner_type())
1073            .collect();
1074        let inner_types_strs: Vec<_> = inner_types
1075            .iter()
1076            .map(|ty| (quote! { #ty }).to_string())
1077            .collect();
1078
1079        assert_eq!(
1080            inner_types_strs,
1081            vec![
1082                "u8",
1083                "bool",
1084                ":: std :: string :: String",
1085                ":: std :: result :: Result < () , () >"
1086            ]
1087        )
1088    }
1089
1090    #[test]
1091    fn test_physical_type() {
1092        use parquet::basic::Type as BasicType;
1093        let snippet: proc_macro2::TokenStream = quote! {
1094          struct LotsOfInnerTypes {
1095            a_buf: ::std::vec::Vec<u8>,
1096            a_number: i32,
1097            a_verbose_option: ::std::option::Option<bool>,
1098            a_silly_string: String,
1099            a_fix_byte_buf: [u8; 10],
1100            a_complex_option: ::std::option::Option<&Vec<u8>>,
1101            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1102            a_uuid: ::uuid::Uuid,
1103          }
1104        };
1105
1106        let fields = extract_fields(snippet);
1107        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1108        let physical_types: Vec<_> = converted_fields
1109            .iter()
1110            .map(|ty| ty.physical_type())
1111            .collect();
1112
1113        assert_eq!(
1114            physical_types,
1115            vec![
1116                BasicType::BYTE_ARRAY,
1117                BasicType::INT32,
1118                BasicType::BOOLEAN,
1119                BasicType::BYTE_ARRAY,
1120                BasicType::FIXED_LEN_BYTE_ARRAY,
1121                BasicType::BYTE_ARRAY,
1122                BasicType::INT32,
1123                BasicType::FIXED_LEN_BYTE_ARRAY,
1124            ]
1125        )
1126    }
1127
1128    #[test]
1129    fn test_type_length() {
1130        let snippet: proc_macro2::TokenStream = quote! {
1131          struct LotsOfInnerTypes {
1132            a_buf: ::std::vec::Vec<u8>,
1133            a_number: i32,
1134            a_verbose_option: ::std::option::Option<bool>,
1135            a_silly_string: String,
1136            a_fix_byte_buf: [u8; 10],
1137            a_complex_option: ::std::option::Option<&Vec<u8>>,
1138            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1139            a_uuid: ::uuid::Uuid,
1140          }
1141        };
1142
1143        let fields = extract_fields(snippet);
1144        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1145        let lengths: Vec<_> = converted_fields.iter().map(|ty| ty.length()).collect();
1146
1147        assert_eq!(
1148            lengths,
1149            vec![
1150                None,
1151                None,
1152                None,
1153                None,
1154                Some(syn::parse_quote!(10)),
1155                None,
1156                None,
1157                Some(syn::parse_quote!(16)),
1158            ]
1159        )
1160    }
1161
1162    #[test]
1163    fn test_convert_comprehensive_owned_struct() {
1164        let snippet: proc_macro2::TokenStream = quote! {
1165          struct VecHolder {
1166            a_vec: ::std::vec::Vec<u8>,
1167            a_option: ::std::option::Option<bool>,
1168            a_silly_string: ::std::string::String,
1169            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1170          }
1171        };
1172
1173        let fields = extract_fields(snippet);
1174        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1175
1176        assert_eq!(
1177            converted_fields,
1178            vec![
1179                Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))),
1180                Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))),
1181                Type::TypePath(syn::parse_quote!(::std::string::String)),
1182                Type::Option(Box::new(Type::TypePath(
1183                    syn::parse_quote!(::std::result::Result<(),()>)
1184                ))),
1185            ]
1186        );
1187    }
1188
1189    #[test]
1190    fn test_convert_borrowed_struct() {
1191        let snippet: proc_macro2::TokenStream = quote! {
1192          struct Borrower<'a> {
1193            a_str: &'a str,
1194            a_borrowed_option: &'a Option<bool>,
1195            so_many_borrows: &'a Option<&'a str>,
1196          }
1197        };
1198
1199        let fields = extract_fields(snippet);
1200        let types: Vec<_> = fields.iter().map(Type::from).collect();
1201
1202        assert_eq!(
1203            types,
1204            vec![
1205                Type::Reference(
1206                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1207                    Box::new(Type::TypePath(syn::parse_quote!(str)))
1208                ),
1209                Type::Reference(
1210                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1211                    Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!(
1212                        bool
1213                    )))))
1214                ),
1215                Type::Reference(
1216                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1217                    Box::new(Type::Option(Box::new(Type::Reference(
1218                        Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1219                        Box::new(Type::TypePath(syn::parse_quote!(str)))
1220                    ))))
1221                ),
1222            ]
1223        );
1224    }
1225
1226    #[test]
1227    fn test_chrono_timestamp_millis_write() {
1228        let snippet: proc_macro2::TokenStream = quote! {
1229          struct ATimestampStruct {
1230            henceforth: chrono::NaiveDateTime,
1231            maybe_happened: Option<&chrono::NaiveDateTime>,
1232          }
1233        };
1234
1235        let fields = extract_fields(snippet);
1236        let when = Field::from(&fields[0]);
1237        assert_eq!(when.writer_snippet().to_string(),(quote!{
1238            {
1239                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
1240                if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
1241                    typed.write_batch(&vals[..], None, None) ?;
1242                } else {
1243                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
1244                }
1245            }
1246        }).to_string());
1247
1248        let maybe_happened = Field::from(&fields[1]);
1249        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1250            {
1251                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
1252                let vals : Vec<_> = records.iter().filter_map(|rec| {
1253                    if let Some(inner) = rec.maybe_happened {
1254                        Some(inner.timestamp_millis())
1255                    } else {
1256                        None
1257                    }
1258                }).collect();
1259
1260                if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
1261                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1262                } else {
1263                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
1264                }
1265            }
1266        }).to_string());
1267    }
1268
1269    #[test]
1270    fn test_chrono_timestamp_millis_read() {
1271        let snippet: proc_macro2::TokenStream = quote! {
1272          struct ATimestampStruct {
1273            henceforth: chrono::NaiveDateTime,
1274          }
1275        };
1276
1277        let fields = extract_fields(snippet);
1278        let when = Field::from(&fields[0]);
1279        assert_eq!(when.reader_snippet().to_string(),(quote!{
1280            {
1281                let mut vals = Vec::new();
1282                if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
1283                    let mut definition_levels = Vec::new();
1284                    let (total_num, valid_num, decoded_num) = typed.read_records(
1285                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1286                    if valid_num != decoded_num {
1287                        panic!("Support only valid records, found {} null records in column type {}",
1288                            decoded_num - valid_num, stringify!{henceforth});
1289                    }
1290                } else {
1291                    panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
1292                }
1293                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1294                    r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap();
1295                }
1296            }
1297        }).to_string());
1298    }
1299
1300    #[test]
1301    fn test_chrono_date_write() {
1302        let snippet: proc_macro2::TokenStream = quote! {
1303          struct ATimestampStruct {
1304            henceforth: chrono::NaiveDate,
1305            maybe_happened: Option<&chrono::NaiveDate>,
1306          }
1307        };
1308
1309        let fields = extract_fields(snippet);
1310        let when = Field::from(&fields[0]);
1311        assert_eq!(when.writer_snippet().to_string(),(quote!{
1312            {
1313                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
1314                if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
1315                    typed.write_batch(&vals[..], None, None) ?;
1316                } else {
1317                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
1318                }
1319            }
1320        }).to_string());
1321
1322        let maybe_happened = Field::from(&fields[1]);
1323        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1324            {
1325                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
1326                let vals : Vec<_> = records.iter().filter_map(|rec| {
1327                    if let Some(inner) = rec.maybe_happened {
1328                        Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
1329                    } else {
1330                        None
1331                    }
1332                }).collect();
1333
1334                if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
1335                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1336                } else {
1337                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
1338                }
1339            }
1340        }).to_string());
1341    }
1342
1343    #[test]
1344    fn test_chrono_date_read() {
1345        let snippet: proc_macro2::TokenStream = quote! {
1346          struct ATimestampStruct {
1347            henceforth: chrono::NaiveDate,
1348          }
1349        };
1350
1351        let fields = extract_fields(snippet);
1352        let when = Field::from(&fields[0]);
1353        assert_eq!(when.reader_snippet().to_string(),(quote!{
1354            {
1355                let mut vals = Vec::new();
1356                if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
1357                    let mut definition_levels = Vec::new();
1358                    let (total_num, valid_num, decoded_num) = typed.read_records(
1359                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1360                    if valid_num != decoded_num {
1361                        panic!("Support only valid records, found {} null records in column type {}",
1362                            decoded_num - valid_num, stringify!{henceforth});
1363                    }
1364                } else {
1365                    panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
1366                }
1367                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1368                    r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap();
1369                }
1370            }
1371        }).to_string());
1372    }
1373
1374    #[test]
1375    fn test_uuid_write() {
1376        let snippet: proc_macro2::TokenStream = quote! {
1377          struct AUuidStruct {
1378            unique_id: uuid::Uuid,
1379            maybe_unique_id: Option<&uuid::Uuid>,
1380          }
1381        };
1382
1383        let fields = extract_fields(snippet);
1384        let when = Field::from(&fields[0]);
1385        assert_eq!(when.writer_snippet().to_string(),(quote!{
1386            {
1387                let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
1388                if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
1389                    typed.write_batch(&vals[..], None, None) ?;
1390                } else {
1391                    panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
1392                }
1393            }
1394        }).to_string());
1395
1396        let maybe_happened = Field::from(&fields[1]);
1397        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1398            {
1399                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
1400                let vals : Vec<_> = records.iter().filter_map(|rec| {
1401                    if let Some(inner) = &rec.maybe_unique_id {
1402                        Some((&inner.to_string()[..]).into())
1403                    } else {
1404                        None
1405                    }
1406                }).collect();
1407
1408                if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
1409                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1410                } else {
1411                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
1412                }
1413            }
1414        }).to_string());
1415    }
1416
1417    #[test]
1418    fn test_uuid_read() {
1419        let snippet: proc_macro2::TokenStream = quote! {
1420          struct AUuidStruct {
1421            unique_id: uuid::Uuid,
1422          }
1423        };
1424
1425        let fields = extract_fields(snippet);
1426        let when = Field::from(&fields[0]);
1427        assert_eq!(when.reader_snippet().to_string(),(quote!{
1428            {
1429                let mut vals = Vec::new();
1430                if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
1431                    let mut definition_levels = Vec::new();
1432                    let (total_num, valid_num, decoded_num) = typed.read_records(
1433                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1434                    if valid_num != decoded_num {
1435                        panic!("Support only valid records, found {} null records in column type {}",
1436                            decoded_num - valid_num, stringify!{unique_id});
1437                    }
1438                } else {
1439                    panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
1440                }
1441                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1442                    r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
1443                }
1444            }
1445        }).to_string());
1446    }
1447
1448    #[test]
1449    fn test_converted_type() {
1450        let snippet: proc_macro2::TokenStream = quote! {
1451          struct ATimeStruct {
1452            time: chrono::NaiveDateTime,
1453          }
1454        };
1455
1456        let fields = extract_fields(snippet);
1457
1458        let time = Field::from(&fields[0]);
1459
1460        let converted_type = time.ty.converted_type();
1461        assert_eq!(
1462            converted_type.unwrap().to_string(),
1463            quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }.to_string()
1464        );
1465    }
1466}