#![doc(
    html_logo_url = "https://raw.githubusercontent.com/apache/parquet-format/25f05e73d8cd7f5c83532ce51cb4f4de8ba5f2a2/logo/parquet-logos_1.svg",
    html_favicon_url = "https://raw.githubusercontent.com/apache/parquet-format/25f05e73d8cd7f5c83532ce51cb4f4de8ba5f2a2/logo/parquet-logos_1.svg"
)]
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
#![warn(missing_docs)]
#![recursion_limit = "128"]
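//! Derive macros for the `parquet` crate: `ParquetRecordWriter` generates a
//! `RecordWriter` implementation for slices of a struct, and
//! `ParquetRecordReader` generates a `RecordReader` implementation for a
//! `Vec` of it.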
extern crate proc_macro;
extern crate proc_macro2;
extern crate syn;
#[macro_use]
extern crate quote;

extern crate parquet;

use ::syn::{parse_macro_input, Data, DataStruct, DeriveInput};

mod parquet_field;
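/// Derive a `RecordWriter` implementation for `&[T]`, where `T` is a flat
/// struct of supported primitive fields. The generated `write_to_row_group`
/// writes the slice to a row group one column at a time, and `schema` builds
/// the matching Parquet schema from the struct's fields.
///
/// A minimal usage sketch; the struct, file path, and writer properties here
/// are illustrative assumptions, not part of this crate:
///
/// ```ignore
/// use std::{fs::File, sync::Arc};
///
/// use parquet::file::properties::WriterProperties;
/// use parquet::file::writer::SerializedFileWriter;
/// use parquet::record::RecordWriter;
/// use parquet_derive::ParquetRecordWriter;
///
/// #[derive(ParquetRecordWriter)]
/// struct MyRecord {
///     a_bool: bool,
///     a_string: String,
/// }
///
/// let samples = vec![MyRecord { a_bool: true, a_string: "hello".into() }];
///
/// // The derived impl lives on `&[MyRecord]`, so go through a slice.
/// let schema = samples.as_slice().schema()?;
/// let file = File::create("sample.parquet")?;
/// let props = Arc::new(WriterProperties::builder().build());
/// let mut writer = SerializedFileWriter::new(file, schema, props)?;
///
/// let mut row_group = writer.next_row_group()?;
/// samples.as_slice().write_to_row_group(&mut row_group)?;
/// row_group.close()?;
/// writer.close()?;
/// ```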
#[proc_macro_derive(ParquetRecordWriter)]
pub fn parquet_record_writer(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
    let input: DeriveInput = parse_macro_input!(input as DeriveInput);
    let fields = match input.data {
        Data::Struct(DataStruct { fields, .. }) => fields,
        Data::Enum(_) => unimplemented!("Enums are not currently supported"),
        Data::Union(_) => unimplemented!("Unions are not currently supported"),
    };

    let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect();

    let writer_snippets: Vec<proc_macro2::TokenStream> =
        field_infos.iter().map(|x| x.writer_snippet()).collect();

    let derived_for = input.ident;
    let generics = input.generics;

    let field_types: Vec<proc_macro2::TokenStream> =
        field_infos.iter().map(|x| x.parquet_type()).collect();

    (quote! {
        impl #generics ::parquet::record::RecordWriter<#derived_for #generics> for &[#derived_for #generics] {
            fn write_to_row_group<W: ::std::io::Write + Send>(
                &self,
                row_group_writer: &mut ::parquet::file::writer::SerializedRowGroupWriter<'_, W>
            ) -> Result<(), ::parquet::errors::ParquetError> {
                use ::parquet::column::writer::ColumnWriter;

                let mut row_group_writer = row_group_writer;
                // `records` is referenced by each generated writer snippet.
                let records = &self;

                #(
                    {
                        // Propagate the error instead of panicking if the
                        // next column writer cannot be created.
                        let some_column_writer = row_group_writer.next_column()?;
                        if let Some(mut column_writer) = some_column_writer {
                            #writer_snippets
                            column_writer.close()?;
                        } else {
                            return Err(::parquet::errors::ParquetError::General("Failed to get next column".into()))
                        }
                    }
                );*

                Ok(())
            }

            fn schema(&self) -> Result<::parquet::schema::types::TypePtr, ::parquet::errors::ParquetError> {
                use ::parquet::schema::types::Type as ParquetType;
                use ::parquet::schema::types::TypePtr;
                use ::parquet::basic::LogicalType;

                // Each generated snippet pushes one field's schema node onto `fields`.
                let mut fields: ::std::vec::Vec<TypePtr> = ::std::vec::Vec::new();
                #(
                    #field_types
                );*;
                let group = ParquetType::group_type_builder("rust_schema")
                    .with_fields(fields)
                    .build()?;
                Ok(group.into())
            }
        }
    }).into()
}

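/// Derive a `RecordReader` implementation for `Vec<T>`, where `T` is a flat
/// struct of supported primitive fields. The generated `read_from_row_group`
/// appends `num_records` default-initialized records, then fills each field
/// from the column whose name matches the field's name.
///
/// A minimal usage sketch; the struct and file path are illustrative
/// assumptions, not part of this crate:
///
/// ```ignore
/// use std::fs::File;
///
/// use parquet::file::reader::{FileReader, SerializedFileReader};
/// use parquet::record::RecordReader;
/// use parquet_derive::ParquetRecordReader;
///
/// #[derive(ParquetRecordReader)]
/// struct MyRecord {
///     a_bool: bool,
///     a_string: String,
/// }
///
/// let reader = SerializedFileReader::new(File::open("sample.parquet")?)?;
/// let mut row_group = reader.get_row_group(0)?;
/// let num_records = row_group.metadata().num_rows() as usize;
///
/// // The derived impl lives on `Vec<MyRecord>`.
/// let mut records: Vec<MyRecord> = Vec::new();
/// records.read_from_row_group(&mut *row_group, num_records)?;
/// ```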
#[proc_macro_derive(ParquetRecordReader)]
pub fn parquet_record_reader(input: proc_macro::TokenStream) -> proc_macro::TokenStream {
    let input: DeriveInput = parse_macro_input!(input as DeriveInput);
    let fields = match input.data {
        Data::Struct(DataStruct { fields, .. }) => fields,
        Data::Enum(_) => unimplemented!("Enums are not currently supported"),
        Data::Union(_) => unimplemented!("Unions are not currently supported"),
    };

    let field_infos: Vec<_> = fields.iter().map(parquet_field::Field::from).collect();
    let field_names: Vec<_> = fields.iter().map(|f| f.ident.clone()).collect();
    let reader_snippets: Vec<proc_macro2::TokenStream> =
        field_infos.iter().map(|x| x.reader_snippet()).collect();

    let derived_for = input.ident;
    let generics = input.generics;

    (quote! {
        impl #generics ::parquet::record::RecordReader<#derived_for #generics> for Vec<#derived_for #generics> {
            fn read_from_row_group(
                &mut self,
                row_group_reader: &mut dyn ::parquet::file::reader::RowGroupReader,
                num_records: usize,
            ) -> Result<(), ::parquet::errors::ParquetError> {
                use ::parquet::column::reader::ColumnReader;

                let mut row_group_reader = row_group_reader;

                // Map each column name in the file to its index, so struct
                // fields can be matched by name rather than by position.
                let mut name_to_index = ::std::collections::HashMap::new();
                for (idx, col) in row_group_reader.metadata().schema_descr().columns().iter().enumerate() {
                    name_to_index.insert(col.name().to_string(), idx);
                }

                // Append `num_records` default-initialized records; the
                // reader snippets below fill them in column by column.
                for _ in 0..num_records {
                    self.push(#derived_for {
                        #(
                            #field_names: Default::default()
                        ),*
                    })
                }

                // `records` is referenced by each generated reader snippet.
                let records = self;

                #(
                    {
                        let idx: usize = match name_to_index.get(stringify!(#field_names)) {
                            Some(&col_idx) => col_idx,
                            None => {
                                let error_msg = format!("column name '{}' is not found in parquet file!", stringify!(#field_names));
                                return Err(::parquet::errors::ParquetError::General(error_msg));
                            }
                        };
                        // Propagate the underlying error rather than masking
                        // it with a generic message.
                        let mut column_reader = row_group_reader.get_column_reader(idx)?;
                        #reader_snippets
                    }
                );*

                Ok(())
            }
        }
    }).into()
}