1use crate::compression::CompressionCodec;
24use crate::reader::Decoder;
25use crate::reader::block::{BlockDecoder, BlockDecoderState};
26use arrow_array::RecordBatch;
27use arrow_schema::{ArrowError, SchemaRef};
28use bytes::Bytes;
29use futures::future::BoxFuture;
30use futures::{FutureExt, Stream};
31use std::mem;
32use std::ops::Range;
33use std::pin::Pin;
34use std::task::{Context, Poll};
35
36mod async_file_reader;
37mod builder;
38
39pub use async_file_reader::AsyncFileReader;
40pub use builder::{ReaderBuilder, read_header_info};
41
42#[cfg(feature = "object_store")]
43mod store;
44
45use crate::errors::AvroError;
46#[cfg(feature = "object_store")]
47pub use store::AvroObjectReader;
48
/// What `read_next` should do with the bytes once a pending fetch completes.
enum FetchNextBehaviour {
    /// Scan the fetched bytes for the 16-byte sync marker to find the first
    /// complete block boundary inside the requested range.
    ReadSyncMarker,
    /// Feed the fetched bytes into the block decoder until the variable
    /// length (count, size) block header has been fully parsed.
    DecodeVLQHeader,
    /// Hand the fetched bytes straight to the block decoder.
    ContinueDecoding,
}
57
/// State machine driven by [`AsyncAvroFileReader::read_next`].
enum ReaderState<R> {
    /// Placeholder installed by `mem::replace` while an arm runs; observing
    /// it on entry means a previous poll exited without restoring a state.
    InvalidState,
    /// No fetch in flight yet; holds the reader until the initial range fetch.
    Idle { reader: R },
    /// A byte fetch is in flight; `next_behaviour` says how to consume it.
    FetchingData {
        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
        next_behaviour: FetchNextBehaviour,
    },
    /// Feeding already-fetched bytes into the block decoder.
    DecodingBlock { data: Bytes, reader: R },
    /// Decoding records out of a fully assembled (decompressed) block.
    ReadingBatches {
        /// Unconsumed file bytes that follow the current block.
        data: Bytes,
        /// Contents of the current block (after optional decompression).
        block_data: Bytes,
        /// Records of the current block not yet fed to the row decoder.
        remaining_in_block: usize,
        reader: R,
    },
    /// All input consumed; draining buffered rows out of the row decoder.
    Flushing,
    /// Terminal state: the stream keeps yielding `None`.
    Finished,
}
83
pub struct AsyncAvroFileReader<R> {
    // Byte range of the file this reader is responsible for. `range.end` is
    // advanced when extra bytes past the range are fetched to complete a
    // trailing partial block header.
    range: Range<u64>,
    // Total size of the underlying file; bounds any fetch past `range.end`.
    file_size: u64,

    // Decodes Avro-encoded rows into Arrow `RecordBatch`es.
    decoder: Decoder,
    // Parses block headers (count/size VLQs), payloads and sync markers.
    block_decoder: BlockDecoder,
    // Compression codec for block payloads, if the file is compressed.
    codec: Option<CompressionCodec>,
    // 16-byte sync marker from the file header, used to locate block starts.
    sync_marker: [u8; 16],

    // Current position in the fetch/decode/flush state machine.
    reader_state: ReaderState<R>,
    // Set once we start fetching past `range.end` to finish the final
    // partial block; running out of data again in this mode is an error.
    finishing_partial_block: bool,
}
152
impl<R> AsyncAvroFileReader<R> {
    /// Create a [`ReaderBuilder`] for configuring and building a reader.
    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
        ReaderBuilder::new(reader, file_size, batch_size)
    }

    /// Construct a reader from pre-parsed header information; invoked by the
    /// builder once the file header (schema, codec, sync marker) is known.
    fn new(
        range: Range<u64>,
        file_size: u64,
        decoder: Decoder,
        codec: Option<CompressionCodec>,
        sync_marker: [u8; 16],
        reader_state: ReaderState<R>,
    ) -> Self {
        Self {
            range,
            file_size,

            decoder,
            block_decoder: Default::default(),
            codec,
            sync_marker,

            reader_state,
            finishing_partial_block: false,
        }
    }

    /// The Arrow schema of the record batches produced by this reader.
    pub fn schema(&self) -> SchemaRef {
        self.decoder.schema()
    }

    /// File range still needed to finish the in-progress block: the block
    /// decoder's remaining payload bytes plus, while still in the payload
    /// (`Data`) state, the 16-byte trailing sync marker.
    ///
    /// Errors if called in a state other than `Data`/`Sync`, or if the block
    /// claims more bytes than remain in the file.
    fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> {
        let remaining = self.block_decoder.bytes_remaining() as u64
            + match self.block_decoder.state() {
                // Payload still pending: the sync marker also remains.
                BlockDecoderState::Data => 16,
                // Only the sync marker itself is pending.
                BlockDecoderState::Sync => 0,
                state => {
                    return Err(AvroError::General(format!(
                        "remaining_block_range called in unexpected state: {state:?}"
                    )));
                }
            };

        let fetch_end = self.range.end + remaining;
        if fetch_end > self.file_size {
            return Err(AvroError::EOF(
                "Avro block requires more bytes than what exists in the file".into(),
            ));
        }

        Ok(self.range.end..fetch_end)
    }

    /// Transition to `Finished` and surface `error` to the stream consumer.
    #[inline]
    fn finish_with_error(
        &mut self,
        error: AvroError,
    ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        self.reader_state = ReaderState::Finished;
        Poll::Ready(Some(Err(error)))
    }

    /// Enter the `Flushing` state to drain rows buffered in the decoder.
    #[inline]
    fn start_flushing(&mut self) {
        self.reader_state = ReaderState::Flushing;
    }

    /// Drain one batch from the row decoder. The caller has already
    /// `mem::replace`d `reader_state` away, so `Flushing` must be restored
    /// here while batches remain — this assignment is NOT redundant.
    #[inline]
    fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        match self.decoder.flush() {
            Ok(Some(batch)) => {
                // More rows may still be buffered: stay in `Flushing`.
                self.reader_state = ReaderState::Flushing;
                Poll::Ready(Some(Ok(batch)))
            }
            Ok(None) => {
                self.reader_state = ReaderState::Finished;
                Poll::Ready(None)
            }
            Err(e) => self.finish_with_error(e),
        }
    }
}
244
impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
    /// Fetch `range` from `reader`, returning the reader alongside the bytes
    /// so ownership can be threaded through the `'static` boxed future stored
    /// in [`ReaderState::FetchingData`].
    async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> {
        let data = reader.get_bytes(range).await?;
        Ok((reader, data))
    }

    /// Drive the state machine until a batch is ready, the stream ends, a
    /// pending fetch returns `Poll::Pending`, or an error terminates reading.
    ///
    /// Each iteration `mem::replace`s `reader_state` with `InvalidState`, so
    /// every arm must install a valid successor state before returning or
    /// looping. `?` is forbidden because early returns must go through
    /// `finish_with_error` so the machine is left in `Finished`.
    #[forbid(clippy::question_mark_used)]
    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        loop {
            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
                ReaderState::Idle { reader } => {
                    let range = self.range.clone();
                    if range.start >= range.end {
                        return self.finish_with_error(AvroError::InvalidArgument(format!(
                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
                            range.start, range.end, self.file_size
                        )));
                    }

                    // Kick off the fetch of the whole requested range.
                    let future = Self::fetch_bytes(reader, range).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
                    };
                }
                ReaderState::FetchingData {
                    mut future,
                    next_behaviour,
                } => {
                    let (reader, data_chunk) = match future.poll_unpin(cx) {
                        Poll::Ready(Ok(data)) => data,
                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
                        Poll::Pending => {
                            // Restore the state consumed by `mem::replace`.
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour,
                            };
                            return Poll::Pending;
                        }
                    };

                    match next_behaviour {
                        FetchNextBehaviour::ReadSyncMarker => {
                            // The range may start mid-block: decoding begins
                            // just after the first sync marker in the data.
                            let sync_marker_pos = data_chunk
                                .windows(16)
                                .position(|slice| slice == self.sync_marker);
                            let block_start = match sync_marker_pos {
                                Some(pos) => pos + 16,
                                None => {
                                    // No block starts within this range;
                                    // another reader owns these rows.
                                    self.reader_state = ReaderState::Finished;
                                    return Poll::Ready(None);
                                }
                            };

                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk.slice(block_start..),
                            };
                        }
                        FetchNextBehaviour::DecodeVLQHeader => {
                            let mut data = data_chunk;

                            // Feed bytes one at a time until the variable
                            // length count/size header is fully parsed.
                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
                                if data.is_empty() {
                                    return self.finish_with_error(AvroError::EOF(
                                        "Unexpected EOF while reading Avro block header".into(),
                                    ));
                                }
                                let consumed = match self.block_decoder.decode(&data[..1]) {
                                    Ok(consumed) => consumed,
                                    Err(e) => return self.finish_with_error(e),
                                };
                                if consumed == 0 {
                                    return self.finish_with_error(AvroError::General(
                                        "BlockDecoder failed to consume byte during VLQ header parsing"
                                            .into(),
                                    ));
                                }
                                data = data.slice(consumed..);
                            }

                            // Pass along any payload bytes that were fetched
                            // together with the header.
                            let bytes_remaining = self.block_decoder.bytes_remaining();
                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
                            let consumed = match self.block_decoder.decode(&data_to_use) {
                                Ok(consumed) => consumed,
                                Err(e) => return self.finish_with_error(e),
                            };
                            if consumed != data_to_use.len() {
                                return self.finish_with_error(AvroError::General(
                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
                                        .into(),
                                ));
                            }

                            let range_to_fetch = match self.remaining_block_range() {
                                Ok(range) if range.is_empty() => {
                                    // Block already complete; nothing more
                                    // to fetch.
                                    self.reader_state = ReaderState::DecodingBlock {
                                        reader,
                                        data: Bytes::new(),
                                    };
                                    continue;
                                }
                                Ok(range) => range,
                                Err(e) => return self.finish_with_error(e),
                            };

                            let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
                            };
                            continue;
                        }
                        FetchNextBehaviour::ContinueDecoding => {
                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk,
                            };
                        }
                    }
                }
                ReaderState::InvalidState => {
                    return self.finish_with_error(AvroError::General(
                        "AsyncAvroFileReader in invalid state".into(),
                    ));
                }
                ReaderState::DecodingBlock { reader, mut data } => {
                    let consumed = match self.block_decoder.decode(&data) {
                        Ok(consumed) => consumed,
                        Err(e) => return self.finish_with_error(e),
                    };
                    data = data.slice(consumed..);

                    if let Some(block) = self.block_decoder.flush() {
                        // A complete block is available: decompress if the
                        // file declares a codec, then start reading rows.
                        let block_count = block.count;
                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
                            match codec.decompress(&block.data) {
                                Ok(decompressed) => decompressed,
                                Err(e) => return self.finish_with_error(e),
                            }
                        } else {
                            block.data
                        });

                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block: block_count,
                        };
                        continue;
                    }

                    if !data.is_empty() {
                        // No complete block, yet bytes were left unconsumed:
                        // the decoder is stuck.
                        return self.finish_with_error(AvroError::General(
                            "BlockDecoder failed to make progress decoding Avro block".into(),
                        ));
                    }

                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
                        self.finishing_partial_block = false;
                        self.start_flushing();
                        continue;
                    }

                    // Data ran out mid-block. We may extend past `range.end`
                    // once to finish the trailing partial block; a second
                    // shortfall means the file itself is truncated.
                    if self.finishing_partial_block {
                        return self.finish_with_error(AvroError::EOF(
                            "Unexpected EOF while reading last Avro block".into(),
                        ));
                    }

                    self.finishing_partial_block = true;

                    if matches!(
                        self.block_decoder.state(),
                        BlockDecoderState::Count | BlockDecoderState::Size
                    ) {
                        // The header itself is incomplete: fetch up to the
                        // maximum combined size of the two VLQ header values.
                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);

                        if fetch_end == self.range.end {
                            return self.finish_with_error(AvroError::EOF(
                                "Unexpected EOF while reading Avro block header".into(),
                            ));
                        }

                        let range_to_fetch = self.range.end..fetch_end;
                        // Advance `range.end` so later fetch computations
                        // start after these extra header bytes.
                        self.range.end = fetch_end;
                        let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                        self.reader_state = ReaderState::FetchingData {
                            future,
                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
                        };
                        continue;
                    }

                    // Header is fully known: fetch exactly the remainder of
                    // the block (payload and/or sync marker).
                    let range_to_fetch = match self.remaining_block_range() {
                        Ok(range) => range,
                        Err(e) => return self.finish_with_error(e),
                    };

                    let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
                    };
                    continue;
                }
                ReaderState::ReadingBatches {
                    reader,
                    data,
                    mut block_data,
                    mut remaining_in_block,
                } => {
                    let (consumed, records_decoded) =
                        match self.decoder.decode_block(&block_data, remaining_in_block) {
                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
                            Err(e) => return self.finish_with_error(e),
                        };

                    remaining_in_block -= records_decoded;

                    if remaining_in_block == 0 {
                        if data.is_empty() {
                            // Last block of the range: drain the decoder.
                            self.start_flushing();
                        } else {
                            // More bytes follow: decode the next block.
                            self.reader_state = ReaderState::DecodingBlock { reader, data };
                        }
                    } else {
                        // The row decoder stopped early (batch full); keep
                        // the unread tail of the block for the next poll.
                        block_data = block_data.slice(consumed..);
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block,
                        };
                    }

                    if self.decoder.batch_is_full() {
                        return match self.decoder.flush() {
                            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
                            Ok(None) => self.finish_with_error(AvroError::General(
                                "Decoder reported a full batch, but flush returned None".into(),
                            )),
                            Err(e) => self.finish_with_error(e),
                        };
                    }
                }
                ReaderState::Flushing => {
                    return self.poll_flush();
                }
                ReaderState::Finished => {
                    // Remain terminal; repeated polls keep yielding `None`.
                    self.reader_state = ReaderState::Finished;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
536
537impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
539 type Item = Result<RecordBatch, ArrowError>;
540
541 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
542 self.read_next(cx).map_err(Into::into)
543 }
544}
545
546#[cfg(all(test, feature = "object_store"))]
547mod tests {
548 use super::*;
549 use crate::codec::Tz;
550 use crate::schema::{
551 AVRO_NAME_METADATA_KEY, AVRO_NAMESPACE_METADATA_KEY, AvroSchema, SCHEMA_METADATA_KEY,
552 };
553 use arrow_array::cast::AsArray;
554 use arrow_array::types::{Int32Type, Int64Type};
555 use arrow_array::*;
556 use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
557 use futures::{StreamExt, TryStreamExt};
558 use object_store::local::LocalFileSystem;
559 use object_store::path::Path;
560 use object_store::{ObjectStore, ObjectStoreExt};
561 use std::collections::HashMap;
562 use std::sync::Arc;
563
564 fn arrow_test_data(file: &str) -> String {
565 let base =
566 std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| "../testing/data".to_string());
567 format!("{}/{}", base, file)
568 }
569
    /// Arrow schema matching `alltypes_plain.avro`, with a UTC timestamp column.
    fn get_alltypes_schema() -> SchemaRef {
        get_alltypes_schema_with_tz("+00:00")
    }
573
    /// Arrow schema matching `alltypes_plain.avro`, with the timestamp column
    /// localized to `tz_id`. The schema metadata carries the corresponding
    /// Avro writer-schema JSON under `SCHEMA_METADATA_KEY`.
    fn get_alltypes_schema_with_tz(tz_id: &str) -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("tinyint_col", DataType::Int32, true),
            Field::new("smallint_col", DataType::Int32, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("date_string_col", DataType::Binary, true),
            Field::new("string_col", DataType::Binary, true),
            Field::new(
                "timestamp_col",
                DataType::Timestamp(TimeUnit::Microsecond, Some(tz_id.into())),
                true,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    {
      "name": "id",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "bool_col",
      "type": [
        "boolean",
        "null"
      ]
    },
    {
      "name": "tinyint_col",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "smallint_col",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "int_col",
      "type": [
        "int",
        "null"
      ]
    },
    {
      "name": "bigint_col",
      "type": [
        "long",
        "null"
      ]
    },
    {
      "name": "float_col",
      "type": [
        "float",
        "null"
      ]
    },
    {
      "name": "double_col",
      "type": [
        "double",
        "null"
      ]
    },
    {
      "name": "date_string_col",
      "type": [
        "bytes",
        "null"
      ]
    },
    {
      "name": "string_col",
      "type": [
        "bytes",
        "null"
      ]
    },
    {
      "name": "timestamp_col",
      "type": [
        {
          "type": "long",
          "logicalType": "timestamp-micros"
        },
        "null"
      ]
    }
  ]
}
"#
            .into(),
        )]));
        Arc::new(schema)
    }
685
    /// Arrow schema matching `alltypes_nulls_plain.avro`: every column is a
    /// nullable `["null", <type>]` Avro union with a `null` default. The
    /// writer-schema JSON is attached under `SCHEMA_METADATA_KEY`.
    fn get_alltypes_with_nulls_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("string_col", DataType::Binary, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("bytes_col", DataType::Binary, true),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
  "type": "record",
  "name": "topLevelRecord",
  "fields": [
    {
      "name": "string_col",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "int_col",
      "type": [
        "null",
        "int"
      ],
      "default": null
    },
    {
      "name": "bool_col",
      "type": [
        "null",
        "boolean"
      ],
      "default": null
    },
    {
      "name": "bigint_col",
      "type": [
        "null",
        "long"
      ],
      "default": null
    },
    {
      "name": "float_col",
      "type": [
        "null",
        "float"
      ],
      "default": null
    },
    {
      "name": "double_col",
      "type": [
        "null",
        "double"
      ],
      "default": null
    },
    {
      "name": "bytes_col",
      "type": [
        "null",
        "bytes"
      ],
      "default": null
    }
  ]
}"#
            .into(),
        )]));

        Arc::new(schema)
    }
765
    /// Arrow schema matching `nested_records.avro`: nested structs and lists
    /// of structs, with Avro record name/namespace carried in field metadata
    /// so round-tripping to an Avro schema preserves the named types.
    fn get_nested_records_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new(
                "f1",
                DataType::Struct(
                    vec![
                        Field::new("f1_1", DataType::Utf8, false),
                        Field::new("f1_2", DataType::Int32, false),
                        Field::new(
                            "f1_3",
                            DataType::Struct(
                                vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
                            ),
                            false,
                        )
                        .with_metadata(HashMap::from([
                            (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns3".to_owned()),
                            (AVRO_NAME_METADATA_KEY.to_owned(), "record3".to_owned()),
                        ])),
                    ]
                    .into(),
                ),
                false,
            )
            .with_metadata(HashMap::from([
                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns2".to_owned()),
                (AVRO_NAME_METADATA_KEY.to_owned(), "record2".to_owned()),
            ])),
            Field::new(
                "f2",
                DataType::List(Arc::new(
                    Field::new(
                        "item",
                        DataType::Struct(
                            vec![
                                Field::new("f2_1", DataType::Boolean, false),
                                Field::new("f2_2", DataType::Float32, false),
                            ]
                            .into(),
                        ),
                        false,
                    )
                    .with_metadata(HashMap::from([
                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns4".to_owned()),
                        (AVRO_NAME_METADATA_KEY.to_owned(), "record4".to_owned()),
                    ])),
                )),
                false,
            ),
            Field::new(
                "f3",
                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
                true,
            )
            .with_metadata(HashMap::from([
                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns5".to_owned()),
                (AVRO_NAME_METADATA_KEY.to_owned(), "record5".to_owned()),
            ])),
            Field::new(
                "f4",
                DataType::List(Arc::new(
                    Field::new(
                        "item",
                        DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
                        true,
                    )
                    .with_metadata(HashMap::from([
                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns6".to_owned()),
                        (AVRO_NAME_METADATA_KEY.to_owned(), "record6".to_owned()),
                    ])),
                )),
                false,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
  "type": "record",
  "namespace": "ns1",
  "name": "record1",
  "fields": [
    {
      "name": "f1",
      "type": {
        "type": "record",
        "namespace": "ns2",
        "name": "record2",
        "fields": [
          {
            "name": "f1_1",
            "type": "string"
          },
          {
            "name": "f1_2",
            "type": "int"
          },
          {
            "name": "f1_3",
            "type": {
              "type": "record",
              "namespace": "ns3",
              "name": "record3",
              "fields": [
                {
                  "name": "f1_3_1",
                  "type": "double"
                }
              ]
            }
          }
        ]
      }
    },
    {
      "name": "f2",
      "type": {
        "type": "array",
        "items": {
          "type": "record",
          "namespace": "ns4",
          "name": "record4",
          "fields": [
            {
              "name": "f2_1",
              "type": "boolean"
            },
            {
              "name": "f2_2",
              "type": "float"
            }
          ]
        }
      }
    },
    {
      "name": "f3",
      "type": [
        "null",
        {
          "type": "record",
          "namespace": "ns5",
          "name": "record5",
          "fields": [
            {
              "name": "f3_1",
              "type": "string"
            }
          ]
        }
      ],
      "default": null
    },
    {
      "name": "f4",
      "type": {
        "type": "array",
        "items": [
          "null",
          {
            "type": "record",
            "namespace": "ns6",
            "name": "record6",
            "fields": [
              {
                "name": "f4_1",
                "type": "long"
              }
            ]
          }
        ]
      }
    }
  ]
}
"#
            .into(),
        )]));

        Arc::new(schema)
    }
946
947 async fn read_async_file(
948 path: &str,
949 batch_size: usize,
950 range: Option<Range<u64>>,
951 schema: Option<SchemaRef>,
952 projection: Option<Vec<usize>>,
953 ) -> Result<Vec<RecordBatch>, ArrowError> {
954 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
955 let location = Path::from_filesystem_path(path).unwrap();
956
957 let file_size = store.head(&location).await.unwrap().size;
958
959 let file_reader = AvroObjectReader::new(store, location);
960 let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);
961
962 if let Some(s) = schema {
963 let reader_schema = AvroSchema::try_from(s.as_ref())?;
964 builder = builder.with_reader_schema(reader_schema);
965 }
966
967 if let Some(proj) = projection {
968 builder = builder.with_projection(proj);
969 }
970
971 if let Some(range) = range {
972 builder = builder.with_range(range);
973 }
974
975 let reader = builder.try_build().await?;
976 reader.try_collect().await
977 }
978
979 #[tokio::test]
980 async fn test_full_file_read() {
981 let file = arrow_test_data("avro/alltypes_plain.avro");
982 let schema = get_alltypes_schema();
983 let batches = read_async_file(&file, 1024, None, Some(schema), None)
984 .await
985 .unwrap();
986 let batch = &batches[0];
987
988 assert_eq!(batch.num_rows(), 8);
989 assert_eq!(batch.num_columns(), 11);
990
991 let id_array = batch
992 .column(0)
993 .as_any()
994 .downcast_ref::<Int32Array>()
995 .unwrap();
996 assert_eq!(id_array.value(0), 4);
997 assert_eq!(id_array.value(7), 1);
998 }
999
1000 #[tokio::test]
1001 async fn test_small_batch_size() {
1002 let file = arrow_test_data("avro/alltypes_plain.avro");
1003 let schema = get_alltypes_schema();
1004 let batches = read_async_file(&file, 2, None, Some(schema), None)
1005 .await
1006 .unwrap();
1007 assert_eq!(batches.len(), 4);
1008
1009 let batch = &batches[0];
1010
1011 assert_eq!(batch.num_rows(), 2);
1012 assert_eq!(batch.num_columns(), 11);
1013 }
1014
1015 #[tokio::test]
1016 async fn test_batch_size_one() {
1017 let file = arrow_test_data("avro/alltypes_plain.avro");
1018 let schema = get_alltypes_schema();
1019 let batches = read_async_file(&file, 1, None, Some(schema), None)
1020 .await
1021 .unwrap();
1022 let batch = &batches[0];
1023
1024 assert_eq!(batches.len(), 8);
1025 assert_eq!(batch.num_rows(), 1);
1026 }
1027
1028 #[tokio::test]
1029 async fn test_batch_size_larger_than_file() {
1030 let file = arrow_test_data("avro/alltypes_plain.avro");
1031 let schema = get_alltypes_schema();
1032 let batches = read_async_file(&file, 10000, None, Some(schema), None)
1033 .await
1034 .unwrap();
1035 let batch = &batches[0];
1036
1037 assert_eq!(batch.num_rows(), 8);
1038 }
1039
1040 #[tokio::test]
1041 async fn test_empty_range() {
1042 let file = arrow_test_data("avro/alltypes_plain.avro");
1043 let range = 100..100;
1044 let schema = get_alltypes_schema();
1045 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1046 .await
1047 .unwrap();
1048 assert_eq!(batches.len(), 0);
1049 }
1050
1051 #[tokio::test]
1052 async fn test_range_starting_at_zero() {
1053 let file = arrow_test_data("avro/alltypes_plain.avro");
1055 let store = Arc::new(LocalFileSystem::new());
1056 let location = Path::from_filesystem_path(&file).unwrap();
1057 let meta = store.head(&location).await.unwrap();
1058
1059 let range = 0..meta.size;
1060 let schema = get_alltypes_schema();
1061 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1062 .await
1063 .unwrap();
1064 let batch = &batches[0];
1065
1066 assert_eq!(batch.num_rows(), 8);
1067 }
1068
1069 #[tokio::test]
1070 async fn test_range_after_header() {
1071 let file = arrow_test_data("avro/alltypes_plain.avro");
1072 let store = Arc::new(LocalFileSystem::new());
1073 let location = Path::from_filesystem_path(&file).unwrap();
1074 let meta = store.head(&location).await.unwrap();
1075
1076 let range = 100..meta.size;
1077 let schema = get_alltypes_schema();
1078 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1079 .await
1080 .unwrap();
1081 let batch = &batches[0];
1082
1083 assert!(batch.num_rows() > 0);
1084 }
1085
1086 #[tokio::test]
1087 async fn test_range_no_sync_marker() {
1088 let file = arrow_test_data("avro/alltypes_plain.avro");
1090 let range = 50..150;
1091 let schema = get_alltypes_schema();
1092 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1093 .await
1094 .unwrap();
1095 assert_eq!(batches.len(), 0);
1096 }
1097
1098 #[tokio::test]
1099 async fn test_range_starting_mid_file() {
1100 let file = arrow_test_data("avro/alltypes_plain.avro");
1101
1102 let range = 700..768; let schema = get_alltypes_schema();
1104 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1105 .await
1106 .unwrap();
1107 assert_eq!(batches.len(), 0);
1108 }
1109
1110 #[tokio::test]
1111 async fn test_range_ending_at_file_size() {
1112 let file = arrow_test_data("avro/alltypes_plain.avro");
1113 let store = Arc::new(LocalFileSystem::new());
1114 let location = Path::from_filesystem_path(&file).unwrap();
1115 let meta = store.head(&location).await.unwrap();
1116
1117 let range = 200..meta.size;
1118 let schema = get_alltypes_schema();
1119 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1120 .await
1121 .unwrap();
1122 let batch = &batches[0];
1123
1124 assert_eq!(batch.num_rows(), 8);
1125 }
1126
1127 #[tokio::test]
1128 async fn test_incomplete_block_requires_fetch() {
1129 let file = arrow_test_data("avro/alltypes_plain.avro");
1131 let range = 0..1200;
1132 let schema = get_alltypes_schema();
1133 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1134 .await
1135 .unwrap();
1136 let batch = &batches[0];
1137
1138 assert_eq!(batch.num_rows(), 8)
1139 }
1140
1141 #[tokio::test]
1142 async fn test_partial_vlq_header_requires_fetch() {
1143 let file = arrow_test_data("avro/alltypes_plain.avro");
1145 let range = 16..676; let schema = get_alltypes_schema();
1147 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1148 .await
1149 .unwrap();
1150 let batch = &batches[0];
1151
1152 assert_eq!(batch.num_rows(), 8)
1153 }
1154
1155 #[cfg(feature = "snappy")]
1156 #[tokio::test]
1157 async fn test_snappy_compressed_with_range() {
1158 {
1159 let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
1160 let store = Arc::new(LocalFileSystem::new());
1161 let location = Path::from_filesystem_path(&file).unwrap();
1162 let meta = store.head(&location).await.unwrap();
1163
1164 let range = 200..meta.size;
1165 let schema = get_alltypes_schema();
1166 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1167 .await
1168 .unwrap();
1169 let batch = &batches[0];
1170
1171 assert!(batch.num_rows() > 0);
1172 }
1173 }
1174
1175 #[tokio::test]
1176 async fn test_nulls() {
1177 let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
1178 let schema = get_alltypes_with_nulls_schema();
1179 let batches = read_async_file(&file, 1024, None, Some(schema), None)
1180 .await
1181 .unwrap();
1182 let batch = &batches[0];
1183
1184 assert_eq!(batch.num_rows(), 1);
1185 for col in batch.columns() {
1186 assert!(col.is_null(0));
1187 }
1188 }
1189
1190 #[tokio::test]
1191 async fn test_nested_records() {
1192 let file = arrow_test_data("avro/nested_records.avro");
1193 let schema = get_nested_records_schema();
1194 let batches = read_async_file(&file, 1024, None, Some(schema), None)
1195 .await
1196 .unwrap();
1197 let batch = &batches[0];
1198
1199 assert_eq!(batch.num_rows(), 2);
1200 assert!(batch.num_columns() > 0);
1201 }
1202
    #[tokio::test]
    async fn test_stream_produces_multiple_batches() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let schema = get_alltypes_schema();
        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
        let reader = AsyncAvroFileReader::builder(
            file_reader,
            file_size,
            // batch_size of 2 forces the 8-row file across multiple batches
            2,
        )
        .with_reader_schema(reader_schema)
        .try_build()
        .await
        .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        // The stream must split the rows while preserving the total count.
        assert!(batches.len() > 1);
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 8);
    }
1230
    // Dropping the stream after the first batch must not panic or error,
    // even though fetches/decoding for later batches never run.
    #[tokio::test]
    async fn test_stream_early_termination() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let schema = get_alltypes_schema();
        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1)
            .with_reader_schema(reader_schema)
            .try_build()
            .await
            .unwrap();

        // Only consume the first batch, then drop the rest of the stream.
        let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(first_batch.len(), 1);
        assert!(first_batch[0].num_rows() > 0);
    }
1253
1254 #[tokio::test]
1255 async fn test_various_batch_sizes() {
1256 let file = arrow_test_data("avro/alltypes_plain.avro");
1257
1258 for batch_size in [1, 2, 3, 5, 7, 11, 100] {
1259 let schema = get_alltypes_schema();
1260 let batches = read_async_file(&file, batch_size, None, Some(schema), None)
1261 .await
1262 .unwrap();
1263 let batch = &batches[0];
1264
1265 assert_eq!(
1267 batch.num_rows(),
1268 batch_size.min(8),
1269 "Failed with batch_size={}",
1270 batch_size
1271 );
1272 }
1273 }
1274
1275 #[tokio::test]
1276 async fn test_range_larger_than_file() {
1277 let file = arrow_test_data("avro/alltypes_plain.avro");
1278 let store = Arc::new(LocalFileSystem::new());
1279 let location = Path::from_filesystem_path(&file).unwrap();
1280 let meta = store.head(&location).await.unwrap();
1281
1282 let range = 100..(meta.size + 1000);
1284 let schema = get_alltypes_schema();
1285 let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1286 .await
1287 .unwrap();
1288 let batch = &batches[0];
1289
1290 assert_eq!(batch.num_rows(), 8);
1292 }
1293
    // Exercises the two-step construction path: parse the header separately
    // via `read_header_info`, then build the reader with `build_with_header`.
    #[tokio::test]
    async fn test_builder_with_header_info() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        let mut file_reader = AvroObjectReader::new(store, location);

        let header_info = read_header_info(&mut file_reader, file_size, None)
            .await
            .unwrap();

        // Known header length of the alltypes_plain.avro test fixture.
        assert_eq!(header_info.header_len(), 675);

        // The writer schema embedded in the file must match the JSON we
        // attach to the expected Arrow schema (compared structurally).
        let writer_schema = header_info.writer_schema().unwrap();
        let expected_avro_json: serde_json::Value = serde_json::from_str(
            get_alltypes_schema()
                .metadata()
                .get(SCHEMA_METADATA_KEY)
                .unwrap(),
        )
        .unwrap();
        let actual_avro_json: serde_json::Value =
            serde_json::from_str(&writer_schema.json_string).unwrap();
        assert_eq!(actual_avro_json, expected_avro_json);

        // Building with the pre-parsed header is synchronous (no extra I/O).
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .build_with_header(header_info)
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        let batch = &batches[0];
        assert_eq!(batch.num_rows(), 8)
    }
1331
1332 #[tokio::test]
1333 async fn test_roundtrip_write_then_async_read() {
1334 use crate::writer::AvroWriter;
1335 use arrow_array::{Float64Array, StringArray};
1336 use std::fs::File;
1337 use std::io::BufWriter;
1338 use tempfile::tempdir;
1339
1340 let schema = Arc::new(Schema::new(vec![
1342 Field::new("id", DataType::Int32, false),
1343 Field::new("name", DataType::Utf8, true),
1344 Field::new("score", DataType::Float64, true),
1345 Field::new("count", DataType::Int64, false),
1346 ]));
1347
1348 let dir = tempdir().unwrap();
1349 let file_path = dir.path().join("roundtrip_test.avro");
1350
1351 {
1353 let file = File::create(&file_path).unwrap();
1354 let writer = BufWriter::new(file);
1355 let mut avro_writer = AvroWriter::new(writer, schema.as_ref().clone()).unwrap();
1356
1357 let batch1 = RecordBatch::try_new(
1359 schema.clone(),
1360 vec![
1361 Arc::new(Int32Array::from(vec![1, 2, 3])),
1362 Arc::new(StringArray::from(vec![
1363 Some("alice"),
1364 None,
1365 Some("charlie"),
1366 ])),
1367 Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3), None])),
1368 Arc::new(Int64Array::from(vec![10, 20, 30])),
1369 ],
1370 )
1371 .unwrap();
1372 avro_writer.write(&batch1).unwrap();
1373
1374 let batch2 = RecordBatch::try_new(
1376 schema.clone(),
1377 vec![
1378 Arc::new(Int32Array::from(vec![4, 5])),
1379 Arc::new(StringArray::from(vec![Some("diana"), Some("eve")])),
1380 Arc::new(Float64Array::from(vec![None, Some(88.0)])),
1381 Arc::new(Int64Array::from(vec![40, 50])),
1382 ],
1383 )
1384 .unwrap();
1385 avro_writer.write(&batch2).unwrap();
1386
1387 avro_writer.finish().unwrap();
1388 }
1389
1390 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1392 let location = Path::from_filesystem_path(&file_path).unwrap();
1393 let file_size = store.head(&location).await.unwrap().size;
1394
1395 let file_reader = AvroObjectReader::new(store, location);
1396 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
1397 .try_build()
1398 .await
1399 .unwrap();
1400
1401 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1402
1403 assert!(
1405 batches.len() > 1,
1406 "Expected multiple batches with batch_size=2"
1407 );
1408
1409 let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1411 assert_eq!(total_rows, 5);
1412
1413 let combined = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
1415 assert_eq!(combined.num_rows(), 5);
1416 assert_eq!(combined.num_columns(), 4);
1417
1418 let id_array = combined
1420 .column(0)
1421 .as_any()
1422 .downcast_ref::<Int32Array>()
1423 .unwrap();
1424 assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);
1425
1426 let name_col = combined.column(1);
1429 let name_array = name_col.as_string::<i32>();
1430 assert_eq!(name_array.value(0), "alice");
1431 assert!(name_col.is_null(1)); assert_eq!(name_array.value(2), "charlie");
1433
1434 let score_array = combined
1436 .column(2)
1437 .as_any()
1438 .downcast_ref::<Float64Array>()
1439 .unwrap();
1440 assert!(!score_array.is_null(0));
1441 assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
1442 assert!(score_array.is_null(2)); assert!(score_array.is_null(3)); assert!(!score_array.is_null(4));
1445 assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);
1446
1447 let count_array = combined
1449 .column(3)
1450 .as_any()
1451 .downcast_ref::<Int64Array>()
1452 .unwrap();
1453 assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
1454 }
1455
1456 #[tokio::test]
1457 async fn test_alltypes_no_schema_no_projection() {
1458 let file = arrow_test_data("avro/alltypes_plain.avro");
1460 let batches = read_async_file(&file, 1024, None, None, None)
1461 .await
1462 .unwrap();
1463 let batch = &batches[0];
1464
1465 assert_eq!(batch.num_rows(), 8);
1466 assert_eq!(batch.num_columns(), 11);
1467 assert_eq!(batch.schema().field(0).name(), "id");
1468 }
1469
1470 #[tokio::test]
1471 async fn test_alltypes_no_schema_with_projection() {
1472 let file = arrow_test_data("avro/alltypes_plain.avro");
1474 let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0, 5]))
1476 .await
1477 .unwrap();
1478 let batch = &batches[0];
1479
1480 assert_eq!(batch.num_rows(), 8);
1481 assert_eq!(batch.num_columns(), 3);
1482 assert_eq!(batch.schema().field(0).name(), "tinyint_col");
1483 assert_eq!(batch.schema().field(1).name(), "id");
1484 assert_eq!(batch.schema().field(2).name(), "bigint_col");
1485
1486 let tinyint_col = batch.column(0).as_primitive::<Int32Type>();
1488 assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]);
1489
1490 let id = batch.column(1).as_primitive::<Int32Type>();
1491 assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
1492
1493 let bigint_col = batch.column(2).as_primitive::<Int64Type>();
1494 assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]);
1495 }
1496
1497 #[tokio::test]
1498 async fn test_alltypes_with_schema_no_projection() {
1499 let file = arrow_test_data("avro/alltypes_plain.avro");
1501 let schema = get_alltypes_schema();
1502 let batches = read_async_file(&file, 1024, None, Some(schema), None)
1503 .await
1504 .unwrap();
1505 let batch = &batches[0];
1506
1507 assert_eq!(batch.num_rows(), 8);
1508 assert_eq!(batch.num_columns(), 11);
1509 }
1510
1511 #[tokio::test]
1512 async fn test_alltypes_with_schema_with_projection() {
1513 let file = arrow_test_data("avro/alltypes_plain.avro");
1515 let schema = get_alltypes_schema();
1516 let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![1, 0]))
1518 .await
1519 .unwrap();
1520 let batch = &batches[0];
1521
1522 assert_eq!(batch.num_rows(), 8);
1523 assert_eq!(batch.num_columns(), 2);
1524 assert_eq!(batch.schema().field(0).name(), "bool_col");
1525 assert_eq!(batch.schema().field(1).name(), "id");
1526
1527 let bool_col = batch.column(0).as_boolean();
1528 assert!(bool_col.value(0));
1529 assert!(!bool_col.value(1));
1530
1531 let id = batch.column(1).as_primitive::<Int32Type>();
1532 assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
1533 }
1534
1535 #[tokio::test]
1536 async fn test_nested_no_schema_no_projection() {
1537 let file = arrow_test_data("avro/nested_records.avro");
1539 let batches = read_async_file(&file, 1024, None, None, None)
1540 .await
1541 .unwrap();
1542 let batch = &batches[0];
1543
1544 assert_eq!(batch.num_rows(), 2);
1545 assert_eq!(batch.num_columns(), 4);
1546 assert_eq!(batch.schema().field(0).name(), "f1");
1547 assert_eq!(batch.schema().field(1).name(), "f2");
1548 assert_eq!(batch.schema().field(2).name(), "f3");
1549 assert_eq!(batch.schema().field(3).name(), "f4");
1550 }
1551
1552 #[tokio::test]
1553 async fn test_nested_no_schema_with_projection() {
1554 let file = arrow_test_data("avro/nested_records.avro");
1556 let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0]))
1558 .await
1559 .unwrap();
1560 let batch = &batches[0];
1561
1562 assert_eq!(batch.num_rows(), 2);
1563 assert_eq!(batch.num_columns(), 2);
1564 assert_eq!(batch.schema().field(0).name(), "f3");
1565 assert_eq!(batch.schema().field(1).name(), "f1");
1566 }
1567
1568 #[tokio::test]
1569 async fn test_nested_with_schema_no_projection() {
1570 let file = arrow_test_data("avro/nested_records.avro");
1572 let schema = get_nested_records_schema();
1573 let batches = read_async_file(&file, 1024, None, Some(schema), None)
1574 .await
1575 .unwrap();
1576 let batch = &batches[0];
1577
1578 assert_eq!(batch.num_rows(), 2);
1579 assert_eq!(batch.num_columns(), 4);
1580 }
1581
1582 #[tokio::test]
1583 async fn test_nested_with_schema_with_projection() {
1584 let file = arrow_test_data("avro/nested_records.avro");
1586 let schema = get_nested_records_schema();
1587 let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![3, 1, 0]))
1589 .await
1590 .unwrap();
1591 let batch = &batches[0];
1592
1593 assert_eq!(batch.num_rows(), 2);
1594 assert_eq!(batch.num_columns(), 3);
1595 assert_eq!(batch.schema().field(0).name(), "f4");
1596 assert_eq!(batch.schema().field(1).name(), "f2");
1597 assert_eq!(batch.schema().field(2).name(), "f1");
1598 }
1599
1600 #[tokio::test]
1601 async fn test_projection_error_out_of_bounds() {
1602 let file = arrow_test_data("avro/alltypes_plain.avro");
1603 let err = read_async_file(&file, 1024, None, None, Some(vec![100]))
1605 .await
1606 .unwrap_err();
1607 assert!(matches!(err, ArrowError::AvroError(_)));
1608 assert!(err.to_string().contains("out of bounds"));
1609 }
1610
1611 #[tokio::test]
1612 async fn test_projection_error_duplicate_index() {
1613 let file = arrow_test_data("avro/alltypes_plain.avro");
1614 let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0]))
1616 .await
1617 .unwrap_err();
1618 assert!(matches!(err, ArrowError::AvroError(_)));
1619 assert!(err.to_string().contains("Duplicate projection index"));
1620 }
1621
1622 #[tokio::test]
1623 async fn test_arrow_schema_from_reader_no_reader_schema() {
1624 let file = arrow_test_data("avro/alltypes_plain.avro");
1625 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1626 let location = Path::from_filesystem_path(&file).unwrap();
1627 let file_size = store.head(&location).await.unwrap().size;
1628
1629 let file_reader = AvroObjectReader::new(store, location);
1630 let expected_schema = get_alltypes_schema()
1631 .as_ref()
1632 .clone()
1633 .with_metadata(Default::default());
1634
1635 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1637 .try_build()
1638 .await
1639 .unwrap();
1640
1641 assert_eq!(reader.schema().as_ref(), &expected_schema);
1642
1643 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1644 let batch = &batches[0];
1645
1646 assert_eq!(batch.schema().as_ref(), &expected_schema);
1647 }
1648
1649 #[tokio::test]
1650 async fn test_arrow_schema_from_reader_with_reader_schema() {
1651 let file = arrow_test_data("avro/alltypes_plain.avro");
1652 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1653 let location = Path::from_filesystem_path(&file).unwrap();
1654 let file_size = store.head(&location).await.unwrap().size;
1655
1656 let file_reader = AvroObjectReader::new(store, location);
1657 let schema = get_alltypes_schema()
1658 .project(&[0, 1, 7])
1659 .unwrap()
1660 .with_metadata(Default::default());
1661 let reader_schema = AvroSchema::try_from(&schema).unwrap();
1662 let expected_schema = schema.clone();
1663
1664 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1666 .with_reader_schema(reader_schema)
1667 .try_build()
1668 .await
1669 .unwrap();
1670
1671 assert_eq!(reader.schema().as_ref(), &expected_schema);
1672
1673 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1674 let batch = &batches[0];
1675
1676 assert_eq!(batch.schema().as_ref(), &expected_schema);
1677 }
1678
1679 #[tokio::test]
1680 async fn test_arrow_schema_from_reader_nested_records() {
1681 let file = arrow_test_data("avro/nested_records.avro");
1682 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1683 let location = Path::from_filesystem_path(&file).unwrap();
1684 let file_size = store.head(&location).await.unwrap().size;
1685
1686 let file_reader = AvroObjectReader::new(store, location);
1687
1688 let expected_schema = get_nested_records_schema()
1691 .as_ref()
1692 .clone()
1693 .with_metadata(Default::default());
1694
1695 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1696 .try_build()
1697 .await
1698 .unwrap();
1699
1700 assert_eq!(reader.schema().as_ref(), &expected_schema);
1701
1702 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1703 let batch = &batches[0];
1704
1705 assert_eq!(batch.schema().as_ref(), &expected_schema);
1706 }
1707
1708 #[tokio::test]
1709 async fn test_with_header_size_hint_small() {
1710 let file = arrow_test_data("avro/alltypes_plain.avro");
1712 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1713 let location = Path::from_filesystem_path(&file).unwrap();
1714 let file_size = store.head(&location).await.unwrap().size;
1715
1716 let file_reader = AvroObjectReader::new(store, location);
1717 let schema = get_alltypes_schema();
1718 let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1719
1720 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1722 .with_reader_schema(reader_schema)
1723 .with_header_size_hint(64)
1724 .try_build()
1725 .await
1726 .unwrap();
1727
1728 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1729 let batch = &batches[0];
1730
1731 assert_eq!(batch.num_rows(), 8);
1732 assert_eq!(batch.num_columns(), 11);
1733 }
1734
1735 #[tokio::test]
1736 async fn test_with_header_size_hint_large() {
1737 let file = arrow_test_data("avro/alltypes_plain.avro");
1739 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1740 let location = Path::from_filesystem_path(&file).unwrap();
1741 let file_size = store.head(&location).await.unwrap().size;
1742
1743 let file_reader = AvroObjectReader::new(store, location);
1744 let schema = get_alltypes_schema();
1745 let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1746
1747 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1749 .with_reader_schema(reader_schema)
1750 .with_header_size_hint(64 * 1024)
1751 .try_build()
1752 .await
1753 .unwrap();
1754
1755 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1756 let batch = &batches[0];
1757
1758 assert_eq!(batch.num_rows(), 8);
1759 assert_eq!(batch.num_columns(), 11);
1760 }
1761
1762 #[tokio::test]
1763 async fn test_with_tz_utc() {
1764 let file = arrow_test_data("avro/alltypes_plain.avro");
1765 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1766 let location = Path::from_filesystem_path(&file).unwrap();
1767 let file_size = store.head(&location).await.unwrap().size;
1768
1769 let file_reader = AvroObjectReader::new(store, location);
1770 let schema = get_alltypes_schema_with_tz("UTC");
1771 let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1772
1773 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1775 .with_reader_schema(reader_schema)
1776 .with_tz(Tz::Utc)
1777 .try_build()
1778 .await
1779 .unwrap();
1780
1781 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1782 let batch = &batches[0];
1783
1784 assert_eq!(batch.num_columns(), 11);
1785
1786 let schema = batch.schema();
1787 let ts_field = schema.field_with_name("timestamp_col").unwrap();
1788 assert!(
1789 matches!(
1790 ts_field.data_type(),
1791 DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC"
1792 ),
1793 "expected Timestamp(Microsecond, Some(\"UTC\")), got {:?}",
1794 ts_field.data_type()
1795 );
1796 }
1797
1798 #[tokio::test]
1799 async fn test_with_utf8_view_enabled() {
1800 let file = arrow_test_data("avro/nested_records.avro");
1802 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1803 let location = Path::from_filesystem_path(&file).unwrap();
1804 let file_size = store.head(&location).await.unwrap().size;
1805
1806 let file_reader = AvroObjectReader::new(store, location);
1807
1808 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1809 .with_utf8_view(true)
1810 .try_build()
1811 .await
1812 .unwrap();
1813
1814 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1815 let batch = &batches[0];
1816
1817 assert_eq!(batch.num_rows(), 2);
1818
1819 let f1_col = batch.column(0);
1822 let f1_struct = f1_col.as_struct();
1823 let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1824
1825 assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
1827 }
1828
1829 #[tokio::test]
1830 async fn test_with_utf8_view_disabled() {
1831 let file = arrow_test_data("avro/nested_records.avro");
1833 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1834 let location = Path::from_filesystem_path(&file).unwrap();
1835 let file_size = store.head(&location).await.unwrap().size;
1836
1837 let file_reader = AvroObjectReader::new(store, location);
1838
1839 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1840 .with_utf8_view(false)
1841 .try_build()
1842 .await
1843 .unwrap();
1844
1845 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1846 let batch = &batches[0];
1847
1848 assert_eq!(batch.num_rows(), 2);
1849
1850 let f1_col = batch.column(0);
1853 let f1_struct = f1_col.as_struct();
1854 let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1855
1856 assert_eq!(f1_1_field.data_type(), &DataType::Utf8);
1857 }
1858
1859 #[tokio::test]
1860 async fn test_with_strict_mode_disabled_allows_null_second() {
1861 let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
1864 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1865 let location = Path::from_filesystem_path(&file).unwrap();
1866 let file_size = store.head(&location).await.unwrap().size;
1867
1868 let file_reader = AvroObjectReader::new(store, location);
1869
1870 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1872 .with_strict_mode(false)
1873 .try_build()
1874 .await
1875 .unwrap();
1876
1877 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1878 assert_eq!(batches.len(), 1);
1879 assert_eq!(batches[0].num_rows(), 1);
1880 }
1881
1882 #[tokio::test]
1883 async fn test_with_strict_mode_enabled_rejects_null_second() {
1884 let file = arrow_test_data("avro/alltypes_plain.avro");
1887 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1888 let location = Path::from_filesystem_path(&file).unwrap();
1889 let file_size = store.head(&location).await.unwrap().size;
1890
1891 let file_reader = AvroObjectReader::new(store, location);
1892
1893 let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1895 .with_strict_mode(true)
1896 .try_build()
1897 .await;
1898
1899 match result {
1900 Ok(_) => panic!("Expected error for strict_mode with ['T', 'null'] union"),
1901 Err(err) => {
1902 assert!(
1903 err.to_string().contains("disallowed in strict_mode"),
1904 "Expected strict_mode error, got: {}",
1905 err
1906 );
1907 }
1908 }
1909 }
1910
1911 #[tokio::test]
1912 async fn test_with_strict_mode_enabled_valid_schema() {
1913 let file = arrow_test_data("avro/nested_records.avro");
1916 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1917 let location = Path::from_filesystem_path(&file).unwrap();
1918 let file_size = store.head(&location).await.unwrap().size;
1919
1920 let file_reader = AvroObjectReader::new(store, location);
1921
1922 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1924 .with_strict_mode(true)
1925 .try_build()
1926 .await
1927 .unwrap();
1928
1929 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1930 assert_eq!(batches.len(), 1);
1931 assert_eq!(batches[0].num_rows(), 2);
1932 }
1933
1934 #[tokio::test]
1935 async fn test_builder_options_combined() {
1936 let file = arrow_test_data("avro/nested_records.avro");
1938 let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1939 let location = Path::from_filesystem_path(&file).unwrap();
1940 let file_size = store.head(&location).await.unwrap().size;
1941
1942 let file_reader = AvroObjectReader::new(store, location);
1943
1944 let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
1945 .with_header_size_hint(128)
1946 .with_utf8_view(true)
1947 .with_strict_mode(true)
1948 .with_projection(vec![0, 2]) .try_build()
1950 .await
1951 .unwrap();
1952
1953 let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1954 let batch = &batches[0];
1955
1956 assert_eq!(batch.num_columns(), 2);
1958 assert_eq!(batch.schema().field(0).name(), "f1");
1959 assert_eq!(batch.schema().field(1).name(), "f3");
1960
1961 let f1_col = batch.column(0);
1963 let f1_struct = f1_col.as_struct();
1964 let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1965 assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
1966 }
1967}