1use syn::ext::IdentExt;
19
20#[derive(Debug, PartialEq)]
21pub struct Field {
22 ident: syn::Ident,
23 ty: Type,
24 is_a_byte_buf: bool,
25 third_party_type: Option<ThirdPartyType>,
26}
27
/// Third-party types that need dedicated conversion code in the generated
/// reader and writer snippets.
///
/// A field is classified by the last path segment of its innermost type, so
/// the detection is purely name based.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ThirdPartyType {
    /// A type whose last path segment is `NaiveDateTime`.
    ChronoNaiveDateTime,
    /// A type whose last path segment is `NaiveDate`.
    ChronoNaiveDate,
    /// A type whose last path segment is `Uuid`.
    Uuid,
}
41
42impl Field {
43 pub fn from(f: &syn::Field) -> Self {
44 let ty = Type::from(f);
45 let is_a_byte_buf = ty.physical_type() == parquet::basic::Type::BYTE_ARRAY;
46
47 let third_party_type = match &ty.last_part()[..] {
48 "NaiveDateTime" => Some(ThirdPartyType::ChronoNaiveDateTime),
49 "NaiveDate" => Some(ThirdPartyType::ChronoNaiveDate),
50 "Uuid" => Some(ThirdPartyType::Uuid),
51 _ => None,
52 };
53
54 Field {
55 ident: f
56 .ident
57 .clone()
58 .expect("Only structs with named fields are currently supported"),
59 ty,
60 is_a_byte_buf,
61 third_party_type,
62 }
63 }
64
65 pub fn writer_snippet(&self) -> proc_macro2::TokenStream {
86 let ident = &self.ident;
87 let column_writer = self.ty.column_writer();
88
89 let vals_builder = match &self.ty {
90 Type::TypePath(_) => self.copied_direct_vals(),
91 Type::Option(first_type) => match **first_type {
92 Type::TypePath(_) => self.option_into_vals(),
93 Type::Reference(_, ref second_type) => match **second_type {
94 Type::TypePath(_) => self.option_into_vals(),
95 _ => unimplemented!("Unsupported type encountered"),
96 },
97 Type::Vec(ref first_type) => match **first_type {
98 Type::TypePath(_) => self.option_into_vals(),
99 _ => unimplemented!("Unsupported type encountered"),
100 },
101 ref f => unimplemented!("Unsupported: {:#?}", f),
102 },
103 Type::Reference(_, first_type) => match **first_type {
104 Type::TypePath(_) => self.copied_direct_vals(),
105 Type::Option(ref second_type) => match **second_type {
106 Type::TypePath(_) => self.option_into_vals(),
107 Type::Reference(_, ref second_type) => match **second_type {
108 Type::TypePath(_) => self.option_into_vals(),
109 Type::Slice(ref second_type) => match **second_type {
110 Type::TypePath(_) => self.option_into_vals(),
111 ref f => unimplemented!("Unsupported: {:#?}", f),
112 },
113 _ => unimplemented!("Unsupported type encountered"),
114 },
115 Type::Vec(ref first_type) => match **first_type {
116 Type::TypePath(_) => self.option_into_vals(),
117 _ => unimplemented!("Unsupported type encountered"),
118 },
119 ref f => unimplemented!("Unsupported: {:#?}", f),
120 },
121 Type::Slice(ref second_type) => match **second_type {
122 Type::TypePath(_) => self.copied_direct_vals(),
123 ref f => unimplemented!("Unsupported: {:#?}", f),
124 },
125 ref f => unimplemented!("Unsupported: {:#?}", f),
126 },
127 Type::Vec(first_type) => match **first_type {
128 Type::TypePath(_) => self.copied_direct_vals(),
129 ref f => unimplemented!("Unsupported: {:#?}", f),
130 },
131 f => unimplemented!("Unsupported: {:#?}", f),
132 };
133
134 let definition_levels = match &self.ty {
135 Type::TypePath(_) => None,
136 Type::Option(first_type) => match **first_type {
137 Type::TypePath(_) => Some(self.optional_definition_levels()),
138 Type::Option(_) => unimplemented!("Unsupported nesting encountered"),
139 Type::Reference(_, ref second_type)
140 | Type::Vec(ref second_type)
141 | Type::Array(ref second_type, _)
142 | Type::Slice(ref second_type) => match **second_type {
143 Type::TypePath(_) => Some(self.optional_definition_levels()),
144 _ => unimplemented!("Unsupported nesting encountered"),
145 },
146 },
147 Type::Reference(_, first_type)
148 | Type::Vec(first_type)
149 | Type::Array(first_type, _)
150 | Type::Slice(first_type) => match **first_type {
151 Type::TypePath(_) => None,
152 Type::Vec(ref second_type)
153 | Type::Array(ref second_type, _)
154 | Type::Slice(ref second_type) => match **second_type {
155 Type::TypePath(_) => None,
156 Type::Reference(_, ref third_type) => match **third_type {
157 Type::TypePath(_) => None,
158 _ => unimplemented!("Unsupported definition encountered"),
159 },
160 _ => unimplemented!("Unsupported definition encountered"),
161 },
162 Type::Reference(_, ref second_type) | Type::Option(ref second_type) => {
163 match **second_type {
164 Type::TypePath(_) => Some(self.optional_definition_levels()),
165 Type::Vec(ref third_type)
166 | Type::Array(ref third_type, _)
167 | Type::Slice(ref third_type) => match **third_type {
168 Type::TypePath(_) => Some(self.optional_definition_levels()),
169 Type::Reference(_, ref fourth_type) => match **fourth_type {
170 Type::TypePath(_) => Some(self.optional_definition_levels()),
171 _ => unimplemented!("Unsupported definition encountered"),
172 },
173 _ => unimplemented!("Unsupported definition encountered"),
174 },
175 Type::Reference(_, ref third_type) => match **third_type {
176 Type::TypePath(_) => Some(self.optional_definition_levels()),
177 Type::Slice(ref fourth_type) => match **fourth_type {
178 Type::TypePath(_) => Some(self.optional_definition_levels()),
179 _ => unimplemented!("Unsupported definition encountered"),
180 },
181 _ => unimplemented!("Unsupported definition encountered"),
182 },
183 _ => unimplemented!("Unsupported definition encountered"),
184 }
185 }
186 },
187 };
188
189 let write_batch_expr = if definition_levels.is_some() {
196 quote! {
197 if let #column_writer(typed) = column_writer.untyped() {
198 typed.write_batch(&vals[..], Some(&definition_levels[..]), None)?;
199 } else {
200 panic!("Schema and struct disagree on type for {}", stringify!{#ident})
201 }
202 }
203 } else {
204 quote! {
205 if let #column_writer(typed) = column_writer.untyped() {
206 typed.write_batch(&vals[..], None, None)?;
207 } else {
208 panic!("Schema and struct disagree on type for {}", stringify!{#ident})
209 }
210 }
211 };
212
213 quote! {
214 {
215 #definition_levels
216
217 #vals_builder
218
219 #write_batch_expr
220 }
221 }
222 }
223
224 pub fn reader_snippet(&self) -> proc_macro2::TokenStream {
247 let ident = &self.ident;
248 let column_reader = self.ty.column_reader();
249
250 let write_batch_expr = quote! {
252 let mut vals = Vec::new();
253 if let #column_reader(mut typed) = column_reader {
254 let mut definition_levels = Vec::new();
255 let (total_num, valid_num, decoded_num) = typed.read_records(
256 num_records, Some(&mut definition_levels), None, &mut vals)?;
257 if valid_num != decoded_num {
258 panic!("Support only valid records, found {} null records in column type {}",
259 decoded_num - valid_num, stringify!{#ident});
260 }
261 } else {
262 panic!("Schema and struct disagree on type for {}", stringify!{#ident});
263 }
264 };
265
266 let vals_writer = match &self.ty {
269 Type::TypePath(_) => self.copied_direct_fields(),
270 Type::Reference(_, first_type) => match **first_type {
271 Type::TypePath(_) => self.copied_direct_fields(),
272 Type::Slice(ref second_type) => match **second_type {
273 Type::TypePath(_) => self.copied_direct_fields(),
274 ref f => unimplemented!("Unsupported: {:#?}", f),
275 },
276 ref f => unimplemented!("Unsupported: {:#?}", f),
277 },
278 Type::Vec(first_type) => match **first_type {
279 Type::TypePath(_) => self.copied_direct_fields(),
280 ref f => unimplemented!("Unsupported: {:#?}", f),
281 },
282 f => unimplemented!("Unsupported: {:#?}", f),
283 };
284
285 quote! {
286 {
287 #write_batch_expr
288
289 #vals_writer
290 }
291 }
292 }
293
294 pub fn parquet_type(&self) -> proc_macro2::TokenStream {
295 let field_name = self.ident.unraw().to_string();
301 let physical_type = match self.ty.physical_type() {
302 parquet::basic::Type::BOOLEAN => quote! {
303 ::parquet::basic::Type::BOOLEAN
304 },
305 parquet::basic::Type::INT32 => quote! {
306 ::parquet::basic::Type::INT32
307 },
308 parquet::basic::Type::INT64 => quote! {
309 ::parquet::basic::Type::INT64
310 },
311 parquet::basic::Type::INT96 => quote! {
312 ::parquet::basic::Type::INT96
313 },
314 parquet::basic::Type::FLOAT => quote! {
315 ::parquet::basic::Type::FLOAT
316 },
317 parquet::basic::Type::DOUBLE => quote! {
318 ::parquet::basic::Type::DOUBLE
319 },
320 parquet::basic::Type::BYTE_ARRAY => quote! {
321 ::parquet::basic::Type::BYTE_ARRAY
322 },
323 parquet::basic::Type::FIXED_LEN_BYTE_ARRAY => quote! {
324 ::parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
325 },
326 };
327 let logical_type = self.ty.logical_type();
328 let repetition = self.ty.repetition();
329 let converted_type = self.ty.converted_type();
330 let length = self.ty.length();
331
332 let mut builder = quote! {
333 ParquetType::primitive_type_builder(#field_name, #physical_type)
334 .with_logical_type(#logical_type)
335 .with_repetition(#repetition)
336 };
337
338 if let Some(converted_type) = converted_type {
339 builder = quote! { #builder.with_converted_type(#converted_type) };
340 }
341
342 if let Some(length) = length {
343 builder = quote! { #builder.with_length(#length) };
344 }
345
346 quote! { fields.push(#builder.build().unwrap().into()) }
347 }
348
349 fn option_into_vals(&self) -> proc_macro2::TokenStream {
350 let field_name = &self.ident;
351 let is_a_byte_buf = self.is_a_byte_buf;
352 let is_a_timestamp = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDateTime);
353 let is_a_date = self.third_party_type == Some(ThirdPartyType::ChronoNaiveDate);
354 let is_a_uuid = self.third_party_type == Some(ThirdPartyType::Uuid);
355 let copy_to_vec = !matches!(
356 self.ty.physical_type(),
357 parquet::basic::Type::BYTE_ARRAY | parquet::basic::Type::FIXED_LEN_BYTE_ARRAY
358 );
359
360 let binding = if copy_to_vec {
361 quote! { let Some(inner) = rec.#field_name }
362 } else {
363 quote! { let Some(inner) = &rec.#field_name }
364 };
365
366 let some = if is_a_timestamp {
367 quote! { Some(inner.timestamp_millis()) }
368 } else if is_a_date {
369 quote! { Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32) }
370 } else if is_a_uuid {
371 quote! { Some((&inner.to_string()[..]).into()) }
372 } else if is_a_byte_buf {
373 quote! { Some((&inner[..]).into())}
374 } else {
375 match self.ty.physical_type() {
377 parquet::basic::Type::INT32 => quote! { Some(inner as i32) },
378 parquet::basic::Type::INT64 => quote! { Some(inner as i64) },
379 _ => quote! { Some(inner) },
380 }
381 };
382
383 quote! {
384 let vals: Vec<_> = records.iter().filter_map(|rec| {
385 if #binding {
386 #some
387 } else {
388 None
389 }
390 }).collect();
391 }
392 }
393
394 fn copied_direct_vals(&self) -> proc_macro2::TokenStream {
396 let field_name = &self.ident;
397
398 let access = match self.third_party_type {
399 Some(ThirdPartyType::ChronoNaiveDateTime) => {
400 quote! { rec.#field_name.timestamp_millis() }
401 }
402 Some(ThirdPartyType::ChronoNaiveDate) => {
403 quote! { rec.#field_name.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32 }
404 }
405 Some(ThirdPartyType::Uuid) => {
406 quote! { rec.#field_name.as_bytes().to_vec().into() }
407 }
408 _ => {
409 if self.is_a_byte_buf {
410 quote! { (&rec.#field_name[..]).into() }
411 } else {
412 match self.ty.physical_type() {
414 parquet::basic::Type::INT32 => quote! { rec.#field_name as i32 },
415 parquet::basic::Type::INT64 => quote! { rec.#field_name as i64 },
416 _ => quote! { rec.#field_name },
417 }
418 }
419 }
420 };
421
422 quote! {
423 let vals: Vec<_> = records.iter().map(|rec| #access).collect();
424 }
425 }
426
427 fn copied_direct_fields(&self) -> proc_macro2::TokenStream {
429 let field_name = &self.ident;
430
431 let value = match self.third_party_type {
432 Some(ThirdPartyType::ChronoNaiveDateTime) => {
433 quote! { ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap() }
434 }
435 Some(ThirdPartyType::ChronoNaiveDate) => {
436 quote! {
438 ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap()
439 }
440 }
441 Some(ThirdPartyType::Uuid) => {
442 quote! { ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap()) }
443 }
444 _ => match &self.ty {
445 Type::TypePath(_) => match self.ty.last_part().as_str() {
446 "String" => quote! { String::from(std::str::from_utf8(vals[i].data())
447 .expect("invalid UTF-8 sequence")) },
448 t => {
449 let s: proc_macro2::TokenStream = t.parse().unwrap();
450 quote! { vals[i] as #s }
451 }
452 },
453 Type::Vec(_) => quote! { vals[i].data().to_vec() },
454 f => unimplemented!("Unsupported: {:#?}", f),
455 },
456 };
457
458 quote! {
459 for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
460 r.#field_name = #value;
461 }
462 }
463 }
464
465 fn optional_definition_levels(&self) -> proc_macro2::TokenStream {
466 let field_name = &self.ident;
467
468 quote! {
469 let definition_levels: Vec<i16> = self
470 .iter()
471 .map(|rec| if rec.#field_name.is_some() { 1 } else { 0 })
472 .collect();
473 }
474 }
475}
476
477#[allow(clippy::enum_variant_names)]
478#[allow(clippy::large_enum_variant)]
479#[derive(Debug, PartialEq)]
480enum Type {
481 Array(Box<Type>, syn::Expr),
482 Option(Box<Type>),
483 Slice(Box<Type>),
484 Vec(Box<Type>),
485 TypePath(syn::Type),
486 Reference(Option<syn::Lifetime>, Box<Type>),
487}
488
489impl Type {
490 fn column_writer(&self) -> syn::TypePath {
493 use parquet::basic::Type as BasicType;
494
495 match self.physical_type() {
496 BasicType::BOOLEAN => {
497 syn::parse_quote!(ColumnWriter::BoolColumnWriter)
498 }
499 BasicType::INT32 => syn::parse_quote!(ColumnWriter::Int32ColumnWriter),
500 BasicType::INT64 => syn::parse_quote!(ColumnWriter::Int64ColumnWriter),
501 BasicType::INT96 => syn::parse_quote!(ColumnWriter::Int96ColumnWriter),
502 BasicType::FLOAT => syn::parse_quote!(ColumnWriter::FloatColumnWriter),
503 BasicType::DOUBLE => syn::parse_quote!(ColumnWriter::DoubleColumnWriter),
504 BasicType::BYTE_ARRAY => {
505 syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
506 }
507 BasicType::FIXED_LEN_BYTE_ARRAY => {
508 syn::parse_quote!(ColumnWriter::FixedLenByteArrayColumnWriter)
509 }
510 }
511 }
512
513 fn column_reader(&self) -> syn::TypePath {
516 use parquet::basic::Type as BasicType;
517
518 match self.physical_type() {
519 BasicType::BOOLEAN => {
520 syn::parse_quote!(ColumnReader::BoolColumnReader)
521 }
522 BasicType::INT32 => syn::parse_quote!(ColumnReader::Int32ColumnReader),
523 BasicType::INT64 => syn::parse_quote!(ColumnReader::Int64ColumnReader),
524 BasicType::INT96 => syn::parse_quote!(ColumnReader::Int96ColumnReader),
525 BasicType::FLOAT => syn::parse_quote!(ColumnReader::FloatColumnReader),
526 BasicType::DOUBLE => syn::parse_quote!(ColumnReader::DoubleColumnReader),
527 BasicType::BYTE_ARRAY => {
528 syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
529 }
530 BasicType::FIXED_LEN_BYTE_ARRAY => {
531 syn::parse_quote!(ColumnReader::FixedLenByteArrayColumnReader)
532 }
533 }
534 }
535
536 fn leaf_type_recursive(&self) -> &Type {
546 Type::leaf_type_recursive_helper(self, None)
547 }
548
549 fn leaf_type_recursive_helper<'a>(ty: &'a Type, parent_ty: Option<&'a Type>) -> &'a Type {
550 match ty {
551 Type::TypePath(_) => parent_ty.unwrap_or(ty),
552 Type::Option(first_type)
553 | Type::Vec(first_type)
554 | Type::Array(first_type, _)
555 | Type::Slice(first_type)
556 | Type::Reference(_, first_type) => {
557 Type::leaf_type_recursive_helper(first_type, Some(ty))
558 }
559 }
560 }
561
562 fn inner_type(&self) -> &syn::Type {
566 let leaf_type = self.leaf_type_recursive();
567
568 match leaf_type {
569 Type::TypePath(type_) => type_,
570 Type::Option(first_type)
571 | Type::Vec(first_type)
572 | Type::Array(first_type, _)
573 | Type::Slice(first_type)
574 | Type::Reference(_, first_type) => match **first_type {
575 Type::TypePath(ref type_) => type_,
576 _ => unimplemented!("leaf_type() should only return shallow types"),
577 },
578 }
579 }
580
581 fn last_part(&self) -> String {
594 let inner_type = self.inner_type();
595 let inner_type_str = (quote! { #inner_type }).to_string();
596
597 inner_type_str
598 .split("::")
599 .last()
600 .unwrap()
601 .trim()
602 .to_string()
603 }
604
605 fn physical_type(&self) -> parquet::basic::Type {
613 use parquet::basic::Type as BasicType;
614
615 let last_part = self.last_part();
616 let leaf_type = self.leaf_type_recursive();
617
618 match leaf_type {
619 Type::Array(first_type, _length) => {
620 if let Type::TypePath(_) = **first_type {
621 if last_part == "u8" {
622 return BasicType::FIXED_LEN_BYTE_ARRAY;
623 }
624 }
625 }
626 Type::Vec(first_type) | Type::Slice(first_type) => {
627 if let Type::TypePath(_) = **first_type {
628 if last_part == "u8" {
629 return BasicType::BYTE_ARRAY;
630 }
631 }
632 }
633 _ => (),
634 }
635
636 match last_part.trim() {
637 "bool" => BasicType::BOOLEAN,
638 "u8" | "u16" | "u32" => BasicType::INT32,
639 "i8" | "i16" | "i32" | "NaiveDate" => BasicType::INT32,
640 "u64" | "i64" | "NaiveDateTime" => BasicType::INT64,
641 "usize" | "isize" => {
642 if usize::BITS == 64 {
643 BasicType::INT64
644 } else {
645 BasicType::INT32
646 }
647 }
648 "f32" => BasicType::FLOAT,
649 "f64" => BasicType::DOUBLE,
650 "String" | "str" | "Arc < str >" => BasicType::BYTE_ARRAY,
651 "Uuid" => BasicType::FIXED_LEN_BYTE_ARRAY,
652 f => unimplemented!("{} currently is not supported", f),
653 }
654 }
655
656 fn length(&self) -> Option<syn::Expr> {
657 let last_part = self.last_part();
658 let leaf_type = self.leaf_type_recursive();
659
660 if let Type::Array(first_type, length) = leaf_type {
662 if let Type::TypePath(_) = **first_type {
663 if last_part == "u8" {
664 return Some(length.clone());
665 }
666 }
667 }
668
669 match last_part.trim() {
670 "Uuid" => Some(syn::parse_quote!(16)),
672 _ => None,
673 }
674 }
675
676 fn logical_type(&self) -> proc_macro2::TokenStream {
677 let last_part = self.last_part();
678 let leaf_type = self.leaf_type_recursive();
679
680 match leaf_type {
681 Type::Array(first_type, _length) => {
682 if let Type::TypePath(_) = **first_type {
683 if last_part == "u8" {
684 return quote! { None };
685 }
686 }
687 }
688 Type::Vec(first_type) | Type::Slice(first_type) => {
689 if let Type::TypePath(_) = **first_type {
690 if last_part == "u8" {
691 return quote! { None };
692 }
693 }
694 }
695 _ => (),
696 }
697
698 match last_part.trim() {
699 "bool" => quote! { None },
700 "u8" => quote! { Some(LogicalType::integer(8, false)) },
701 "u16" => quote! { Some(LogicalType::integer(16, false)) },
702 "u32" => quote! { Some(LogicalType::integer(32, false)) },
703 "u64" => quote! { Some(LogicalType::integer(64, false)) },
704 "i8" => quote! { Some(LogicalType::integer(8, true)) },
705 "i16" => quote! { Some(LogicalType::integer(16, true)) },
706 "i32" | "i64" => quote! { None },
707 "usize" => {
708 quote! { Some(LogicalType::integer(usize::BITS as i8, false)) }
709 }
710 "isize" => {
711 quote! { Some(LogicalType::integer(usize::BITS as i8, true)) }
712 }
713 "NaiveDate" => quote! { Some(LogicalType::Date) },
714 "NaiveDateTime" => quote! { None },
715 "f32" | "f64" => quote! { None },
716 "String" | "str" | "Arc < str >" => quote! { Some(LogicalType::String) },
717 "Uuid" => quote! { Some(LogicalType::Uuid) },
718 f => unimplemented!("{} currently is not supported", f),
719 }
720 }
721
722 fn converted_type(&self) -> Option<proc_macro2::TokenStream> {
723 let last_part = self.last_part();
724
725 match last_part.trim() {
726 "NaiveDateTime" => Some(quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS }),
727 _ => None,
728 }
729 }
730
731 fn repetition(&self) -> proc_macro2::TokenStream {
732 match self {
733 Type::Option(_) => quote! { ::parquet::basic::Repetition::OPTIONAL },
734 Type::Reference(_, ty) => ty.repetition(),
735 _ => quote! { ::parquet::basic::Repetition::REQUIRED },
736 }
737 }
738
739 fn from(f: &syn::Field) -> Self {
742 Type::from_type(f, &f.ty)
743 }
744
745 fn from_type(f: &syn::Field, ty: &syn::Type) -> Self {
746 match ty {
747 syn::Type::Path(p) => Type::from_type_path(f, p),
748 syn::Type::Reference(tr) => Type::from_type_reference(f, tr),
749 syn::Type::Array(ta) => Type::from_type_array(f, ta),
750 syn::Type::Slice(ts) => Type::from_type_slice(f, ts),
751 other => unimplemented!(
752 "Unable to derive {:?} - it is currently an unsupported type\n{:#?}",
753 f.ident.as_ref().unwrap(),
754 other
755 ),
756 }
757 }
758
759 fn from_type_path(f: &syn::Field, p: &syn::TypePath) -> Self {
760 let last_segment = p.path.segments.last().unwrap();
761
762 let is_vec = last_segment.ident == syn::Ident::new("Vec", proc_macro2::Span::call_site());
763 let is_option =
764 last_segment.ident == syn::Ident::new("Option", proc_macro2::Span::call_site());
765
766 if is_vec || is_option {
767 let generic_type = match &last_segment.arguments {
768 syn::PathArguments::AngleBracketed(angle_args) => {
769 assert_eq!(angle_args.args.len(), 1);
770 let first_arg = &angle_args.args[0];
771
772 match first_arg {
773 syn::GenericArgument::Type(typath) => typath.clone(),
774 other => unimplemented!("Unsupported: {:#?}", other),
775 }
776 }
777 other => unimplemented!("Unsupported: {:#?}", other),
778 };
779
780 if is_vec {
781 Type::Vec(Box::new(Type::from_type(f, &generic_type)))
782 } else {
783 Type::Option(Box::new(Type::from_type(f, &generic_type)))
784 }
785 } else {
786 Type::TypePath(syn::Type::Path(p.clone()))
787 }
788 }
789
790 fn from_type_reference(f: &syn::Field, tr: &syn::TypeReference) -> Self {
791 let lifetime = tr.lifetime.clone();
792 let inner_type = Type::from_type(f, tr.elem.as_ref());
793 Type::Reference(lifetime, Box::new(inner_type))
794 }
795
796 fn from_type_array(f: &syn::Field, ta: &syn::TypeArray) -> Self {
797 let inner_type = Type::from_type(f, ta.elem.as_ref());
798 Type::Array(Box::new(inner_type), ta.len.clone())
799 }
800
801 fn from_type_slice(f: &syn::Field, ts: &syn::TypeSlice) -> Self {
802 let inner_type = Type::from_type(f, ts.elem.as_ref());
803 Type::Slice(Box::new(inner_type))
804 }
805}
806
807#[cfg(test)]
808mod test {
809 use super::*;
810 use syn::{Data, DataStruct, DeriveInput};
811
812 fn extract_fields(input: proc_macro2::TokenStream) -> Vec<syn::Field> {
813 let input: DeriveInput = syn::parse2(input).unwrap();
814
815 let fields = match input.data {
816 Data::Struct(DataStruct { fields, .. }) => fields,
817 _ => panic!("Input must be a struct"),
818 };
819
820 fields.iter().map(|field| field.to_owned()).collect()
821 }
822
823 #[test]
824 fn test_generating_a_simple_writer_snippet() {
825 let snippet: proc_macro2::TokenStream = quote! {
826 struct ABoringStruct {
827 counter: usize,
828 }
829 };
830
831 let fields = extract_fields(snippet);
832 let counter = Field::from(&fields[0]);
833
834 let snippet = counter.writer_snippet().to_string();
835 assert_eq!(snippet,
836 (quote!{
837 {
838 let vals : Vec < _ > = records . iter ( ) . map ( | rec | rec . counter as i64 ) . collect ( );
839
840 if let ColumnWriter::Int64ColumnWriter ( typed ) = column_writer.untyped() {
841 typed . write_batch ( & vals [ .. ] , None , None ) ?;
842 } else {
843 panic!("Schema and struct disagree on type for {}" , stringify!{ counter } )
844 }
845 }
846 }).to_string()
847 )
848 }
849
850 #[test]
851 fn test_generating_a_simple_reader_snippet() {
852 let snippet: proc_macro2::TokenStream = quote! {
853 struct ABoringStruct {
854 counter: usize,
855 }
856 };
857
858 let fields = extract_fields(snippet);
859 let counter = Field::from(&fields[0]);
860
861 let snippet = counter.reader_snippet().to_string();
862 assert_eq!(
863 snippet,
864 (quote! {
865 {
866 let mut vals = Vec::new();
867 if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
868 let mut definition_levels = Vec::new();
869 let (total_num, valid_num, decoded_num) = typed.read_records(
870 num_records, Some(&mut definition_levels), None, &mut vals)?;
871 if valid_num != decoded_num {
872 panic!("Support only valid records, found {} null records in column type {}",
873 decoded_num - valid_num, stringify!{counter});
874 }
875 } else {
876 panic!("Schema and struct disagree on type for {}", stringify!{counter});
877 }
878 for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
879 r.counter = vals[i] as usize;
880 }
881 }
882 })
883 .to_string()
884 )
885 }
886
887 #[test]
888 fn test_parquet_type_with_raw_identifier() {
889 let snippet: proc_macro2::TokenStream = quote! {
890 struct ABoringStruct {
891 r#type: i32,
892 }
893 };
894
895 let fields = extract_fields(snippet);
896 let r#type = Field::from(&fields[0]);
897
898 let snippet = r#type.parquet_type().to_string();
900 assert!(
901 snippet.contains("primitive_type_builder (\"type\""),
902 "{snippet}"
903 );
904 }
905
906 #[test]
907 fn test_optional_to_writer_snippet() {
908 let struct_def: proc_macro2::TokenStream = quote! {
909 struct StringBorrower<'a> {
910 optional_str: Option<&'a str>,
911 optional_string: Option<&String>,
912 optional_dumb_int: Option<&i32>,
913 }
914 };
915
916 let fields = extract_fields(struct_def);
917
918 let optional = Field::from(&fields[0]);
919 let snippet = optional.writer_snippet();
920 assert_eq!(snippet.to_string(),
921 (quote! {
922 {
923 let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_str . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
924
925 let vals: Vec <_> = records.iter().filter_map( |rec| {
926 if let Some ( inner ) = &rec . optional_str {
927 Some ( (&inner[..]).into() )
928 } else {
929 None
930 }
931 }).collect();
932
933 if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
934 typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
935 } else {
936 panic!("Schema and struct disagree on type for {}" , stringify ! { optional_str } )
937 }
938 }
939 }
940 ).to_string());
941
942 let optional = Field::from(&fields[1]);
943 let snippet = optional.writer_snippet();
944 assert_eq!(snippet.to_string(),
945 (quote!{
946 {
947 let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_string . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
948
949 let vals: Vec <_> = records.iter().filter_map( |rec| {
950 if let Some ( inner ) = &rec . optional_string {
951 Some ( (&inner[..]).into() )
952 } else {
953 None
954 }
955 }).collect();
956
957 if let ColumnWriter::ByteArrayColumnWriter ( typed ) = column_writer.untyped() {
958 typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
959 } else {
960 panic!("Schema and struct disagree on type for {}" , stringify ! { optional_string } )
961 }
962 }
963 }).to_string());
964
965 let optional = Field::from(&fields[2]);
966 let snippet = optional.writer_snippet();
967 assert_eq!(snippet.to_string(),
968 (quote!{
969 {
970 let definition_levels : Vec < i16 > = self . iter ( ) . map ( | rec | if rec . optional_dumb_int . is_some ( ) { 1 } else { 0 } ) . collect ( ) ;
971
972 let vals: Vec <_> = records.iter().filter_map( |rec| {
973 if let Some ( inner ) = rec . optional_dumb_int {
974 Some ( inner as i32 )
975 } else {
976 None
977 }
978 }).collect();
979
980 if let ColumnWriter::Int32ColumnWriter ( typed ) = column_writer.untyped() {
981 typed . write_batch ( & vals [ .. ] , Some(&definition_levels[..]) , None ) ? ;
982 } else {
983 panic!("Schema and struct disagree on type for {}" , stringify ! { optional_dumb_int } )
984 }
985 }
986 }).to_string());
987 }
988
989 #[test]
990 fn test_converting_to_column_writer_type() {
991 let snippet: proc_macro2::TokenStream = quote! {
992 struct ABasicStruct {
993 yes_no: bool,
994 name: String,
995 }
996 };
997
998 let fields = extract_fields(snippet);
999 let processed: Vec<_> = fields.iter().map(Field::from).collect();
1000
1001 let column_writers: Vec<_> = processed
1002 .iter()
1003 .map(|field| field.ty.column_writer())
1004 .collect();
1005
1006 assert_eq!(
1007 column_writers,
1008 vec![
1009 syn::parse_quote!(ColumnWriter::BoolColumnWriter),
1010 syn::parse_quote!(ColumnWriter::ByteArrayColumnWriter)
1011 ]
1012 );
1013 }
1014
1015 #[test]
1016 fn test_converting_to_column_reader_type() {
1017 let snippet: proc_macro2::TokenStream = quote! {
1018 struct ABasicStruct {
1019 yes_no: bool,
1020 name: String,
1021 }
1022 };
1023
1024 let fields = extract_fields(snippet);
1025 let processed: Vec<_> = fields.iter().map(Field::from).collect();
1026
1027 let column_readers: Vec<_> = processed
1028 .iter()
1029 .map(|field| field.ty.column_reader())
1030 .collect();
1031
1032 assert_eq!(
1033 column_readers,
1034 vec![
1035 syn::parse_quote!(ColumnReader::BoolColumnReader),
1036 syn::parse_quote!(ColumnReader::ByteArrayColumnReader)
1037 ]
1038 );
1039 }
1040
1041 #[test]
1042 fn convert_basic_struct() {
1043 let snippet: proc_macro2::TokenStream = quote! {
1044 struct ABasicStruct {
1045 yes_no: bool,
1046 name: String,
1047 length: usize
1048 }
1049 };
1050
1051 let fields = extract_fields(snippet);
1052 let processed: Vec<_> = fields.iter().map(Field::from).collect();
1053 assert_eq!(processed.len(), 3);
1054
1055 assert_eq!(
1056 processed,
1057 vec![
1058 Field {
1059 ident: syn::Ident::new("yes_no", proc_macro2::Span::call_site()),
1060 ty: Type::TypePath(syn::parse_quote!(bool)),
1061 is_a_byte_buf: false,
1062 third_party_type: None,
1063 },
1064 Field {
1065 ident: syn::Ident::new("name", proc_macro2::Span::call_site()),
1066 ty: Type::TypePath(syn::parse_quote!(String)),
1067 is_a_byte_buf: true,
1068 third_party_type: None,
1069 },
1070 Field {
1071 ident: syn::Ident::new("length", proc_macro2::Span::call_site()),
1072 ty: Type::TypePath(syn::parse_quote!(usize)),
1073 is_a_byte_buf: false,
1074 third_party_type: None,
1075 }
1076 ]
1077 )
1078 }
1079
1080 #[test]
1081 fn test_get_inner_type() {
1082 let snippet: proc_macro2::TokenStream = quote! {
1083 struct LotsOfInnerTypes {
1084 a_vec: Vec<u8>,
1085 a_option: ::std::option::Option<bool>,
1086 a_silly_string: ::std::string::String,
1087 a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
1088 }
1089 };
1090
1091 let fields = extract_fields(snippet);
1092 let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1093 let inner_types: Vec<_> = converted_fields
1094 .iter()
1095 .map(|field| field.inner_type())
1096 .collect();
1097 let inner_types_strs: Vec<_> = inner_types
1098 .iter()
1099 .map(|ty| (quote! { #ty }).to_string())
1100 .collect();
1101
1102 assert_eq!(
1103 inner_types_strs,
1104 vec![
1105 "u8",
1106 "bool",
1107 ":: std :: string :: String",
1108 ":: std :: result :: Result < () , () >"
1109 ]
1110 )
1111 }
1112
1113 #[test]
1114 fn test_physical_type() {
1115 use parquet::basic::Type as BasicType;
1116 let snippet: proc_macro2::TokenStream = quote! {
1117 struct LotsOfInnerTypes {
1118 a_buf: ::std::vec::Vec<u8>,
1119 a_number: i32,
1120 a_verbose_option: ::std::option::Option<bool>,
1121 a_silly_string: String,
1122 a_fix_byte_buf: [u8; 10],
1123 a_complex_option: ::std::option::Option<&Vec<u8>>,
1124 a_complex_vec: &::std::vec::Vec<&Option<u8>>,
1125 a_uuid: ::uuid::Uuid,
1126 }
1127 };
1128
1129 let fields = extract_fields(snippet);
1130 let converted_fields: Vec<_> = fields.iter().map(Type::from).collect();
1131 let physical_types: Vec<_> = converted_fields
1132 .iter()
1133 .map(|ty| ty.physical_type())
1134 .collect();
1135
1136 assert_eq!(
1137 physical_types,
1138 vec![
1139 BasicType::BYTE_ARRAY,
1140 BasicType::INT32,
1141 BasicType::BOOLEAN,
1142 BasicType::BYTE_ARRAY,
1143 BasicType::FIXED_LEN_BYTE_ARRAY,
1144 BasicType::BYTE_ARRAY,
1145 BasicType::INT32,
1146 BasicType::FIXED_LEN_BYTE_ARRAY,
1147 ]
1148 )
1149 }
1150
/// Verifies `Type::length` for each field: the fixed-size byte array and the
/// UUID are expected to report a length expression, every other field `None`.
#[test]
fn test_type_length() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct LotsOfInnerTypes {
            a_buf: ::std::vec::Vec<u8>,
            a_number: i32,
            a_verbose_option: ::std::option::Option<bool>,
            a_silly_string: String,
            a_fix_byte_buf: [u8; 10],
            a_complex_option: ::std::option::Option<&Vec<u8>>,
            a_complex_vec: &::std::vec::Vec<&Option<u8>>,
            a_uuid: ::uuid::Uuid,
        }
    };

    let parsed_fields = extract_fields(struct_tokens);
    let actual: Vec<_> = parsed_fields
        .iter()
        .map(Type::from)
        .map(|converted| converted.length())
        .collect();

    // The expected vector stays inline so `parse_quote!` infers its target
    // type from the comparison with `actual`.
    assert_eq!(
        actual,
        vec![
            None,
            None,
            None,
            None,
            Some(syn::parse_quote!(10)),
            None,
            None,
            Some(syn::parse_quote!(16)),
        ]
    )
}
1184
/// Verifies that `Type::from` maps owned field types (a `Vec`, two `Option`s
/// and a plain path) onto the matching `Type` variants.
#[test]
fn test_convert_comprehensive_owned_struct() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct VecHolder {
            a_vec: ::std::vec::Vec<u8>,
            a_option: ::std::option::Option<bool>,
            a_silly_string: ::std::string::String,
            a_complicated_thing: ::std::option::Option<::std::result::Result<(),()>>,
        }
    };

    // Leaf type paths, named so the expected shape below reads at a glance.
    let u8_path = Type::TypePath(syn::parse_quote!(u8));
    let bool_path = Type::TypePath(syn::parse_quote!(bool));
    let string_path = Type::TypePath(syn::parse_quote!(::std::string::String));
    let result_path = Type::TypePath(syn::parse_quote!(::std::result::Result<(),()>));

    let expected = vec![
        Type::Vec(Box::new(u8_path)),
        Type::Option(Box::new(bool_path)),
        string_path,
        Type::Option(Box::new(result_path)),
    ];

    let parsed_fields = extract_fields(struct_tokens);
    let actual: Vec<_> = parsed_fields.iter().map(Type::from).collect();

    assert_eq!(actual, expected);
}
1211
/// Verifies that `Type::from` keeps the `'a` lifetime on borrowed fields,
/// including a reference nested inside a borrowed `Option`.
#[test]
fn test_convert_borrowed_struct() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct Borrower<'a> {
            a_str: &'a str,
            a_borrowed_option: &'a Option<bool>,
            so_many_borrows: &'a Option<&'a str>,
        }
    };

    // Builders for the pieces that recur in the expected value.
    let lifetime_a = || Some(syn::Lifetime::new("'a", proc_macro2::Span::call_site()));
    let borrowed_str = || {
        Type::Reference(
            lifetime_a(),
            Box::new(Type::TypePath(syn::parse_quote!(str))),
        )
    };

    let expected = vec![
        borrowed_str(),
        Type::Reference(
            lifetime_a(),
            Box::new(Type::Option(Box::new(Type::TypePath(syn::parse_quote!(
                bool
            ))))),
        ),
        Type::Reference(
            lifetime_a(),
            Box::new(Type::Option(Box::new(borrowed_str()))),
        ),
    ];

    let parsed_fields = extract_fields(struct_tokens);
    let actual: Vec<_> = parsed_fields.iter().map(Type::from).collect();

    assert_eq!(actual, expected);
}
1248
/// Verifies the writer snippet generated for `chrono::NaiveDateTime` fields,
/// both as a required column and as an `Option<&NaiveDateTime>` column.
#[test]
fn test_chrono_timestamp_millis_write() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDateTime,
            maybe_happened: Option<&chrono::NaiveDateTime>,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);

    // Required field: values are written without definition levels.
    let required = Field::from(&parsed_fields[0]);
    let generated_required = required.writer_snippet().to_string();
    let expected_required = quote! {
        {
            let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.timestamp_millis() ).collect();
            if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], None, None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
            }
        }
    };
    assert_eq!(generated_required, expected_required.to_string());

    // Optional field: definition levels are emitted and `None` values skipped.
    let optional = Field::from(&parsed_fields[1]);
    let generated_optional = optional.writer_snippet().to_string();
    let expected_optional = quote! {
        {
            let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
            let vals : Vec<_> = records.iter().filter_map(|rec| {
                if let Some(inner) = rec.maybe_happened {
                    Some(inner.timestamp_millis())
                } else {
                    None
                }
            }).collect();

            if let ColumnWriter::Int64ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
            }
        }
    };
    assert_eq!(generated_optional, expected_optional.to_string());
}
1291
/// Verifies the reader snippet generated for a required
/// `chrono::NaiveDateTime` field.
#[test]
fn test_chrono_timestamp_millis_read() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDateTime,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);

    let timestamp_field = Field::from(&parsed_fields[0]);
    let generated = timestamp_field.reader_snippet().to_string();

    // The expected code reads INT64 values and rebuilds each timestamp from millis.
    let expected = quote! {
        {
            let mut vals = Vec::new();
            if let ColumnReader::Int64ColumnReader(mut typed) = column_reader {
                let mut definition_levels = Vec::new();
                let (total_num, valid_num, decoded_num) = typed.read_records(
                    num_records, Some(&mut definition_levels), None, &mut vals)?;
                if valid_num != decoded_num {
                    panic!("Support only valid records, found {} null records in column type {}",
                        decoded_num - valid_num, stringify!{henceforth});
                }
            } else {
                panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
            }
            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                r.henceforth = ::chrono::naive::NaiveDateTime::from_timestamp_millis(vals[i]).unwrap();
            }
        }
    };

    assert_eq!(generated, expected.to_string());
}
1322
/// Verifies the writer snippet generated for `chrono::NaiveDate` fields, both
/// as a required column and as an `Option<&NaiveDate>` column.
#[test]
fn test_chrono_date_write() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDate,
            maybe_happened: Option<&chrono::NaiveDate>,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);

    // Required field: each date becomes a day count relative to 1970-01-01.
    let required = Field::from(&parsed_fields[0]);
    let generated_required = required.writer_snippet().to_string();
    let expected_required = quote! {
        {
            let vals : Vec<_> = records.iter().map(|rec| rec.henceforth.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32).collect();
            if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], None, None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ henceforth })
            }
        }
    };
    assert_eq!(generated_required, expected_required.to_string());

    // Optional field: same conversion, plus definition levels for `None` entries.
    let optional = Field::from(&parsed_fields[1]);
    let generated_optional = optional.writer_snippet().to_string();
    let expected_optional = quote! {
        {
            let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_happened.is_some() { 1 } else { 0 }).collect();
            let vals : Vec<_> = records.iter().filter_map(|rec| {
                if let Some(inner) = rec.maybe_happened {
                    Some(inner.signed_duration_since(::chrono::NaiveDate::from_ymd(1970, 1, 1)).num_days() as i32)
                } else {
                    None
                }
            }).collect();

            if let ColumnWriter::Int32ColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_happened })
            }
        }
    };
    assert_eq!(generated_optional, expected_optional.to_string());
}
1365
/// Verifies the reader snippet generated for a required `chrono::NaiveDate`
/// field.
#[test]
fn test_chrono_date_read() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct ATimestampStruct {
            henceforth: chrono::NaiveDate,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);

    let date_field = Field::from(&parsed_fields[0]);
    let generated = date_field.reader_snippet().to_string();

    // NOTE(review): 719163 is presumably the day offset between the common-era
    // origin used by `from_num_days_from_ce_opt` and 1970-01-01 — confirm.
    let expected = quote! {
        {
            let mut vals = Vec::new();
            if let ColumnReader::Int32ColumnReader(mut typed) = column_reader {
                let mut definition_levels = Vec::new();
                let (total_num, valid_num, decoded_num) = typed.read_records(
                    num_records, Some(&mut definition_levels), None, &mut vals)?;
                if valid_num != decoded_num {
                    panic!("Support only valid records, found {} null records in column type {}",
                        decoded_num - valid_num, stringify!{henceforth});
                }
            } else {
                panic!("Schema and struct disagree on type for {}", stringify!{ henceforth });
            }
            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                r.henceforth = ::chrono::naive::NaiveDate::from_num_days_from_ce_opt(vals[i].saturating_add(719163)).unwrap();
            }
        }
    };

    assert_eq!(generated, expected.to_string());
}
1396
/// Verifies the writer snippet generated for `uuid::Uuid` fields, both as a
/// required column and as an `Option<&Uuid>` column.
#[test]
fn test_uuid_write() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct AUuidStruct {
            unique_id: uuid::Uuid,
            maybe_unique_id: Option<&uuid::Uuid>,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);

    // Required field: the expected code writes the UUID's raw bytes.
    let unique_id = Field::from(&parsed_fields[0]);
    let generated_required = unique_id.writer_snippet().to_string();
    let expected_required = quote! {
        {
            let vals : Vec<_> = records.iter().map(|rec| rec.unique_id.as_bytes().to_vec().into() ).collect();
            if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], None, None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ unique_id })
            }
        }
    };
    assert_eq!(generated_required, expected_required.to_string());

    // NOTE(review): the expected optional-path code converts via `to_string()`
    // whereas the required path above uses `as_bytes()` — confirm that this
    // asymmetry in the generated code is intended.
    let maybe_unique_id = Field::from(&parsed_fields[1]);
    let generated_optional = maybe_unique_id.writer_snippet().to_string();
    let expected_optional = quote! {
        {
            let definition_levels : Vec<i16> = self.iter().map(|rec| if rec.maybe_unique_id.is_some() { 1 } else { 0 }).collect();
            let vals : Vec<_> = records.iter().filter_map(|rec| {
                if let Some(inner) = &rec.maybe_unique_id {
                    Some((&inner.to_string()[..]).into())
                } else {
                    None
                }
            }).collect();

            if let ColumnWriter::FixedLenByteArrayColumnWriter(typed) = column_writer.untyped() {
                typed.write_batch(&vals[..], Some(&definition_levels[..]), None) ?;
            } else {
                panic!("Schema and struct disagree on type for {}" , stringify!{ maybe_unique_id })
            }
        }
    };
    assert_eq!(generated_optional, expected_optional.to_string());
}
1439
/// Verifies the reader snippet generated for a required `uuid::Uuid` field.
#[test]
fn test_uuid_read() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct AUuidStruct {
            unique_id: uuid::Uuid,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);

    let unique_id = Field::from(&parsed_fields[0]);
    let generated = unique_id.reader_snippet().to_string();

    // The expected code reads fixed-length byte arrays and rebuilds each UUID
    // from the raw bytes.
    let expected = quote! {
        {
            let mut vals = Vec::new();
            if let ColumnReader::FixedLenByteArrayColumnReader(mut typed) = column_reader {
                let mut definition_levels = Vec::new();
                let (total_num, valid_num, decoded_num) = typed.read_records(
                    num_records, Some(&mut definition_levels), None, &mut vals)?;
                if valid_num != decoded_num {
                    panic!("Support only valid records, found {} null records in column type {}",
                        decoded_num - valid_num, stringify!{unique_id});
                }
            } else {
                panic!("Schema and struct disagree on type for {}", stringify!{ unique_id });
            }
            for (i, r) in &mut records[..num_records].iter_mut().enumerate() {
                r.unique_id = ::uuid::Uuid::from_bytes(vals[i].data().try_into().unwrap());
            }
        }
    };

    assert_eq!(generated, expected.to_string());
}
1470
/// Verifies that a `chrono::NaiveDateTime` field reports the
/// `TIMESTAMP_MILLIS` converted type.
#[test]
fn test_converted_type() {
    let struct_tokens: proc_macro2::TokenStream = quote! {
        struct ATimeStruct {
            time: chrono::NaiveDateTime,
        }
    };
    let parsed_fields = extract_fields(struct_tokens);
    let time_field = Field::from(&parsed_fields[0]);

    // `unwrap` doubles as the assertion that a converted type exists at all.
    let actual = time_field.ty.converted_type().unwrap().to_string();
    let expected_tokens = quote! { ::parquet::basic::ConvertedType::TIMESTAMP_MILLIS };

    assert_eq!(actual, expected_tokens.to_string());
}
1489}