parquet_derive/
parquet_field.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/// A single named struct field, parsed into the form used by the code
/// generators in this file (`writer_snippet`, `reader_snippet`,
/// `parquet_type`).
#[derive(Debug, PartialEq)]
pub struct Field {
    /// Name of the struct field; interpolated into the generated code.
    ident: syn::Ident,
    /// Parsed shape of the field's Rust type (see `Type`).
    ty: Type,
    /// `true` when the field's physical type is `BYTE_ARRAY`.
    is_a_byte_buf: bool,
    /// Set when the innermost type name is `NaiveDateTime`, `NaiveDate`
    /// or `Uuid`; `None` otherwise.
    third_party_type: Option<ThirdPartyType>,
}
25
/// Use third party libraries, detected
/// at compile time. These libraries will
/// be written to parquet as their preferred
/// physical type.
///
///   ChronoNaiveDateTime is written as i64
///   ChronoNaiveDate is written as i32
///   Uuid maps to FIXED_LEN_BYTE_ARRAY with a declared length of 16
///
/// Detection is by the last segment of the type path only, so a renamed
/// import will not be recognised.
#[derive(Debug, PartialEq)]
enum ThirdPartyType {
    ChronoNaiveDateTime,
    ChronoNaiveDate,
    Uuid,
}
39
40impl Field {
41    pub fn from(f: &syn::Field) -> Self {
42        let ty = Type::from(f);
43        let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY;
44
45        let third_party_type = match &ty.last_part()[..] {
46            "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime),
47            "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate),
48            "Uuid" => Some(ThirdPartyType::Uuid),
49            _ => None,
50        };
51
52        Field {
53            ident: f
54                .ident
55                .clone()
56                .expect("Only structs with named fields are currently supported"),
57            ty,
58            is_a_byte_buf,
59            third_party_type,
60        }
61    }
62
    /// Takes the parsed field of the struct and emits a valid
    /// column writer snippet. Should match exactly what you
    /// would write by hand.
    ///
    /// Can only generate writers for basic structs, for example:
    ///
    /// ```text
    /// struct Record {
    ///   a_bool: bool,
    ///   maybe_a_bool: Option<bool>
    /// }
    /// ```
    ///
    /// but not
    ///
    /// ```text
    /// struct UnsupportedNestedRecord {
    ///   a_property: bool,
    ///   nested_record: Record
    /// }
    /// ```
    ///
    /// because this parsing logic is not sophisticated enough for definition
    /// levels beyond 2.
    ///
    /// The returned token stream is a single block containing, in order:
    /// the optional `definition_levels` binding, the `vals` binding, and
    /// the `write_batch` call. The emitted code refers to `records`,
    /// `column_writer` and `ColumnWriter`, which must be in scope where the
    /// snippet is spliced in.
    ///
    /// # Panics
    ///
    /// Panics (via `unimplemented!`) for any type nesting not listed in the
    /// match tables below.
    pub fn writer_snippet(&self) -> proc_macro2::TokenStream {
        let ident = &self.ident;
        let column_writer = self.ty.column_writer();

        // Choose how `vals` is built: non-optional shapes copy every record's
        // value, optional shapes keep only the `Some` values.
        let vals_builder = match &self.ty {
            Type::TypePath(_) => self.copied_direct_vals(),
            Type::Option(ref first_type) => match **first_type {
                Type::TypePath(_) => self.option_into_vals(),
                Type::Reference(_, ref second_type) => match **second_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    _ => unimplemented!("Unsupported type encountered"),
                },
                Type::Vec(ref first_type) => match **first_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    _ => unimplemented!("Unsupported type encountered"),
                },
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            Type::Reference(_, ref first_type) => match **first_type {
                Type::TypePath(_) => self.copied_direct_vals(),
                Type::Option(ref second_type) => match **second_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    Type::Reference(_, ref second_type) => match **second_type {
                        Type::TypePath(_) => self.option_into_vals(),
                        Type::Slice(ref second_type) => match **second_type {
                            Type::TypePath(_) => self.option_into_vals(),
                            ref f => unimplemented!("Unsupported: {:#?}", f),
                        },
                        _ => unimplemented!("Unsupported type encountered"),
                    },
                    Type::Vec(ref first_type) => match **first_type {
                        Type::TypePath(_) => self.option_into_vals(),
                        _ => unimplemented!("Unsupported type encountered"),
                    },
                    ref f => unimplemented!("Unsupported: {:#?}", f),
                },
                Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => self.copied_direct_vals(),
                    ref f => unimplemented!("Unsupported: {:#?}", f),
                },
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            Type::Vec(ref first_type) => match **first_type {
                Type::TypePath(_) => self.copied_direct_vals(),
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            f => unimplemented!("Unsupported: {:#?}", f),
        };

        // Definition levels are only emitted when an `Option` appears in the
        // supported nestings; `None` here means the column is written
        // without definition levels.
        let definition_levels = match &self.ty {
            Type::TypePath(_) => None,
            Type::Option(ref first_type) => match **first_type {
                Type::TypePath(_) => Some(self.optional_definition_levels()),
                Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
                Type::Reference(_, ref second_type)
                | Type::Vec(ref second_type)
                | Type::Array(ref second_type, _)
                | Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => Some(self.optional_definition_levels()),
                    _ => unimplemented!("Unsupported nesting encountered"),
                },
            },
            Type::Reference(_, ref first_type)
            | Type::Vec(ref first_type)
            | Type::Array(ref first_type, _)
            | Type::Slice(ref first_type) => match **first_type {
                Type::TypePath(_) => None,
                Type::Vec(ref second_type)
                | Type::Array(ref second_type, _)
                | Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => None,
                    Type::Reference(_, ref third_type) => match **third_type {
                        Type::TypePath(_) => None,
                        _ => unimplemented!("Unsupported definition encountered"),
                    },
                    _ => unimplemented!("Unsupported definition encountered"),
                },
                Type::Reference(_, ref second_type) | Type::Option(ref second_type) => {
                    match **second_type {
                        Type::TypePath(_) => Some(self.optional_definition_levels()),
                        Type::Vec(ref third_type)
                        | Type::Array(ref third_type, _)
                        | Type::Slice(ref third_type) => match **third_type {
                            Type::TypePath(_) => Some(self.optional_definition_levels()),
                            Type::Reference(_, ref fourth_type) => match **fourth_type {
                                Type::TypePath(_) => Some(self.optional_definition_levels()),
                                _ => unimplemented!("Unsupported definition encountered"),
                            },
                            _ => unimplemented!("Unsupported definition encountered"),
                        },
                        Type::Reference(_, ref third_type) => match **third_type {
                            Type::TypePath(_) => Some(self.optional_definition_levels()),
                            Type::Slice(ref fourth_type) => match **fourth_type {
                                Type::TypePath(_) => Some(self.optional_definition_levels()),
                                _ => unimplemented!("Unsupported definition encountered"),
                            },
                            _ => unimplemented!("Unsupported definition encountered"),
                        },
                        _ => unimplemented!("Unsupported definition encountered"),
                    }
                }
            },
        };

        // "vals" is the run of primitive data being written for the column.
        // "definition_levels" is a `Vec<i16>` with 1 for a present value and
        // 0 for a missing one (see `optional_definition_levels`).
        // This TokenStream is only one part of the code for writing a column and
        // it relies on values calculated in prior code snippets, namely
        // "definition_levels" and "vals_builder".
        // All the context is put together in this function's final quote and
        // this expression just switches between non-nullable and nullable write statements.
        let write_batch_expr = if definition_levels.is_some() {
            quote! {
                if let #column_writer(ref mut typed) = column_writer.untyped() {
                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
                } else {
                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
                }
            }
        } else {
            quote! {
                if let #column_writer(ref mut typed) = column_writer.untyped() {
                    typed.write_batch(&vals[..], None, None)?;
                } else {
                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
                }
            }
        };

        // An `Option<TokenStream>` that is `None` interpolates to nothing, so
        // the non-nullable case simply omits the definition level binding.
        quote! {
            {
                #definition_levels

                #vals_builder

                #write_batch_expr
            }
        }
    }
221
222    /// Takes the parsed field of the struct and emits a valid
223    /// column reader snippet. Should match exactly what you
224    /// would write by hand.
225    ///
226    /// Can only generate writers for basic structs, for example:
227    ///
228    /// struct Record {
229    ///   a_bool: bool
230    /// }
231    ///
232    /// but not
233    ///
234    /// struct UnsupportedNestedRecord {
235    ///   a_property: bool,
236    ///   nested_record: Record
237    /// }
238    ///
239    /// because this parsing logic is not sophisticated enough for definition
240    /// levels beyond 2.
241    ///
242    /// `Option` types and references not supported, but the column itself can be nullable
243    /// (i.e., def_level==1), as long as the values are all valid.
244    pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
245        let ident = &self.ident;
246        let column_reader = self.ty.column_reader();
247
248        // generate the code to read the column into a vector `vals`
249        let write_batch_expr = quote! {
250            let mut vals = Vec::new();
251            if let #column_reader(mut typed) = column_reader {
252                let mut definition_levels = Vec::new();
253                let (total_num, valid_num, decoded_num) = typed.read_records(
254                    num_records, Some(&mut definition_levels), None, &mut vals)?;
255                if valid_num != decoded_num {
256                    panic!("Support only valid records, found {} null records in column type {}",
257                        decoded_num - valid_num, stringify!{#ident});
258                }
259            } else {
260                panic!("Schema and struct disagree on type for {}", stringify!{#ident});
261            }
262        };
263
264        // generate the code to convert each element of `vals` to the correct type and then write
265        // it to its field in the corresponding struct
266        let vals_writer = match &self.ty {
267            Type::TypePath(_) => self.copied_direct_fields(),
268            Type::Reference(_, ref first_type) => match **first_type {
269                Type::TypePath(_) => self.copied_direct_fields(),
270                Type::Slice(ref second_type) => match **second_type {
271                    Type::TypePath(_) => self.copied_direct_fields(),
272                    ref f => unimplemented!("Unsupported: {:#?}", f),
273                },
274                ref f => unimplemented!("Unsupported: {:#?}", f),
275            },
276            Type::Vec(ref first_type) => match **first_type {
277                Type::TypePath(_) => self.copied_direct_fields(),
278                ref f => unimplemented!("Unsupported: {:#?}", f),
279            },
280            f => unimplemented!("Unsupported: {:#?}", f),
281        };
282
283        quote! {
284            {
285                #write_batch_expr
286
287                #vals_writer
288            }
289        }
290    }
291
292    pub fn parquet_type(&self) -> proc_macro2::TokenStream {
293        // TODO: Support group types
294        // TODO: Add length if dealing with fixedlenbinary
295
296        let field_name = &self.ident.to_string();
297        let physical_type = match self.ty.physical_type() {
298            parquet::basic::Type::BOOLEAN => quote! {
299                ::parquet::basic::Type::BOOLEAN
300            },
301            parquet::basic::Type::INT32 => quote! {
302                ::parquet::basic::Type::INT32
303            },
304            parquet::basic::Type::INT64 => quote! {
305                ::parquet::basic::Type::INT64
306            },
307            parquet::basic::Type::INT96 => quote! {
308                ::parquet::basic::Type::INT96
309            },
310            parquet::basic::Type::FLOAT => quote! {
311                ::parquet::basic::Type::FLOAT
312            },
313            parquet::basic::Type::DOUBLE => quote! {
314                ::parquet::basic::Type::DOUBLE
315            },
316            parquet::basic::Type::BYTE_ARRAY => quote! {
317                ::parquet::basic::Type::BYTE_ARRAY
318            },
319            parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! {
320                ::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
321            },
322        };
323        let logical_type = self.ty.logical_type();
324        let repetition = self.ty.repetition();
325        let converted_type = self.ty.converted_type();
326        let length = self.ty.length();
327
328        let mut builder = quote! {
329            ParquetType::primitive_type_builder(#field_name, #physical_type)
330                .with_logical_type(#logical_type)
331                .with_repetition(#repetition)
332        };
333
334        if let Some(converted_type) = converted_type {
335            builder = quote! { #builder.with_converted_type(#converted_type) };
336        }
337
338        if let Some(length) = length {
339            builder = quote! { #builder.with_length(#length) };
340        }
341
342        quote! {  fields.push(#builder.build().unwrap().into()) }
343    }
344
345    fn option_into_vals(&self) -> proc_macro2::TokenStream {
346        let field_name = &self.ident;
347        let is_a_byte_buf = self.is_a_byte_buf;
348        let is_a_timestamp = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
349        let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
350        let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
351        let copy_to_vec = !matches!(
352            self.ty.physical_type(),
353            parquet::basic::Type::BYTE_ARRAY | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
354        );
355
356        let binding = if copy_to_vec {
357            quote! { let Some(inner) = rec.#field_name }
358        } else {
359            quote! { let Some(ref inner) = rec.#field_name }
360        };
361
362        let some = if is_a_timestamp {
363            quote! { Some(inner.timestamp_millis()) }
364        } else if is_a_date {
365            quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)  }
366        } else if is_a_uuid {
367            quote! { Some((&inner.to_string()[..]).into()) }
368        } else if is_a_byte_buf {
369            quote! { Some((&inner[..]).into())}
370        } else {
371            // Type might need converting to a physical type
372            match self.ty.physical_type() {
373                parquet::basic::Type::INT32 => quote! { Some(inner as i32) },
374                parquet::basic::Type::INT64 => quote! { Some(inner as i64) },
375                _ => quote! { Some(inner) },
376            }
377        };
378
379        quote! {
380            let vals: Vec<_> = records.iter().filter_map(|rec| {
381                if #binding {
382                    #some
383                } else {
384                    None
385                }
386            }).collect();
387        }
388    }
389
390    // generates code to read `field_name` from each record into a vector `vals`
391    fn copied_direct_vals(&self) -> proc_macro2::TokenStream {
392        let field_name = &self.ident;
393
394        let access = match self.third_party_type {
395            Some(ThirdPartyType::ChronoNaiveDateTime) => {
396                quote! { rec.#field_name.timestamp_millis() }
397            }
398            Some(ThirdPartyType::ChronoNaiveDate) => {
399                quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
400            }
401            Some(ThirdPartyType::Uuid) => {
402                quote! { rec.#field_name.as_bytes().to_vec().into() }
403            }
404            _ => {
405                if self.is_a_byte_buf {
406                    quote! { (&rec.#field_name[..]).into() }
407                } else {
408                    // Type might need converting to a physical type
409                    match self.ty.physical_type() {
410                        parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 },
411                        parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 },
412                        _ => quote! { rec.#field_name },
413                    }
414                }
415            }
416        };
417
418        quote! {
419            let vals: Vec<_> = records.iter().map(|rec| #access).collect();
420        }
421    }
422
423    // generates code to read a vector `records` into `field_name` for each record
424    fn copied_direct_fields(&self) -> proc_macro2::TokenStream {
425        let field_name = &self.ident;
426
427        let value = match self.third_party_type {
428            Some(ThirdPartyType::ChronoNaiveDateTime) => {
429                quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() }
430            }
431            Some(ThirdPartyType::ChronoNaiveDate) => {
432                // NaiveDateTime::UNIX_EPOCH.num_days_from_ce() == 719163
433                quote! {
434                    ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap()
435                }
436            }
437            Some(ThirdPartyType::Uuid) => {
438                quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
439            }
440            _ => match &self.ty {
441                Type::TypePath(_) => match self.ty.last_part().as_str() {
442                    "String" => quote! { String::from(std::str::from_utf8(vals[i].data())
443                    .expect("invalid UTF-8 sequence")) },
444                    t => {
445                        let s: proc_macro2::TokenStream = t.parse().unwrap();
446                        quote! { vals[i] as #s }
447                    }
448                },
449                Type::Vec(_) => quote! { vals[i].data().to_vec() },
450                f => unimplemented!("Unsupported: {:#?}", f),
451            },
452        };
453
454        quote! {
455            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
456                r.#field_name = #value;
457            }
458        }
459    }
460
461    fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
462        let field_name = &self.ident;
463
464        quote! {
465            let definition_levels: Vec<i16> = self
466              .iter()
467              .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 })
468              .collect();
469        }
470    }
471}
472
/// Simplified, recursive view of a field's Rust type, built from the
/// `syn` AST by `Type::from`.
///
/// Wrapper variants box the type they wrap; `TypePath` is the leaf.
#[allow(clippy::enum_variant_names)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq)]
enum Type {
    /// Fixed-size array `[T; N]`: the element type and the length expression.
    Array(Box<Type>, syn::Expr),
    /// `Option<T>`.
    Option(Box<Type>),
    /// Slice `[T]`.
    Slice(Box<Type>),
    /// `Vec<T>`.
    Vec(Box<Type>),
    /// Leaf type, kept as the original `syn::Type`.
    TypePath(syn::Type),
    /// Reference `&T` with its optional lifetime.
    Reference(Option<syn::Lifetime>, Box<Type>),
}
484
485impl Type {
486    /// Takes a rust type and returns the appropriate
487    /// parquet-rs column writer
488    fn column_writer(&self) -> syn::TypePath {
489        use parquet::basic::Type as BasicType;
490
491        match self.physical_type() {
492            BasicType::BOOLEAN => {
493                syn::parse_quote!(ColumnWriter::BoolColumnWriter)
494            }
495            BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter),
496            BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter),
497            BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter),
498            BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter),
499            BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter),
500            BasicType::BYTE_ARRAY => {
501                syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
502            }
503            BasicType::FIXED_LEN_BYTE_ARRAY => {
504                syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter)
505            }
506        }
507    }
508
509    /// Takes a rust type and returns the appropriate
510    /// parquet-rs column reader
511    fn column_reader(&self) -> syn::TypePath {
512        use parquet::basic::Type as BasicType;
513
514        match self.physical_type() {
515            BasicType::BOOLEAN => {
516                syn::parse_quote!(ColumnReader::BoolColumnReader)
517            }
518            BasicType::INT32 => syn::parse_quote!(ColumnReader::Int32ColumnReader),
519            BasicType::INT64 => syn::parse_quote!(ColumnReader::Int64ColumnReader),
520            BasicType::INT96 => syn::parse_quote!(ColumnReader::Int96ColumnReader),
521            BasicType::FLOAT => syn::parse_quote!(ColumnReader::FloatColumnReader),
522            BasicType::DOUBLE => syn::parse_quote!(ColumnReader::DoubleColumnReader),
523            BasicType::BYTE_ARRAY => {
524                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
525            }
526            BasicType::FIXED_LEN_BYTE_ARRAY => {
527                syn::parse_quote!(ColumnReader::FixedLenByteArrayColumnReader)
528            }
529        }
530    }
531
532    /// Helper to simplify a nested field definition to its leaf type
533    ///
534    /// Ex:
535    ///   `Option<&String>` => Type::TypePath(String)
536    ///   `&Option<i32>` => Type::TypePath(i32)
537    ///   `Vec<Vec<u8>>` => Type::Vec(u8)
538    ///
539    /// Useful in determining the physical type of a field and the
540    /// definition levels.
541    fn leaf_type_recursive(&self) -> &Type {
542        Type::leaf_type_recursive_helper(self, None)
543    }
544
545    fn leaf_type_recursive_helper<'a>(ty: &'a Type, parent_ty: Option<&'a Type>) -> &'a Type {
546        match ty {
547            Type::TypePath(_) => parent_ty.unwrap_or(ty),
548            Type::Option(ref first_type)
549            | Type::Vec(ref first_type)
550            | Type::Array(ref first_type, _)
551            | Type::Slice(ref first_type)
552            | Type::Reference(_, ref first_type) => {
553                Type::leaf_type_recursive_helper(first_type, Some(ty))
554            }
555        }
556    }
557
558    /// Helper method to further unwrap leaf_type() to get inner-most
559    /// type information, useful for determining the physical type
560    /// and normalizing the type paths.
561    fn inner_type(&self) -> &syn::Type {
562        let leaf_type = self.leaf_type_recursive();
563
564        match leaf_type {
565            Type::TypePath(ref type_) => type_,
566            Type::Option(ref first_type)
567            | Type::Vec(ref first_type)
568            | Type::Array(ref first_type, _)
569            | Type::Slice(ref first_type)
570            | Type::Reference(_, ref first_type) => match **first_type {
571                Type::TypePath(ref type_) => type_,
572                _ => unimplemented!("leaf_type() should only return shallow types"),
573            },
574        }
575    }
576
577    /// Helper to normalize a type path by extracting the
578    /// most identifiable part
579    ///
580    /// Ex:
581    ///   std::string::String => String
582    ///   `Vec<u8>` => `Vec<u8>`
583    ///   chrono::NaiveDateTime => NaiveDateTime
584    ///
585    /// Does run the risk of mis-identifying a type if import
586    /// rename is in play. Please note procedural macros always
587    /// run before type resolution so this is a risk the user
588    /// takes on when renaming imports.
589    fn last_part(&self) -> String {
590        let inner_type = self.inner_type();
591        let inner_type_str = (quote! { #inner_type }).to_string();
592
593        inner_type_str
594            .split("::")
595            .last()
596            .unwrap()
597            .trim()
598            .to_string()
599    }
600
601    /// Converts rust types to parquet physical types.
602    ///
603    /// Ex:
604    ///   [u8; 10] => FIXED_LEN_BYTE_ARRAY
605    ///   `Vec<u8>`  => BYTE_ARRAY
606    ///   String => BYTE_ARRAY
607    ///   i32 => INT32
608    fn physical_type(&self) -> parquet::basic::Type {
609        use parquet::basic::Type as BasicType;
610
611        let last_part = self.last_part();
612        let leaf_type = self.leaf_type_recursive();
613
614        match leaf_type {
615            Type::Array(ref first_type, _length) => {
616                if let Type::TypePath(_) = **first_type {
617                    if last_part == "u8" {
618                        return BasicType::FIXED_LEN_BYTE_ARRAY;
619                    }
620                }
621            }
622            Type::Vec(ref first_type) | Type::Slice(ref first_type) => {
623                if let Type::TypePath(_) = **first_type {
624                    if last_part == "u8" {
625                        return BasicType::BYTE_ARRAY;
626                    }
627                }
628            }
629            _ => (),
630        }
631
632        match last_part.trim() {
633            "bool" => BasicType::BOOLEAN,
634            "u8" | "u16" | "u32" => BasicType::INT32,
635            "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32,
636            "u64" | "i64" | "NaiveDateTime" => BasicType::INT64,
637            "usize" | "isize" => {
638                if usize::BITS == 64 {
639                    BasicType::INT64
640                } else {
641                    BasicType::INT32
642                }
643            }
644            "f32" => BasicType::FLOAT,
645            "f64" => BasicType::DOUBLE,
646            "String" | "str" => BasicType::BYTE_ARRAY,
647            "Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
648            f => unimplemented!("{} currently is not supported", f),
649        }
650    }
651
652    fn length(&self) -> Option<syn::Expr> {
653        let last_part = self.last_part();
654        let leaf_type = self.leaf_type_recursive();
655
656        // `[u8; N]` => Some(N)
657        if let Type::Array(ref first_type, length) = leaf_type {
658            if let Type::TypePath(_) = **first_type {
659                if last_part == "u8" {
660                    return Some(length.clone());
661                }
662            }
663        }
664
665        match last_part.trim() {
666            // Uuid => [u8; 16] => Some(16)
667            "Uuid" => Some(syn::parse_quote!(16)),
668            _ => None,
669        }
670    }
671
672    fn logical_type(&self) -> proc_macro2::TokenStream {
673        let last_part = self.last_part();
674        let leaf_type = self.leaf_type_recursive();
675
676        match leaf_type {
677            Type::Array(ref first_type, _length) => {
678                if let Type::TypePath(_) = **first_type {
679                    if last_part == "u8" {
680                        return quote! { None };
681                    }
682                }
683            }
684            Type::Vec(ref first_type) | Type::Slice(ref first_type) => {
685                if let Type::TypePath(_) = **first_type {
686                    if last_part == "u8" {
687                        return quote! { None };
688                    }
689                }
690            }
691            _ => (),
692        }
693
694        match last_part.trim() {
695            "bool" => quote! { None },
696            "u8" => quote! { Some(LogicalType::Integer {
697                bit_width: 8,
698                is_signed: false,
699            }) },
700            "u16" => quote! { Some(LogicalType::Integer {
701                bit_width: 16,
702                is_signed: false,
703            }) },
704            "u32" => quote! { Some(LogicalType::Integer {
705                bit_width: 32,
706                is_signed: false,
707            }) },
708            "u64" => quote! { Some(LogicalType::Integer {
709                bit_width: 64,
710                is_signed: false,
711            }) },
712            "i8" => quote! { Some(LogicalType::Integer {
713                bit_width: 8,
714                is_signed: true,
715            }) },
716            "i16" => quote! { Some(LogicalType::Integer {
717                bit_width: 16,
718                is_signed: true,
719            }) },
720            "i32" | "i64" => quote! { None },
721            "usize" => {
722                quote! { Some(LogicalType::Integer {
723                    bit_width: usize::BITS as i8,
724                    is_signed: false
725                }) }
726            }
727            "isize" => {
728                quote! { Some(LogicalType::Integer {
729                    bit_width: usize::BITS as i8,
730                    is_signed: true
731                }) }
732            }
733            "NaiveDate" => quote! { Some(LogicalType::Date) },
734            "NaiveDateTime" => quote! { None },
735            "f32" | "f64" => quote! { None },
736            "String" | "str" => quote! { Some(LogicalType::String) },
737            "Uuid" => quote! { Some(LogicalType::Uuid) },
738            f => unimplemented!("{} currently is not supported", f),
739        }
740    }
741
742    fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
743        let last_part = self.last_part();
744
745        match last_part.trim() {
746            "NaiveDateTime" => Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }),
747            _ => None,
748        }
749    }
750
751    fn repetition(&self) -> proc_macro2::TokenStream {
752        match self {
753            Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL },
754            Type::Reference(_, ty) => ty.repetition(),
755            _ => quote! { ::parquet::basic::Repetition::REQUIRED },
756        }
757    }
758
759    /// Convert a parsed rust field AST in to a more easy to manipulate
760    /// parquet_derive::Field
761    fn from(f: &syn::Field) -> Self {
762        Type::from_type(f, &f.ty)
763    }
764
765    fn from_type(f: &syn::Field, ty: &syn::Type) -> Self {
766        match ty {
767            syn::Type::Path(ref p) => Type::from_type_path(f, p),
768            syn::Type::Reference(ref tr) => Type::from_type_reference(f, tr),
769            syn::Type::Array(ref ta) => Type::from_type_array(f, ta),
770            syn::Type::Slice(ref ts) => Type::from_type_slice(f, ts),
771            other => unimplemented!(
772                "Unable to derive {:?} - it is currently an unsupported type\n{:#?}",
773                f.ident.as_ref().unwrap(),
774                other
775            ),
776        }
777    }
778
779    fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self {
780        let last_segment = p.path.segments.last().unwrap();
781
782        let is_vec = last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site());
783        let is_option =
784            last_segment.ident == syn::Ident::new("Option", proc_macro2::Span::call_site());
785
786        if is_vec || is_option {
787            let generic_type = match &last_segment.arguments {
788                syn::PathArguments::AngleBracketed(angle_args) => {
789                    assert_eq!(angle_args.args.len(), 1);
790                    let first_arg = &angle_args.args[0];
791
792                    match first_arg {
793                        syn::GenericArgument::Type(ref typath) => typath.clone(),
794                        other => unimplemented!("Unsupported: {:#?}", other),
795                    }
796                }
797                other => unimplemented!("Unsupported: {:#?}", other),
798            };
799
800            if is_vec {
801                Type::Vec(Box::new(Type::from_type(f, &generic_type)))
802            } else {
803                Type::Option(Box::new(Type::from_type(f, &generic_type)))
804            }
805        } else {
806            Type::TypePath(syn::Type::Path(p.clone()))
807        }
808    }
809
810    fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self {
811        let lifetime = tr.lifetime.clone();
812        let inner_type = Type::from_type(f, tr.elem.as_ref());
813        Type::Reference(lifetime, Box::new(inner_type))
814    }
815
816    fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
817        let inner_type = Type::from_type(f, ta.elem.as_ref());
818        Type::Array(Box::new(inner_type), ta.len.clone())
819    }
820
821    fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
822        let inner_type = Type::from_type(f, ts.elem.as_ref());
823        Type::Slice(Box::new(inner_type))
824    }
825}
826
827#[cfg(test)]
828mod test {
829    use super::*;
830    use syn::{Data, DataStruct, DeriveInput};
831
832    fn extract_fields(input: proc_macro2::TokenStream) -> Vec<syn::Field> {
833        let input: DeriveInput = syn::parse2(input).unwrap();
834
835        let fields = match input.data {
836            Data::Struct(DataStruct { fields, .. }) => fields,
837            _ => panic!("Input must be a struct"),
838        };
839
840        fields.iter().map(|field| field.to_owned()).collect()
841    }
842
843    #[test]
844    fn test_generating_a_simple_writer_snippet() {
845        let snippet: proc_macro2::TokenStream = quote! {
846          struct ABoringStruct {
847            counter: usize,
848          }
849        };
850
851        let fields = extract_fields(snippet);
852        let counter = Field::from(&fields[0]);
853
854        let snippet = counter.writer_snippet().to_string();
855        assert_eq!(snippet,
856                   (quote!{
857                        {
858                            let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
859
860                            if let ColumnWriter::Int64ColumnWriter ( ref mut typed ) = column_writer.untyped() {
861                                typed . write_batch ( & vals [ .. ] , None , None ) ?;
862                            }  else {
863                                panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
864                            }
865                        }
866                   }).to_string()
867        )
868    }
869
870    #[test]
871    fn test_generating_a_simple_reader_snippet() {
872        let snippet: proc_macro2::TokenStream = quote! {
873          struct ABoringStruct {
874            counter: usize,
875          }
876        };
877
878        let fields = extract_fields(snippet);
879        let counter = Field::from(&fields[0]);
880
881        let snippet = counter.reader_snippet().to_string();
882        assert_eq!(
883            snippet,
884            (quote! {
885                 {
886                    let mut vals = Vec::new();
887                    if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
888                        let mut definition_levels = Vec::new();
889                        let (total_num, valid_num, decoded_num) = typed.read_records(
890                            num_records, Some(&mut definition_levels), None, &mut vals)?;
891                        if valid_num != decoded_num {
892                            panic!("Support only valid records, found {} null records in column type {}",
893                                decoded_num - valid_num, stringify!{counter});
894                        }
895                    } else {
896                        panic!("Schema and struct disagree on type for {}", stringify!{counter});
897                    }
898                    for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
899                        r.counter = vals[i] as usize;
900                    }
901                 }
902            })
903            .to_string()
904        )
905    }
906
907    #[test]
908    fn test_optional_to_writer_snippet() {
909        let struct_def: proc_macro2::TokenStream = quote! {
910          struct StringBorrower<'a> {
911            optional_str: Option<&'a str>,
912            optional_string: Option<&String>,
913            optional_dumb_int: Option<&i32>,
914          }
915        };
916
917        let fields = extract_fields(struct_def);
918
919        let optional = Field::from(&fields[0]);
920        let snippet = optional.writer_snippet();
921        assert_eq!(snippet.to_string(),
922          (quote! {
923          {
924                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
925
926                let vals: Vec <_> = records.iter().filter_map( |rec| {
927                    if let Some ( ref inner ) = rec . optional_str {
928                        Some ( (&inner[..]).into() )
929                    } else {
930                        None
931                    }
932                }).collect();
933
934                if let ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
935                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
936                } else {
937                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
938                }
939           }
940            }
941          ).to_string());
942
943        let optional = Field::from(&fields[1]);
944        let snippet = optional.writer_snippet();
945        assert_eq!(snippet.to_string(),
946                   (quote!{
947                   {
948                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
949
950                        let vals: Vec <_> = records.iter().filter_map( |rec| {
951                            if let Some ( ref inner ) = rec . optional_string {
952                                Some ( (&inner[..]).into() )
953                            } else {
954                                None
955                            }
956                        }).collect();
957
958                        if let ColumnWriter::ByteArrayColumnWriter ( ref mut typed ) = column_writer.untyped() {
959                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
960                        } else {
961                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
962                        }
963                    }
964        }).to_string());
965
966        let optional = Field::from(&fields[2]);
967        let snippet = optional.writer_snippet();
968        assert_eq!(snippet.to_string(),
969                   (quote!{
970                    {
971                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
972
973                        let vals: Vec <_> = records.iter().filter_map( |rec| {
974                            if let Some ( inner ) = rec . optional_dumb_int {
975                                Some ( inner as i32 )
976                            } else {
977                                None
978                            }
979                        }).collect();
980
981                        if let ColumnWriter::Int32ColumnWriter ( ref mut typed ) = column_writer.untyped() {
982                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
983                        }  else {
984                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
985                        }
986                    }
987        }).to_string());
988    }
989
990    #[test]
991    fn test_converting_to_column_writer_type() {
992        let snippet: proc_macro2::TokenStream = quote! {
993          struct ABasicStruct {
994            yes_no: bool,
995            name: String,
996          }
997        };
998
999        let fields = extract_fields(snippet);
1000        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1001
1002        let column_writers: Vec<_> = processed
1003            .iter()
1004            .map(|field| field.ty.column_writer())
1005            .collect();
1006
1007        assert_eq!(
1008            column_writers,
1009            vec![
1010                syn::parse_quote!(ColumnWriter::BoolColumnWriter),
1011                syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
1012            ]
1013        );
1014    }
1015
1016    #[test]
1017    fn test_converting_to_column_reader_type() {
1018        let snippet: proc_macro2::TokenStream = quote! {
1019          struct ABasicStruct {
1020            yes_no: bool,
1021            name: String,
1022          }
1023        };
1024
1025        let fields = extract_fields(snippet);
1026        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1027
1028        let column_readers: Vec<_> = processed
1029            .iter()
1030            .map(|field| field.ty.column_reader())
1031            .collect();
1032
1033        assert_eq!(
1034            column_readers,
1035            vec![
1036                syn::parse_quote!(ColumnReader::BoolColumnReader),
1037                syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
1038            ]
1039        );
1040    }
1041
1042    #[test]
1043    fn convert_basic_struct() {
1044        let snippet: proc_macro2::TokenStream = quote! {
1045          struct ABasicStruct {
1046            yes_no: bool,
1047            name: String,
1048            length: usize
1049          }
1050        };
1051
1052        let fields = extract_fields(snippet);
1053        let processed: Vec<_> = fields.iter().map(Field::from).collect();
1054        assert_eq!(processed.len(), 3);
1055
1056        assert_eq!(
1057            processed,
1058            vec![
1059                Field {
1060                    ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()),
1061                    ty: Type::TypePath(syn::parse_quote!(bool)),
1062                    is_a_byte_buf: false,
1063                    third_party_type: None,
1064                },
1065                Field {
1066                    ident: syn::Ident::new("name", proc_macro2::Span::call_site()),
1067                    ty: Type::TypePath(syn::parse_quote!(String)),
1068                    is_a_byte_buf: true,
1069                    third_party_type: None,
1070                },
1071                Field {
1072                    ident: syn::Ident::new("length", proc_macro2::Span::call_site()),
1073                    ty: Type::TypePath(syn::parse_quote!(usize)),
1074                    is_a_byte_buf: false,
1075                    third_party_type: None,
1076                }
1077            ]
1078        )
1079    }
1080
1081    #[test]
1082    fn test_get_inner_type() {
1083        let snippet: proc_macro2::TokenStream = quote! {
1084          struct LotsOfInnerTypes {
1085            a_vec: Vec<u8>,
1086            a_option: ::std::option::Option<bool>,
1087            a_silly_string: ::std::string::String,
1088            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1089          }
1090        };
1091
1092        let fields = extract_fields(snippet);
1093        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1094        let inner_types: Vec<_> = converted_fields
1095            .iter()
1096            .map(|field| field.inner_type())
1097            .collect();
1098        let inner_types_strs: Vec<_> = inner_types
1099            .iter()
1100            .map(|ty| (quote! { #ty }).to_string())
1101            .collect();
1102
1103        assert_eq!(
1104            inner_types_strs,
1105            vec![
1106                "u8",
1107                "bool",
1108                ":: std :: string :: String",
1109                ":: std :: result :: Result < () , () >"
1110            ]
1111        )
1112    }
1113
1114    #[test]
1115    fn test_physical_type() {
1116        use parquet::basic::Type as BasicType;
1117        let snippet: proc_macro2::TokenStream = quote! {
1118          struct LotsOfInnerTypes {
1119            a_buf: ::std::vec::Vec<u8>,
1120            a_number: i32,
1121            a_verbose_option: ::std::option::Option<bool>,
1122            a_silly_string: String,
1123            a_fix_byte_buf: [u8; 10],
1124            a_complex_option: ::std::option::Option<&Vec<u8>>,
1125            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1126            a_uuid: ::uuid::Uuid,
1127          }
1128        };
1129
1130        let fields = extract_fields(snippet);
1131        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1132        let physical_types: Vec<_> = converted_fields
1133            .iter()
1134            .map(|ty| ty.physical_type())
1135            .collect();
1136
1137        assert_eq!(
1138            physical_types,
1139            vec![
1140                BasicType::BYTE_ARRAY,
1141                BasicType::INT32,
1142                BasicType::BOOLEAN,
1143                BasicType::BYTE_ARRAY,
1144                BasicType::FIXED_LEN_BYTE_ARRAY,
1145                BasicType::BYTE_ARRAY,
1146                BasicType::INT32,
1147                BasicType::FIXED_LEN_BYTE_ARRAY,
1148            ]
1149        )
1150    }
1151
1152    #[test]
1153    fn test_type_length() {
1154        let snippet: proc_macro2::TokenStream = quote! {
1155          struct LotsOfInnerTypes {
1156            a_buf: ::std::vec::Vec<u8>,
1157            a_number: i32,
1158            a_verbose_option: ::std::option::Option<bool>,
1159            a_silly_string: String,
1160            a_fix_byte_buf: [u8; 10],
1161            a_complex_option: ::std::option::Option<&Vec<u8>>,
1162            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1163            a_uuid: ::uuid::Uuid,
1164          }
1165        };
1166
1167        let fields = extract_fields(snippet);
1168        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1169        let lengths: Vec<_> = converted_fields.iter().map(|ty| ty.length()).collect();
1170
1171        assert_eq!(
1172            lengths,
1173            vec![
1174                None,
1175                None,
1176                None,
1177                None,
1178                Some(syn::parse_quote!(10)),
1179                None,
1180                None,
1181                Some(syn::parse_quote!(16)),
1182            ]
1183        )
1184    }
1185
1186    #[test]
1187    fn test_convert_comprehensive_owned_struct() {
1188        let snippet: proc_macro2::TokenStream = quote! {
1189          struct VecHolder {
1190            a_vec: ::std::vec::Vec<u8>,
1191            a_option: ::std::option::Option<bool>,
1192            a_silly_string: ::std::string::String,
1193            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1194          }
1195        };
1196
1197        let fields = extract_fields(snippet);
1198        let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1199
1200        assert_eq!(
1201            converted_fields,
1202            vec![
1203                Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8)))),
1204                Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool)))),
1205                Type::TypePath(syn::parse_quote!(::std::string::String)),
1206                Type::Option(Box::new(Type::TypePath(
1207                    syn::parse_quote!(::std::result::Result<(),()>)
1208                ))),
1209            ]
1210        );
1211    }
1212
1213    #[test]
1214    fn test_convert_borrowed_struct() {
1215        let snippet: proc_macro2::TokenStream = quote! {
1216          struct Borrower<'a> {
1217            a_str: &'a str,
1218            a_borrowed_option: &'a Option<bool>,
1219            so_many_borrows: &'a Option<&'a str>,
1220          }
1221        };
1222
1223        let fields = extract_fields(snippet);
1224        let types: Vec<_> = fields.iter().map(Type::from).collect();
1225
1226        assert_eq!(
1227            types,
1228            vec![
1229                Type::Reference(
1230                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1231                    Box::new(Type::TypePath(syn::parse_quote!(str)))
1232                ),
1233                Type::Reference(
1234                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1235                    Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!(
1236                        bool
1237                    )))))
1238                ),
1239                Type::Reference(
1240                    Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1241                    Box::new(Type::Option(Box::new(Type::Reference(
1242                        Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site())),
1243                        Box::new(Type::TypePath(syn::parse_quote!(str)))
1244                    ))))
1245                ),
1246            ]
1247        );
1248    }
1249
1250    #[test]
1251    fn test_chrono_timestamp_millis_write() {
1252        let snippet: proc_macro2::TokenStream = quote! {
1253          struct ATimestampStruct {
1254            henceforth: chrono::NaiveDateTime,
1255            maybe_happened: Option<&chrono::NaiveDateTime>,
1256          }
1257        };
1258
1259        let fields = extract_fields(snippet);
1260        let when = Field::from(&fields[0]);
1261        assert_eq!(when.writer_snippet().to_string(),(quote!{
1262            {
1263                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
1264                if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
1265                    typed.write_batch(&vals[..], None, None) ?;
1266                } else {
1267                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
1268                }
1269            }
1270        }).to_string());
1271
1272        let maybe_happened = Field::from(&fields[1]);
1273        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1274            {
1275                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
1276                let vals : Vec<_> = records.iter().filter_map(|rec| {
1277                    if let Some(inner) = rec.maybe_happened {
1278                        Some(inner.timestamp_millis())
1279                    } else {
1280                        None
1281                    }
1282                }).collect();
1283
1284                if let ColumnWriter::Int64ColumnWriter(ref mut typed) = column_writer.untyped() {
1285                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1286                } else {
1287                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
1288                }
1289            }
1290        }).to_string());
1291    }
1292
1293    #[test]
1294    fn test_chrono_timestamp_millis_read() {
1295        let snippet: proc_macro2::TokenStream = quote! {
1296          struct ATimestampStruct {
1297            henceforth: chrono::NaiveDateTime,
1298          }
1299        };
1300
1301        let fields = extract_fields(snippet);
1302        let when = Field::from(&fields[0]);
1303        assert_eq!(when.reader_snippet().to_string(),(quote!{
1304            {
1305                let mut vals = Vec::new();
1306                if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
1307                    let mut definition_levels = Vec::new();
1308                    let (total_num, valid_num, decoded_num) = typed.read_records(
1309                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1310                    if valid_num != decoded_num {
1311                        panic!("Support only valid records, found {} null records in column type {}",
1312                            decoded_num - valid_num, stringify!{henceforth});
1313                    }
1314                } else {
1315                    panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
1316                }
1317                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1318                    r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap();
1319                }
1320            }
1321        }).to_string());
1322    }
1323
1324    #[test]
1325    fn test_chrono_date_write() {
1326        let snippet: proc_macro2::TokenStream = quote! {
1327          struct ATimestampStruct {
1328            henceforth: chrono::NaiveDate,
1329            maybe_happened: Option<&chrono::NaiveDate>,
1330          }
1331        };
1332
1333        let fields = extract_fields(snippet);
1334        let when = Field::from(&fields[0]);
1335        assert_eq!(when.writer_snippet().to_string(),(quote!{
1336            {
1337                let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
1338                if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
1339                    typed.write_batch(&vals[..], None, None) ?;
1340                } else {
1341                    panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
1342                }
1343            }
1344        }).to_string());
1345
1346        let maybe_happened = Field::from(&fields[1]);
1347        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1348            {
1349                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
1350                let vals : Vec<_> = records.iter().filter_map(|rec| {
1351                    if let Some(inner) = rec.maybe_happened {
1352                        Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
1353                    } else {
1354                        None
1355                    }
1356                }).collect();
1357
1358                if let ColumnWriter::Int32ColumnWriter(ref mut typed) = column_writer.untyped() {
1359                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1360                } else {
1361                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
1362                }
1363            }
1364        }).to_string());
1365    }
1366
1367    #[test]
1368    fn test_chrono_date_read() {
1369        let snippet: proc_macro2::TokenStream = quote! {
1370          struct ATimestampStruct {
1371            henceforth: chrono::NaiveDate,
1372          }
1373        };
1374
1375        let fields = extract_fields(snippet);
1376        let when = Field::from(&fields[0]);
1377        assert_eq!(when.reader_snippet().to_string(),(quote!{
1378            {
1379                let mut vals = Vec::new();
1380                if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
1381                    let mut definition_levels = Vec::new();
1382                    let (total_num, valid_num, decoded_num) = typed.read_records(
1383                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1384                    if valid_num != decoded_num {
1385                        panic!("Support only valid records, found {} null records in column type {}",
1386                            decoded_num - valid_num, stringify!{henceforth});
1387                    }
1388                } else {
1389                    panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
1390                }
1391                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1392                    r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap();
1393                }
1394            }
1395        }).to_string());
1396    }
1397
1398    #[test]
1399    fn test_uuid_write() {
1400        let snippet: proc_macro2::TokenStream = quote! {
1401          struct AUuidStruct {
1402            unique_id: uuid::Uuid,
1403            maybe_unique_id: Option<&uuid::Uuid>,
1404          }
1405        };
1406
1407        let fields = extract_fields(snippet);
1408        let when = Field::from(&fields[0]);
1409        assert_eq!(when.writer_snippet().to_string(),(quote!{
1410            {
1411                let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
1412                if let ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
1413                    typed.write_batch(&vals[..], None, None) ?;
1414                } else {
1415                    panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
1416                }
1417            }
1418        }).to_string());
1419
1420        let maybe_happened = Field::from(&fields[1]);
1421        assert_eq!(maybe_happened.writer_snippet().to_string(),(quote!{
1422            {
1423                let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
1424                let vals : Vec<_> = records.iter().filter_map(|rec| {
1425                    if let Some(ref inner) = rec.maybe_unique_id {
1426                        Some((&inner.to_string()[..]).into())
1427                    } else {
1428                        None
1429                    }
1430                }).collect();
1431
1432                if let ColumnWriter::FixedLenByteArrayColumnWriter(ref mut typed) = column_writer.untyped() {
1433                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
1434                } else {
1435                    panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
1436                }
1437            }
1438        }).to_string());
1439    }
1440
1441    #[test]
1442    fn test_uuid_read() {
1443        let snippet: proc_macro2::TokenStream = quote! {
1444          struct AUuidStruct {
1445            unique_id: uuid::Uuid,
1446          }
1447        };
1448
1449        let fields = extract_fields(snippet);
1450        let when = Field::from(&fields[0]);
1451        assert_eq!(when.reader_snippet().to_string(),(quote!{
1452            {
1453                let mut vals = Vec::new();
1454                if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
1455                    let mut definition_levels = Vec::new();
1456                    let (total_num, valid_num, decoded_num) = typed.read_records(
1457                        num_records, Some(&mut definition_levels), None, &mut vals)?;
1458                    if valid_num != decoded_num {
1459                        panic!("Support only valid records, found {} null records in column type {}",
1460                            decoded_num - valid_num, stringify!{unique_id});
1461                    }
1462                } else {
1463                    panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
1464                }
1465                for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
1466                    r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
1467                }
1468            }
1469        }).to_string());
1470    }
1471
1472    #[test]
1473    fn test_converted_type() {
1474        let snippet: proc_macro2::TokenStream = quote! {
1475          struct ATimeStruct {
1476            time: chrono::NaiveDateTime,
1477          }
1478        };
1479
1480        let fields = extract_fields(snippet);
1481
1482        let time = Field::from(&fields[0]);
1483
1484        let converted_type = time.ty.converted_type();
1485        assert_eq!(
1486            converted_type.unwrap().to_string(),
1487            quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }.to_string()
1488        );
1489    }
1490}