/// One named field of the struct being derived, together with the facts the
/// code generators in this file need to know about its type.
#[derive(Debug, PartialEq)]
pub struct Field {
    /// Identifier of the field as declared in the user's struct.
    ident: syn::Ident,
    /// Structural description of the field's declared type.
    ty: Type,
    /// `true` when the field's physical Parquet type is `BYTE_ARRAY`.
    is_a_byte_buf: bool,
    /// Set when the innermost type name is one of the specially handled
    /// third-party types; `None` otherwise.
    third_party_type: Option<ThirdPartyType>,
}
25
/// Externally defined types that are recognised by the last segment of their
/// path and receive dedicated conversion code in the generated snippets.
#[derive(Debug, PartialEq)]
enum ThirdPartyType {
    /// Selected when the innermost type name is `NaiveDateTime`.
    ChronoNaiveDateTime,
    /// Selected when the innermost type name is `NaiveDate`.
    ChronoNaiveDate,
    /// Selected when the innermost type name is `Uuid`.
    Uuid,
}
39
40impl Field {
41 pub fn from(f: &syn::Field) -> Self {
42 let ty = Type::from(f);
43 let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY;
44
45 let third_party_type = match &ty.last_part()[..] {
46 "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime),
47 "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate),
48 "Uuid" => Some(ThirdPartyType::Uuid),
49 _ => None,
50 };
51
52 Field {
53 ident: f
54 .ident
55 .clone()
56 .expect("Only structs with named fields are currently supported"),
57 ty,
58 is_a_byte_buf,
59 third_party_type,
60 }
61 }
62
    /// Builds the token stream that writes this field's column.
    ///
    /// The emitted block consists of three parts, in this order:
    /// an optional `definition_levels` vector (only for optional fields),
    /// a `vals` vector holding the values to write, and a `write_batch` call
    /// on the typed column writer. The emitted code refers to a
    /// `column_writer` binding (and, through the helper snippets, to
    /// `records` and `self`) that must exist where the snippet is expanded.
    ///
    /// # Panics
    ///
    /// Panics with `unimplemented!` when the field's type nesting is not one
    /// of the shapes enumerated in the matches below.
    pub fn writer_snippet(&self) -> proc_macro2::TokenStream {
        let ident = &self.ident;
        let column_writer = self.ty.column_writer();

        // Pick the snippet that fills `vals`: direct copy for non-optional
        // shapes, `filter_map` over `Some` values for optional ones.
        let vals_builder = match &self.ty {
            Type::TypePath(_) => self.copied_direct_vals(),
            Type::Option(first_type) => match **first_type {
                Type::TypePath(_) => self.option_into_vals(),
                Type::Reference(_, ref second_type) => match **second_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    _ => unimplemented!("Unsupported type encountered"),
                },
                Type::Vec(ref first_type) => match **first_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    _ => unimplemented!("Unsupported type encountered"),
                },
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            Type::Reference(_, first_type) => match **first_type {
                Type::TypePath(_) => self.copied_direct_vals(),
                Type::Option(ref second_type) => match **second_type {
                    Type::TypePath(_) => self.option_into_vals(),
                    Type::Reference(_, ref second_type) => match **second_type {
                        Type::TypePath(_) => self.option_into_vals(),
                        Type::Slice(ref second_type) => match **second_type {
                            Type::TypePath(_) => self.option_into_vals(),
                            ref f => unimplemented!("Unsupported: {:#?}", f),
                        },
                        _ => unimplemented!("Unsupported type encountered"),
                    },
                    Type::Vec(ref first_type) => match **first_type {
                        Type::TypePath(_) => self.option_into_vals(),
                        _ => unimplemented!("Unsupported type encountered"),
                    },
                    ref f => unimplemented!("Unsupported: {:#?}", f),
                },
                Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => self.copied_direct_vals(),
                    ref f => unimplemented!("Unsupported: {:#?}", f),
                },
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            Type::Vec(first_type) => match **first_type {
                Type::TypePath(_) => self.copied_direct_vals(),
                ref f => unimplemented!("Unsupported: {:#?}", f),
            },
            f => unimplemented!("Unsupported: {:#?}", f),
        };

        // `Some(..)` exactly when an `Option` appears in a supported position
        // of the type; `None` means no definition levels are emitted.
        let definition_levels = match &self.ty {
            Type::TypePath(_) => None,
            Type::Option(first_type) => match **first_type {
                Type::TypePath(_) => Some(self.optional_definition_levels()),
                Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
                Type::Reference(_, ref second_type)
                | Type::Vec(ref second_type)
                | Type::Array(ref second_type, _)
                | Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => Some(self.optional_definition_levels()),
                    _ => unimplemented!("Unsupported nesting encountered"),
                },
            },
            Type::Reference(_, first_type)
            | Type::Vec(first_type)
            | Type::Array(first_type, _)
            | Type::Slice(first_type) => match **first_type {
                Type::TypePath(_) => None,
                Type::Vec(ref second_type)
                | Type::Array(ref second_type, _)
                | Type::Slice(ref second_type) => match **second_type {
                    Type::TypePath(_) => None,
                    Type::Reference(_, ref third_type) => match **third_type {
                        Type::TypePath(_) => None,
                        _ => unimplemented!("Unsupported definition encountered"),
                    },
                    _ => unimplemented!("Unsupported definition encountered"),
                },
                Type::Reference(_, ref second_type) | Type::Option(ref second_type) => {
                    match **second_type {
                        Type::TypePath(_) => Some(self.optional_definition_levels()),
                        Type::Vec(ref third_type)
                        | Type::Array(ref third_type, _)
                        | Type::Slice(ref third_type) => match **third_type {
                            Type::TypePath(_) => Some(self.optional_definition_levels()),
                            Type::Reference(_, ref fourth_type) => match **fourth_type {
                                Type::TypePath(_) => Some(self.optional_definition_levels()),
                                _ => unimplemented!("Unsupported definition encountered"),
                            },
                            _ => unimplemented!("Unsupported definition encountered"),
                        },
                        Type::Reference(_, ref third_type) => match **third_type {
                            Type::TypePath(_) => Some(self.optional_definition_levels()),
                            Type::Slice(ref fourth_type) => match **fourth_type {
                                Type::TypePath(_) => Some(self.optional_definition_levels()),
                                _ => unimplemented!("Unsupported definition encountered"),
                            },
                            _ => unimplemented!("Unsupported definition encountered"),
                        },
                        _ => unimplemented!("Unsupported definition encountered"),
                    }
                }
            },
        };

        // The two variants differ only in whether definition levels are
        // passed to `write_batch`.
        let write_batch_expr = if definition_levels.is_some() {
            quote! {
                if let #column_writer(typed) = column_writer.untyped() {
                    typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
                } else {
                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
                }
            }
        } else {
            quote! {
                if let #column_writer(typed) = column_writer.untyped() {
                    typed.write_batch(&vals[..], None, None)?;
                } else {
                    panic!("Schema and struct disagree on type for {}", stringify!{#ident})
                }
            }
        };

        // An `Option<TokenStream>` that is `None` interpolates to nothing, so
        // the definition-level line simply disappears for required fields.
        quote! {
            {
                #definition_levels

                #vals_builder

                #write_batch_expr
            }
        }
    }
221
222 pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
245 let ident = &self.ident;
246 let column_reader = self.ty.column_reader();
247
248 let write_batch_expr = quote! {
250 let mut vals = Vec::new();
251 if let #column_reader(mut typed) = column_reader {
252 let mut definition_levels = Vec::new();
253 let (total_num, valid_num, decoded_num) = typed.read_records(
254 num_records, Some(&mut definition_levels), None, &mut vals)?;
255 if valid_num != decoded_num {
256 panic!("Support only valid records, found {} null records in column type {}",
257 decoded_num - valid_num, stringify!{#ident});
258 }
259 } else {
260 panic!("Schema and struct disagree on type for {}", stringify!{#ident});
261 }
262 };
263
264 let vals_writer = match &self.ty {
267 Type::TypePath(_) => self.copied_direct_fields(),
268 Type::Reference(_, first_type) => match **first_type {
269 Type::TypePath(_) => self.copied_direct_fields(),
270 Type::Slice(ref second_type) => match **second_type {
271 Type::TypePath(_) => self.copied_direct_fields(),
272 ref f => unimplemented!("Unsupported: {:#?}", f),
273 },
274 ref f => unimplemented!("Unsupported: {:#?}", f),
275 },
276 Type::Vec(first_type) => match **first_type {
277 Type::TypePath(_) => self.copied_direct_fields(),
278 ref f => unimplemented!("Unsupported: {:#?}", f),
279 },
280 f => unimplemented!("Unsupported: {:#?}", f),
281 };
282
283 quote! {
284 {
285 #write_batch_expr
286
287 #vals_writer
288 }
289 }
290 }
291
292 pub fn parquet_type(&self) -> proc_macro2::TokenStream {
293 let field_name = &self.ident.to_string();
297 let physical_type = match self.ty.physical_type() {
298 parquet::basic::Type::BOOLEAN => quote! {
299 ::parquet::basic::Type::BOOLEAN
300 },
301 parquet::basic::Type::INT32 => quote! {
302 ::parquet::basic::Type::INT32
303 },
304 parquet::basic::Type::INT64 => quote! {
305 ::parquet::basic::Type::INT64
306 },
307 parquet::basic::Type::INT96 => quote! {
308 ::parquet::basic::Type::INT96
309 },
310 parquet::basic::Type::FLOAT => quote! {
311 ::parquet::basic::Type::FLOAT
312 },
313 parquet::basic::Type::DOUBLE => quote! {
314 ::parquet::basic::Type::DOUBLE
315 },
316 parquet::basic::Type::BYTE_ARRAY => quote! {
317 ::parquet::basic::Type::BYTE_ARRAY
318 },
319 parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! {
320 ::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
321 },
322 };
323 let logical_type = self.ty.logical_type();
324 let repetition = self.ty.repetition();
325 let converted_type = self.ty.converted_type();
326 let length = self.ty.length();
327
328 let mut builder = quote! {
329 ParquetType::primitive_type_builder(#field_name, #physical_type)
330 .with_logical_type(#logical_type)
331 .with_repetition(#repetition)
332 };
333
334 if let Some(converted_type) = converted_type {
335 builder = quote! { #builder.with_converted_type(#converted_type) };
336 }
337
338 if let Some(length) = length {
339 builder = quote! { #builder.with_length(#length) };
340 }
341
342 quote! { fields.push(#builder.build().unwrap().into()) }
343 }
344
345 fn option_into_vals(&self) -> proc_macro2::TokenStream {
346 let field_name = &self.ident;
347 let is_a_byte_buf = self.is_a_byte_buf;
348 let is_a_timestamp = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
349 let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
350 let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
351 let copy_to_vec = !matches!(
352 self.ty.physical_type(),
353 parquet::basic::Type::BYTE_ARRAY | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
354 );
355
356 let binding = if copy_to_vec {
357 quote! { let Some(inner) = rec.#field_name }
358 } else {
359 quote! { let Some(inner) = &rec.#field_name }
360 };
361
362 let some = if is_a_timestamp {
363 quote! { Some(inner.timestamp_millis()) }
364 } else if is_a_date {
365 quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) }
366 } else if is_a_uuid {
367 quote! { Some((&inner.to_string()[..]).into()) }
368 } else if is_a_byte_buf {
369 quote! { Some((&inner[..]).into())}
370 } else {
371 match self.ty.physical_type() {
373 parquet::basic::Type::INT32 => quote! { Some(inner as i32) },
374 parquet::basic::Type::INT64 => quote! { Some(inner as i64) },
375 _ => quote! { Some(inner) },
376 }
377 };
378
379 quote! {
380 let vals: Vec<_> = records.iter().filter_map(|rec| {
381 if #binding {
382 #some
383 } else {
384 None
385 }
386 }).collect();
387 }
388 }
389
390 fn copied_direct_vals(&self) -> proc_macro2::TokenStream {
392 let field_name = &self.ident;
393
394 let access = match self.third_party_type {
395 Some(ThirdPartyType::ChronoNaiveDateTime) => {
396 quote! { rec.#field_name.timestamp_millis() }
397 }
398 Some(ThirdPartyType::ChronoNaiveDate) => {
399 quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
400 }
401 Some(ThirdPartyType::Uuid) => {
402 quote! { rec.#field_name.as_bytes().to_vec().into() }
403 }
404 _ => {
405 if self.is_a_byte_buf {
406 quote! { (&rec.#field_name[..]).into() }
407 } else {
408 match self.ty.physical_type() {
410 parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 },
411 parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 },
412 _ => quote! { rec.#field_name },
413 }
414 }
415 }
416 };
417
418 quote! {
419 let vals: Vec<_> = records.iter().map(|rec| #access).collect();
420 }
421 }
422
423 fn copied_direct_fields(&self) -> proc_macro2::TokenStream {
425 let field_name = &self.ident;
426
427 let value = match self.third_party_type {
428 Some(ThirdPartyType::ChronoNaiveDateTime) => {
429 quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() }
430 }
431 Some(ThirdPartyType::ChronoNaiveDate) => {
432 quote! {
434 ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap()
435 }
436 }
437 Some(ThirdPartyType::Uuid) => {
438 quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
439 }
440 _ => match &self.ty {
441 Type::TypePath(_) => match self.ty.last_part().as_str() {
442 "String" => quote! { String::from(std::str::from_utf8(vals[i].data())
443 .expect("invalid UTF-8 sequence")) },
444 t => {
445 let s: proc_macro2::TokenStream = t.parse().unwrap();
446 quote! { vals[i] as #s }
447 }
448 },
449 Type::Vec(_) => quote! { vals[i].data().to_vec() },
450 f => unimplemented!("Unsupported: {:#?}", f),
451 },
452 };
453
454 quote! {
455 for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
456 r.#field_name = #value;
457 }
458 }
459 }
460
461 fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
462 let field_name = &self.ident;
463
464 quote! {
465 let definition_levels: Vec<i16> = self
466 .iter()
467 .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 })
468 .collect();
469 }
470 }
471}
472
/// Structural description of a field's declared type, built recursively
/// from the `syn` type by the `from_type*` constructors.
#[allow(clippy::enum_variant_names)]
#[allow(clippy::large_enum_variant)]
#[derive(Debug, PartialEq)]
enum Type {
    /// An array type: the element type and the length expression.
    Array(Box<Type>, syn::Expr),
    /// A path whose last segment is `Option`, wrapping its generic argument.
    Option(Box<Type>),
    /// A slice type wrapping its element type.
    Slice(Box<Type>),
    /// A path whose last segment is `Vec`, wrapping its generic argument.
    Vec(Box<Type>),
    /// Any other path type, kept as the original `syn::Type`.
    TypePath(syn::Type),
    /// A reference: its optional lifetime and the referenced type.
    Reference(Option<syn::Lifetime>, Box<Type>),
}
484
485impl Type {
486 fn column_writer(&self) -> syn::TypePath {
489 use parquet::basic::Type as BasicType;
490
491 match self.physical_type() {
492 BasicType::BOOLEAN => {
493 syn::parse_quote!(ColumnWriter::BoolColumnWriter)
494 }
495 BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter),
496 BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter),
497 BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter),
498 BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter),
499 BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter),
500 BasicType::BYTE_ARRAY => {
501 syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
502 }
503 BasicType::FIXED_LEN_BYTE_ARRAY => {
504 syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter)
505 }
506 }
507 }
508
509 fn column_reader(&self) -> syn::TypePath {
512 use parquet::basic::Type as BasicType;
513
514 match self.physical_type() {
515 BasicType::BOOLEAN => {
516 syn::parse_quote!(ColumnReader::BoolColumnReader)
517 }
518 BasicType::INT32 => syn::parse_quote!(ColumnReader::Int32ColumnReader),
519 BasicType::INT64 => syn::parse_quote!(ColumnReader::Int64ColumnReader),
520 BasicType::INT96 => syn::parse_quote!(ColumnReader::Int96ColumnReader),
521 BasicType::FLOAT => syn::parse_quote!(ColumnReader::FloatColumnReader),
522 BasicType::DOUBLE => syn::parse_quote!(ColumnReader::DoubleColumnReader),
523 BasicType::BYTE_ARRAY => {
524 syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
525 }
526 BasicType::FIXED_LEN_BYTE_ARRAY => {
527 syn::parse_quote!(ColumnReader::FixedLenByteArrayColumnReader)
528 }
529 }
530 }
531
532 fn leaf_type_recursive(&self) -> &Type {
542 Type::leaf_type_recursive_helper(self, None)
543 }
544
545 fn leaf_type_recursive_helper<'a>(ty: &'a Type, parent_ty: Option<&'a Type>) -> &'a Type {
546 match ty {
547 Type::TypePath(_) => parent_ty.unwrap_or(ty),
548 Type::Option(first_type)
549 | Type::Vec(first_type)
550 | Type::Array(first_type, _)
551 | Type::Slice(first_type)
552 | Type::Reference(_, first_type) => {
553 Type::leaf_type_recursive_helper(first_type, Some(ty))
554 }
555 }
556 }
557
558 fn inner_type(&self) -> &syn::Type {
562 let leaf_type = self.leaf_type_recursive();
563
564 match leaf_type {
565 Type::TypePath(type_) => type_,
566 Type::Option(first_type)
567 | Type::Vec(first_type)
568 | Type::Array(first_type, _)
569 | Type::Slice(first_type)
570 | Type::Reference(_, first_type) => match **first_type {
571 Type::TypePath(ref type_) => type_,
572 _ => unimplemented!("leaf_type() should only return shallow types"),
573 },
574 }
575 }
576
577 fn last_part(&self) -> String {
590 let inner_type = self.inner_type();
591 let inner_type_str = (quote! { #inner_type }).to_string();
592
593 inner_type_str
594 .split("::")
595 .last()
596 .unwrap()
597 .trim()
598 .to_string()
599 }
600
601 fn physical_type(&self) -> parquet::basic::Type {
609 use parquet::basic::Type as BasicType;
610
611 let last_part = self.last_part();
612 let leaf_type = self.leaf_type_recursive();
613
614 match leaf_type {
615 Type::Array(first_type, _length) => {
616 if let Type::TypePath(_) = **first_type {
617 if last_part == "u8" {
618 return BasicType::FIXED_LEN_BYTE_ARRAY;
619 }
620 }
621 }
622 Type::Vec(first_type) | Type::Slice(first_type) => {
623 if let Type::TypePath(_) = **first_type {
624 if last_part == "u8" {
625 return BasicType::BYTE_ARRAY;
626 }
627 }
628 }
629 _ => (),
630 }
631
632 match last_part.trim() {
633 "bool" => BasicType::BOOLEAN,
634 "u8" | "u16" | "u32" => BasicType::INT32,
635 "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32,
636 "u64" | "i64" | "NaiveDateTime" => BasicType::INT64,
637 "usize" | "isize" => {
638 if usize::BITS == 64 {
639 BasicType::INT64
640 } else {
641 BasicType::INT32
642 }
643 }
644 "f32" => BasicType::FLOAT,
645 "f64" => BasicType::DOUBLE,
646 "String" | "str" | "Arc < str >" => BasicType::BYTE_ARRAY,
647 "Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
648 f => unimplemented!("{} currently is not supported", f),
649 }
650 }
651
652 fn length(&self) -> Option<syn::Expr> {
653 let last_part = self.last_part();
654 let leaf_type = self.leaf_type_recursive();
655
656 if let Type::Array(first_type, length) = leaf_type {
658 if let Type::TypePath(_) = **first_type {
659 if last_part == "u8" {
660 return Some(length.clone());
661 }
662 }
663 }
664
665 match last_part.trim() {
666 "Uuid" => Some(syn::parse_quote!(16)),
668 _ => None,
669 }
670 }
671
672 fn logical_type(&self) -> proc_macro2::TokenStream {
673 let last_part = self.last_part();
674 let leaf_type = self.leaf_type_recursive();
675
676 match leaf_type {
677 Type::Array(first_type, _length) => {
678 if let Type::TypePath(_) = **first_type {
679 if last_part == "u8" {
680 return quote! { None };
681 }
682 }
683 }
684 Type::Vec(first_type) | Type::Slice(first_type) => {
685 if let Type::TypePath(_) = **first_type {
686 if last_part == "u8" {
687 return quote! { None };
688 }
689 }
690 }
691 _ => (),
692 }
693
694 match last_part.trim() {
695 "bool" => quote! { None },
696 "u8" => quote! { Some(LogicalType::integer(8, false)) },
697 "u16" => quote! { Some(LogicalType::integer(16, false)) },
698 "u32" => quote! { Some(LogicalType::integer(32, false)) },
699 "u64" => quote! { Some(LogicalType::integer(64, false)) },
700 "i8" => quote! { Some(LogicalType::integer(8, true)) },
701 "i16" => quote! { Some(LogicalType::integer(16, true)) },
702 "i32" | "i64" => quote! { None },
703 "usize" => {
704 quote! { Some(LogicalType::integer(usize::BITS as i8, false)) }
705 }
706 "isize" => {
707 quote! { Some(LogicalType::integer(usize::BITS as i8, true)) }
708 }
709 "NaiveDate" => quote! { Some(LogicalType::Date) },
710 "NaiveDateTime" => quote! { None },
711 "f32" | "f64" => quote! { None },
712 "String" | "str" | "Arc < str >" => quote! { Some(LogicalType::String) },
713 "Uuid" => quote! { Some(LogicalType::Uuid) },
714 f => unimplemented!("{} currently is not supported", f),
715 }
716 }
717
718 fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
719 let last_part = self.last_part();
720
721 match last_part.trim() {
722 "NaiveDateTime" => Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }),
723 _ => None,
724 }
725 }
726
727 fn repetition(&self) -> proc_macro2::TokenStream {
728 match self {
729 Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL },
730 Type::Reference(_, ty) => ty.repetition(),
731 _ => quote! { ::parquet::basic::Repetition::REQUIRED },
732 }
733 }
734
735 fn from(f: &syn::Field) -> Self {
738 Type::from_type(f, &f.ty)
739 }
740
741 fn from_type(f: &syn::Field, ty: &syn::Type) -> Self {
742 match ty {
743 syn::Type::Path(p) => Type::from_type_path(f, p),
744 syn::Type::Reference(tr) => Type::from_type_reference(f, tr),
745 syn::Type::Array(ta) => Type::from_type_array(f, ta),
746 syn::Type::Slice(ts) => Type::from_type_slice(f, ts),
747 other => unimplemented!(
748 "Unable to derive {:?} - it is currently an unsupported type\n{:#?}",
749 f.ident.as_ref().unwrap(),
750 other
751 ),
752 }
753 }
754
755 fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self {
756 let last_segment = p.path.segments.last().unwrap();
757
758 let is_vec = last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site());
759 let is_option =
760 last_segment.ident == syn::Ident::new("Option", proc_macro2::Span::call_site());
761
762 if is_vec || is_option {
763 let generic_type = match &last_segment.arguments {
764 syn::PathArguments::AngleBracketed(angle_args) => {
765 assert_eq!(angle_args.args.len(), 1);
766 let first_arg = &angle_args.args[0];
767
768 match first_arg {
769 syn::GenericArgument::Type(typath) => typath.clone(),
770 other => unimplemented!("Unsupported: {:#?}", other),
771 }
772 }
773 other => unimplemented!("Unsupported: {:#?}", other),
774 };
775
776 if is_vec {
777 Type::Vec(Box::new(Type::from_type(f, &generic_type)))
778 } else {
779 Type::Option(Box::new(Type::from_type(f, &generic_type)))
780 }
781 } else {
782 Type::TypePath(syn::Type::Path(p.clone()))
783 }
784 }
785
786 fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self {
787 let lifetime = tr.lifetime.clone();
788 let inner_type = Type::from_type(f, tr.elem.as_ref());
789 Type::Reference(lifetime, Box::new(inner_type))
790 }
791
792 fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
793 let inner_type = Type::from_type(f, ta.elem.as_ref());
794 Type::Array(Box::new(inner_type), ta.len.clone())
795 }
796
797 fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
798 let inner_type = Type::from_type(f, ts.elem.as_ref());
799 Type::Slice(Box::new(inner_type))
800 }
801}
802
803#[cfg(test)]
804mod test {
805 use super::*;
806 use syn::{Data, DataStruct, DeriveInput};
807
808 fn extract_fields(input: proc_macro2::TokenStream) -> Vec<syn::Field> {
809 let input: DeriveInput = syn::parse2(input).unwrap();
810
811 let fields = match input.data {
812 Data::Struct(DataStruct { fields, .. }) => fields,
813 _ => panic!("Input must be a struct"),
814 };
815
816 fields.iter().map(|field| field.to_owned()).collect()
817 }
818
    // Checks the exact writer snippet generated for a plain `usize` field:
    // no definition levels, a direct `map` into `vals`, and a `write_batch`
    // call without definition levels. The expected `as i64` cast and
    // `Int64ColumnWriter` correspond to `usize::BITS == 64` where the test runs.
    #[test]
    fn test_generating_a_simple_writer_snippet() {
        let snippet: proc_macro2::TokenStream = quote! {
          struct ABoringStruct {
            counter: usize,
          }
        };

        let fields = extract_fields(snippet);
        let counter = Field::from(&fields[0]);

        // Both sides are compared as rendered token strings.
        let snippet = counter.writer_snippet().to_string();
        assert_eq!(snippet,
                   (quote!{
                        {
                            let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );

                            if let ColumnWriter::Int64ColumnWriter ( typed ) = column_writer.untyped() {
                                typed . write_batch ( & vals [ .. ] , None , None ) ?;
                            }  else {
                                panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
                            }
                        }
                   }).to_string()
        )
    }
845
    // Checks the exact reader snippet generated for a plain `usize` field:
    // the `read_records` block followed by the loop that assigns each value
    // back with an `as usize` cast. The expected `Int64ColumnReader`
    // corresponds to `usize::BITS == 64` where the test runs.
    #[test]
    fn test_generating_a_simple_reader_snippet() {
        let snippet: proc_macro2::TokenStream = quote! {
          struct ABoringStruct {
            counter: usize,
          }
        };

        let fields = extract_fields(snippet);
        let counter = Field::from(&fields[0]);

        // Both sides are compared as rendered token strings.
        let snippet = counter.reader_snippet().to_string();
        assert_eq!(
            snippet,
            (quote! {
                 {
                    let mut vals = Vec::new();
                    if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
                        let mut definition_levels = Vec::new();
                        let (total_num, valid_num, decoded_num) = typed.read_records(
                            num_records, Some(&mut definition_levels), None, &mut vals)?;
                        if valid_num != decoded_num {
                            panic!("Support only valid records, found {} null records in column type {}",
                                decoded_num - valid_num, stringify!{counter});
                        }
                    } else {
                        panic!("Schema and struct disagree on type for {}", stringify!{counter});
                    }
                    for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                        r.counter = vals[i] as usize;
                    }
                 }
            })
            .to_string()
        )
    }
882
    // Checks the exact writer snippets generated for three optional,
    // borrowed fields: each must emit definition levels, a `filter_map`
    // over the `Some` values, and a `write_batch` call that passes the
    // definition levels.
    #[test]
    fn test_optional_to_writer_snippet() {
        let struct_def: proc_macro2::TokenStream = quote! {
          struct StringBorrower<'a> {
            optional_str: Option<&'a str>,
            optional_string: Option<&String>,
            optional_dumb_int: Option<&i32>,
          }
        };

        let fields = extract_fields(struct_def);

        // `Option<&str>`: byte-array column, value matched through a borrow.
        let optional = Field::from(&fields[0]);
        let snippet = optional.writer_snippet();
        assert_eq!(snippet.to_string(),
          (quote! {
          {
                let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;

                let vals: Vec <_> = records.iter().filter_map( |rec| {
                    if let Some ( inner ) = &rec . optional_str {
                        Some ( (&inner[..]).into() )
                    } else {
                        None
                    }
                }).collect();

                if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
                    typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
                } else {
                    panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
                }
           }
            }
          ).to_string());

        // `Option<&String>`: same shape as the `&str` case.
        let optional = Field::from(&fields[1]);
        let snippet = optional.writer_snippet();
        assert_eq!(snippet.to_string(),
                   (quote!{
                   {
                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;

                        let vals: Vec <_> = records.iter().filter_map( |rec| {
                            if let Some ( inner ) = &rec . optional_string {
                                Some ( (&inner[..]).into() )
                            } else {
                                None
                            }
                        }).collect();

                        if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
                        } else {
                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
                        }
                    }
        }).to_string());

        // `Option<&i32>`: INT32 column, value matched by value and cast.
        let optional = Field::from(&fields[2]);
        let snippet = optional.writer_snippet();
        assert_eq!(snippet.to_string(),
                   (quote!{
                    {
                        let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;

                        let vals: Vec <_> = records.iter().filter_map( |rec| {
                            if let Some ( inner ) = rec . optional_dumb_int {
                                Some ( inner as i32 )
                            } else {
                                None
                            }
                        }).collect();

                        if let ColumnWriter::Int32ColumnWriter ( typed ) = column_writer.untyped() {
                            typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
                        }  else {
                            panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
                        }
                    }
        }).to_string());
    }
965
966 #[test]
967 fn test_converting_to_column_writer_type() {
968 let snippet: proc_macro2::TokenStream = quote! {
969 struct ABasicStruct {
970 yes_no: bool,
971 name: String,
972 }
973 };
974
975 let fields = extract_fields(snippet);
976 let processed: Vec<_> = fields.iter().map(Field::from).collect();
977
978 let column_writers: Vec<_> = processed
979 .iter()
980 .map(|field| field.ty.column_writer())
981 .collect();
982
983 assert_eq!(
984 column_writers,
985 vec![
986 syn::parse_quote!(ColumnWriter::BoolColumnWriter),
987 syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
988 ]
989 );
990 }
991
992 #[test]
993 fn test_converting_to_column_reader_type() {
994 let snippet: proc_macro2::TokenStream = quote! {
995 struct ABasicStruct {
996 yes_no: bool,
997 name: String,
998 }
999 };
1000
1001 let fields = extract_fields(snippet);
1002 let processed: Vec<_> = fields.iter().map(Field::from).collect();
1003
1004 let column_readers: Vec<_> = processed
1005 .iter()
1006 .map(|field| field.ty.column_reader())
1007 .collect();
1008
1009 assert_eq!(
1010 column_readers,
1011 vec![
1012 syn::parse_quote!(ColumnReader::BoolColumnReader),
1013 syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
1014 ]
1015 );
1016 }
1017
1018 #[test]
1019 fn convert_basic_struct() {
1020 let snippet: proc_macro2::TokenStream = quote! {
1021 struct ABasicStruct {
1022 yes_no: bool,
1023 name: String,
1024 length: usize
1025 }
1026 };
1027
1028 let fields = extract_fields(snippet);
1029 let processed: Vec<_> = fields.iter().map(Field::from).collect();
1030 assert_eq!(processed.len(), 3);
1031
1032 assert_eq!(
1033 processed,
1034 vec![
1035 Field {
1036 ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()),
1037 ty: Type::TypePath(syn::parse_quote!(bool)),
1038 is_a_byte_buf: false,
1039 third_party_type: None,
1040 },
1041 Field {
1042 ident: syn::Ident::new("name", proc_macro2::Span::call_site()),
1043 ty: Type::TypePath(syn::parse_quote!(String)),
1044 is_a_byte_buf: true,
1045 third_party_type: None,
1046 },
1047 Field {
1048 ident: syn::Ident::new("length", proc_macro2::Span::call_site()),
1049 ty: Type::TypePath(syn::parse_quote!(usize)),
1050 is_a_byte_buf: false,
1051 third_party_type: None,
1052 }
1053 ]
1054 )
1055 }
1056
1057 #[test]
1058 fn test_get_inner_type() {
1059 let snippet: proc_macro2::TokenStream = quote! {
1060 struct LotsOfInnerTypes {
1061 a_vec: Vec<u8>,
1062 a_option: ::std::option::Option<bool>,
1063 a_silly_string: ::std::string::String,
1064 a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1065 }
1066 };
1067
1068 let fields = extract_fields(snippet);
1069 let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1070 let inner_types: Vec<_> = converted_fields
1071 .iter()
1072 .map(|field| field.inner_type())
1073 .collect();
1074 let inner_types_strs: Vec<_> = inner_types
1075 .iter()
1076 .map(|ty| (quote! { #ty }).to_string())
1077 .collect();
1078
1079 assert_eq!(
1080 inner_types_strs,
1081 vec![
1082 "u8",
1083 "bool",
1084 ":: std :: string :: String",
1085 ":: std :: result :: Result < () , () >"
1086 ]
1087 )
1088 }
1089
1090 #[test]
1091 fn test_physical_type() {
1092 use parquet::basic::Type as BasicType;
1093 let snippet: proc_macro2::TokenStream = quote! {
1094 struct LotsOfInnerTypes {
1095 a_buf: ::std::vec::Vec<u8>,
1096 a_number: i32,
1097 a_verbose_option: ::std::option::Option<bool>,
1098 a_silly_string: String,
1099 a_fix_byte_buf: [u8; 10],
1100 a_complex_option: ::std::option::Option<&Vec<u8>>,
1101 a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1102 a_uuid: ::uuid::Uuid,
1103 }
1104 };
1105
1106 let fields = extract_fields(snippet);
1107 let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1108 let physical_types: Vec<_> = converted_fields
1109 .iter()
1110 .map(|ty| ty.physical_type())
1111 .collect();
1112
1113 assert_eq!(
1114 physical_types,
1115 vec![
1116 BasicType::BYTE_ARRAY,
1117 BasicType::INT32,
1118 BasicType::BOOLEAN,
1119 BasicType::BYTE_ARRAY,
1120 BasicType::FIXED_LEN_BYTE_ARRAY,
1121 BasicType::BYTE_ARRAY,
1122 BasicType::INT32,
1123 BasicType::FIXED_LEN_BYTE_ARRAY,
1124 ]
1125 )
1126 }
1127
1128 #[test]
1129 fn test_type_length() {
1130 let snippet: proc_macro2::TokenStream = quote! {
1131 struct LotsOfInnerTypes {
1132 a_buf: ::std::vec::Vec<u8>,
1133 a_number: i32,
1134 a_verbose_option: ::std::option::Option<bool>,
1135 a_silly_string: String,
1136 a_fix_byte_buf: [u8; 10],
1137 a_complex_option: ::std::option::Option<&Vec<u8>>,
1138 a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1139 a_uuid: ::uuid::Uuid,
1140 }
1141 };
1142
1143 let fields = extract_fields(snippet);
1144 let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1145 let lengths: Vec<_> = converted_fields.iter().map(|ty| ty.length()).collect();
1146
1147 assert_eq!(
1148 lengths,
1149 vec![
1150 None,
1151 None,
1152 None,
1153 None,
1154 Some(syn::parse_quote!(10)),
1155 None,
1156 None,
1157 Some(syn::parse_quote!(16)),
1158 ]
1159 )
1160 }
1161
#[test]
fn test_convert_comprehensive_owned_struct() {
    // Every field is owned and spelled with a fully qualified path.
    let snippet: proc_macro2::TokenStream = quote! {
        struct VecHolder {
            a_vec: ::std::vec::Vec<u8>,
            a_option: ::std::option::Option<bool>,
            a_silly_string: ::std::string::String,
            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
        }
    };

    let fields = extract_fields(snippet);
    let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();

    // Expected conversion of each field, in declaration order.
    let a_vec = Type::Vec(Box::new(Type::TypePath(syn::parse_quote!(u8))));
    let a_option = Type::Option(Box::new(Type::TypePath(syn::parse_quote!(bool))));
    let a_silly_string = Type::TypePath(syn::parse_quote!(::std::string::String));
    // The `Result` inside the `Option` is expected to stay a plain type path.
    let a_complicated_thing = Type::Option(Box::new(Type::TypePath(
        syn::parse_quote!(::std::result::Result<(),()>),
    )));

    assert_eq!(
        converted_fields,
        vec![a_vec, a_option, a_silly_string, a_complicated_thing]
    );
}
1188
#[test]
fn test_convert_borrowed_struct() {
    // Fields borrowed with an explicit lifetime, including a nested borrow.
    let snippet: proc_macro2::TokenStream = quote! {
        struct Borrower<'a> {
            a_str: &'a str,
            a_borrowed_option: &'a Option<bool>,
            so_many_borrows: &'a Option<&'a str>,
        }
    };

    let fields = extract_fields(snippet);
    let types: Vec<_> = fields.iter().map(Type::from).collect();

    // Builders for the pieces that repeat across the expected values.
    let lifetime_a = || syn::Lifetime::new("'a", proc_macro2::Span::call_site());
    let borrowed_str = || {
        Type::Reference(
            Some(lifetime_a()),
            Box::new(Type::TypePath(syn::parse_quote!(str))),
        )
    };

    // `&'a Option<bool>`
    let borrowed_option = Type::Reference(
        Some(lifetime_a()),
        Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!(
            bool
        ))))),
    );
    // `&'a Option<&'a str>`
    let nested_borrows = Type::Reference(
        Some(lifetime_a()),
        Box::new(Type::Option(Box::new(borrowed_str()))),
    );

    assert_eq!(types, vec![borrowed_str(), borrowed_option, nested_borrows]);
}
1225
#[test]
fn test_chrono_timestamp_millis_write() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDateTime,
            maybe_happened: Option<&chrono::NaiveDateTime>,
        }
    };

    let fields = extract_fields(snippet);

    // Required `NaiveDateTime`: values are written without definition levels.
    let required_field = Field::from(&fields[0]);
    let generated_required = required_field.writer_snippet().to_string();
    let expected_required = quote! {
        {
            let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
            if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], None, None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
            }
        }
    }
    .to_string();
    assert_eq!(generated_required, expected_required);

    // Optional borrowed `NaiveDateTime`: definition levels are written too.
    let optional_field = Field::from(&fields[1]);
    let generated_optional = optional_field.writer_snippet().to_string();
    let expected_optional = quote! {
        {
            let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
            let vals : Vec<_> = records.iter().filter_map(|rec| {
                if let Some(inner) = rec.maybe_happened {
                    Some(inner.timestamp_millis())
                } else {
                    None
                }
            }).collect();

            if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
            }
        }
    }
    .to_string();
    assert_eq!(generated_optional, expected_optional);
}
1268
#[test]
fn test_chrono_timestamp_millis_read() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDateTime,
        }
    };

    let fields = extract_fields(snippet);
    let timestamp_field = Field::from(&fields[0]);

    // The generated reader must match this token stream exactly.
    let generated = timestamp_field.reader_snippet().to_string();
    let expected = quote! {
        {
            let mut vals = Vec::new();
            if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
                let mut definition_levels = Vec::new();
                let (total_num, valid_num, decoded_num) = typed.read_records(
                    num_records, Some(&mut definition_levels), None, &mut vals)?;
                if valid_num != decoded_num {
                    panic!("Support only valid records, found {} null records in column type {}",
                        decoded_num - valid_num, stringify!{henceforth});
                }
            } else {
                panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
            }
            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap();
            }
        }
    }
    .to_string();

    assert_eq!(generated, expected);
}
1299
#[test]
fn test_chrono_date_write() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDate,
            maybe_happened: Option<&chrono::NaiveDate>,
        }
    };

    let fields = extract_fields(snippet);

    // Required `NaiveDate`: expected as days since 1970-01-01, cast to i32.
    let required_field = Field::from(&fields[0]);
    let generated_required = required_field.writer_snippet().to_string();
    let expected_required = quote! {
        {
            let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
            if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], None, None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
            }
        }
    }
    .to_string();
    assert_eq!(generated_required, expected_required);

    // Optional borrowed `NaiveDate`: same conversion, plus definition levels.
    let optional_field = Field::from(&fields[1]);
    let generated_optional = optional_field.writer_snippet().to_string();
    let expected_optional = quote! {
        {
            let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
            let vals : Vec<_> = records.iter().filter_map(|rec| {
                if let Some(inner) = rec.maybe_happened {
                    Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
                } else {
                    None
                }
            }).collect();

            if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
            }
        }
    }
    .to_string();
    assert_eq!(generated_optional, expected_optional);
}
1342
#[test]
fn test_chrono_date_read() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDate,
        }
    };

    let fields = extract_fields(snippet);
    let date_field = Field::from(&fields[0]);

    // The generated reader must match this token stream exactly.
    let generated = date_field.reader_snippet().to_string();
    let expected = quote! {
        {
            let mut vals = Vec::new();
            if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
                let mut definition_levels = Vec::new();
                let (total_num, valid_num, decoded_num) = typed.read_records(
                    num_records, Some(&mut definition_levels), None, &mut vals)?;
                if valid_num != decoded_num {
                    panic!("Support only valid records, found {} null records in column type {}",
                        decoded_num - valid_num, stringify!{henceforth});
                }
            } else {
                panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
            }
            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap();
            }
        }
    }
    .to_string();

    assert_eq!(generated, expected);
}
1373
#[test]
fn test_uuid_write() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct AUuidStruct {
            unique_id: uuid::Uuid,
            maybe_unique_id: Option<&uuid::Uuid>,
        }
    };

    let fields = extract_fields(snippet);

    // Required `Uuid`: expected to be written from its raw bytes.
    let unique_id = Field::from(&fields[0]);
    let generated_required = unique_id.writer_snippet().to_string();
    let expected_required = quote! {
        {
            let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
            if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], None, None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
            }
        }
    }
    .to_string();
    assert_eq!(generated_required, expected_required);

    // Optional borrowed `Uuid`: expected snippet goes through `to_string()`
    // and also writes definition levels.
    let maybe_unique_id = Field::from(&fields[1]);
    let generated_optional = maybe_unique_id.writer_snippet().to_string();
    let expected_optional = quote! {
        {
            let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
            let vals : Vec<_> = records.iter().filter_map(|rec| {
                if let Some(inner) = &rec.maybe_unique_id {
                    Some((&inner.to_string()[..]).into())
                } else {
                    None
                }
            }).collect();

            if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
            }
        }
    }
    .to_string();
    assert_eq!(generated_optional, expected_optional);
}
1416
#[test]
fn test_uuid_read() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct AUuidStruct {
            unique_id: uuid::Uuid,
        }
    };

    let fields = extract_fields(snippet);
    let uuid_field = Field::from(&fields[0]);

    // The generated reader must match this token stream exactly.
    let generated = uuid_field.reader_snippet().to_string();
    let expected = quote! {
        {
            let mut vals = Vec::new();
            if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
                let mut definition_levels = Vec::new();
                let (total_num, valid_num, decoded_num) = typed.read_records(
                    num_records, Some(&mut definition_levels), None, &mut vals)?;
                if valid_num != decoded_num {
                    panic!("Support only valid records, found {} null records in column type {}",
                        decoded_num - valid_num, stringify!{unique_id});
                }
            } else {
                panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
            }
            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
            }
        }
    }
    .to_string();

    assert_eq!(generated, expected);
}
1447
#[test]
fn test_converted_type() {
    let snippet: proc_macro2::TokenStream = quote! {
        struct ATimeStruct {
            time: chrono::NaiveDateTime,
        }
    };

    let fields = extract_fields(snippet);
    let time_field = Field::from(&fields[0]);

    // A `NaiveDateTime` field must report the TIMESTAMP_MILLIS converted type;
    // the `unwrap` fails the test if no converted type is produced at all.
    let actual = time_field.ty.converted_type().unwrap().to_string();
    let expected = quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }.to_string();

    assert_eq!(actual, expected);
}
1466}