use crate::compression::CompressionCodec;
use crate::reader::Decoder;
use crate::reader::block::{BlockDecoder, BlockDecoderState};
use arrow_array::RecordBatch;
use arrow_schema::ArrowError;
use bytes::Bytes;
use futures::future::BoxFuture;
use futures::{FutureExt, Stream};
use std::mem;
use std::ops::Range;
use std::pin::Pin;
use std::task::{Context, Poll};

mod async_file_reader;
mod builder;

pub use async_file_reader::AsyncFileReader;
pub use builder::ReaderBuilder;

#[cfg(feature = "object_store")]
mod store;

use crate::errors::AvroError;
#[cfg(feature = "object_store")]
pub use store::AvroObjectReader;

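/// What to do with the bytes returned by an in-flight fetch.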
enum FetchNextBehaviour {
    /// Scan the fetched bytes for the sync marker and start decoding the
    /// first block that begins after it.
    ReadSyncMarker,
    /// Feed the fetched bytes to the block decoder byte by byte until the
    /// variable-length (VLQ) block header has been parsed.
    DecodeVLQHeader,
    /// Hand the fetched bytes straight to the block decoder.
    ContinueDecoding,
}

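/// States of the read loop; see [`AsyncAvroFileReader::read_next`].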
enum ReaderState<R> {
    /// Placeholder left behind while the state is temporarily moved out of
    /// the reader; it should never be observed across calls.
    InvalidState,
    /// No work in flight; ready to issue the first fetch.
    Idle { reader: R },
    /// A fetch is in flight; `next_behaviour` says what to do with the bytes.
    FetchingData {
        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
        next_behaviour: FetchNextBehaviour,
    },
    /// Feeding fetched bytes into the block decoder.
    DecodingBlock { data: Bytes, reader: R },
    /// Decoding records out of a complete (decompressed) block.
    ReadingBatches {
        data: Bytes,
        block_data: Bytes,
        remaining_in_block: usize,
        reader: R,
    },
    /// All input consumed; draining any partially filled batch.
    Flushing,
    /// Terminal state: the stream has ended or errored.
    Finished,
}

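/// An async [`Stream`] of [`RecordBatch`] decoded from an Avro Object
/// Container File.
///
/// The reader is given a byte `range`: it scans forward to the first sync
/// marker, then decodes blocks until the range is exhausted, fetching extra
/// bytes past `range.end` when the final block is only partially covered, so
/// that readers handed disjoint, contiguous ranges can collectively decode
/// each block exactly once.
///
/// A minimal usage sketch (mirroring the tests below; assumes the
/// `object_store` feature is enabled):
///
/// ```ignore
/// use std::sync::Arc;
/// use futures::TryStreamExt;
/// use object_store::{ObjectStore, local::LocalFileSystem, path::Path};
///
/// let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
/// let location = Path::from_filesystem_path("data/example.avro")?;
/// let file_size = store.head(&location).await?.size;
///
/// let reader = AsyncAvroFileReader::builder(
///     AvroObjectReader::new(store, location),
///     file_size,
///     1024, // batch size in rows
/// )
/// .try_build()
/// .await?;
///
/// let batches: Vec<_> = reader.try_collect().await?;
/// ```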
pub struct AsyncAvroFileReader<R> {
    /// Bytes of the file this reader is responsible for; `range.end` grows
    /// when extra bytes must be fetched to finish the final block.
    range: Range<u64>,
    file_size: u64,

    decoder: Decoder,
    block_decoder: BlockDecoder,
    codec: Option<CompressionCodec>,
    sync_marker: [u8; 16],

    reader_state: ReaderState<R>,
    /// Set once the reader starts fetching past `range.end` to complete the
    /// final, partially covered block.
    finishing_partial_block: bool,
}

impl<R> AsyncAvroFileReader<R> {
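    /// Create a [`ReaderBuilder`] used to configure and build the reader.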
    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
        ReaderBuilder::new(reader, file_size, batch_size)
    }

    fn new(
        range: Range<u64>,
        file_size: u64,
        decoder: Decoder,
        codec: Option<CompressionCodec>,
        sync_marker: [u8; 16],
        reader_state: ReaderState<R>,
    ) -> Self {
        Self {
            range,
            file_size,

            decoder,
            block_decoder: Default::default(),
            codec,
            sync_marker,

            reader_state,
            finishing_partial_block: false,
        }
    }

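    /// Byte range, starting at `self.range.end`, still needed to finish
    /// decoding the current block (including its trailing sync marker).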
    fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> {
        let remaining = self.block_decoder.bytes_remaining() as u64
            + match self.block_decoder.state() {
                // After the block payload, the 16-byte sync marker still follows.
                BlockDecoderState::Data => 16,
                BlockDecoderState::Sync => 0,
                state => {
                    return Err(AvroError::General(format!(
                        "remaining_block_range called in unexpected state: {state:?}"
                    )));
                }
            };

        let fetch_end = self.range.end + remaining;
        if fetch_end > self.file_size {
            return Err(AvroError::EOF(
                "Avro block requires more bytes than what exists in the file".into(),
            ));
        }

        Ok(self.range.end..fetch_end)
    }

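    /// Move to [`ReaderState::Finished`] and surface `error` on the stream.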
    #[inline]
    fn finish_with_error(
        &mut self,
        error: AvroError,
    ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        self.reader_state = ReaderState::Finished;
        Poll::Ready(Some(Err(error)))
    }

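    /// Switch the state machine into the flushing phase.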
    #[inline]
    fn start_flushing(&mut self) {
        self.reader_state = ReaderState::Flushing;
    }

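    /// Drain buffered rows: yield batches until the decoder is empty, then
    /// end the stream.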
    #[inline]
    fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        match self.decoder.flush() {
            Ok(Some(batch)) => {
                self.reader_state = ReaderState::Flushing;
                Poll::Ready(Some(Ok(batch)))
            }
            Ok(None) => {
                self.reader_state = ReaderState::Finished;
                Poll::Ready(None)
            }
            Err(e) => self.finish_with_error(e),
        }
    }
}

impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
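    /// Fetch `range` from `reader`, returning the reader along with the bytes
    /// so ownership can be threaded back through the state machine.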
    async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> {
        let data = reader.get_bytes(range).await?;
        Ok((reader, data))
    }

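    /// Drive the state machine until a batch is ready, the stream finishes,
    /// or an in-flight fetch returns `Pending`.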
    // `?` would bypass the state machine; all errors must flow through
    // `finish_with_error` so the reader lands in `Finished`.
    #[forbid(clippy::question_mark_used)]
    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        loop {
            // Take ownership of the current state, leaving a sentinel behind.
            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
                ReaderState::Idle { reader } => {
                    let range = self.range.clone();
                    if range.start >= range.end {
                        return self.finish_with_error(AvroError::InvalidArgument(format!(
                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
                            range.start, range.end, self.file_size
                        )));
                    }

                    let future = Self::fetch_bytes(reader, range).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
                    };
                }
                ReaderState::FetchingData {
                    mut future,
                    next_behaviour,
                } => {
                    let (reader, data_chunk) = match future.poll_unpin(cx) {
                        Poll::Ready(Ok(data)) => data,
                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
                        Poll::Pending => {
                            // Put the future back so it can be polled again.
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour,
                            };
                            return Poll::Pending;
                        }
                    };

                    match next_behaviour {
                        FetchNextBehaviour::ReadSyncMarker => {
                            let sync_marker_pos = data_chunk
                                .windows(16)
                                .position(|slice| slice == self.sync_marker);
                            let block_start = match sync_marker_pos {
                                // The first block starts right after the marker.
                                Some(pos) => pos + 16,
                                None => {
                                    // No sync marker inside the range, so no block
                                    // starts here and there is nothing to emit.
                                    self.reader_state = ReaderState::Finished;
                                    return Poll::Ready(None);
                                }
                            };

                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk.slice(block_start..),
                            };
                        }
                        FetchNextBehaviour::DecodeVLQHeader => {
                            let mut data = data_chunk;

                            // Feed one byte at a time until the count/size header
                            // (two VLQ integers) has been fully parsed.
                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
                                if data.is_empty() {
                                    return self.finish_with_error(AvroError::EOF(
                                        "Unexpected EOF while reading Avro block header".into(),
                                    ));
                                }
                                let consumed = match self.block_decoder.decode(&data[..1]) {
                                    Ok(consumed) => consumed,
                                    Err(e) => return self.finish_with_error(e),
                                };
                                if consumed == 0 {
                                    return self.finish_with_error(AvroError::General(
                                        "BlockDecoder failed to consume byte during VLQ header parsing"
                                            .into(),
                                    ));
                                }
                                data = data.slice(consumed..);
                            }

                            // Hand over whatever block payload arrived along with
                            // the header bytes.
                            let bytes_remaining = self.block_decoder.bytes_remaining();
                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
                            let consumed = match self.block_decoder.decode(&data_to_use) {
                                Ok(consumed) => consumed,
                                Err(e) => return self.finish_with_error(e),
                            };
                            if consumed != data_to_use.len() {
                                return self.finish_with_error(AvroError::General(
                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
                                        .into(),
                                ));
                            }

                            let range_to_fetch = match self.remaining_block_range() {
                                Ok(range) if range.is_empty() => {
                                    // Everything the block needs is already buffered.
                                    self.reader_state = ReaderState::DecodingBlock {
                                        reader,
                                        data: Bytes::new(),
                                    };
                                    continue;
                                }
                                Ok(range) => range,
                                Err(e) => return self.finish_with_error(e),
                            };

                            let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
                            };
                            continue;
                        }
                        FetchNextBehaviour::ContinueDecoding => {
                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk,
                            };
                        }
                    }
                }
                ReaderState::InvalidState => {
                    return self.finish_with_error(AvroError::General(
                        "AsyncAvroFileReader in invalid state".into(),
                    ));
                }
                ReaderState::DecodingBlock { reader, mut data } => {
                    let consumed = match self.block_decoder.decode(&data) {
                        Ok(consumed) => consumed,
                        Err(e) => return self.finish_with_error(e),
                    };
                    data = data.slice(consumed..);

                    if let Some(block) = self.block_decoder.flush() {
                        // A complete block is available: decompress it if a codec
                        // is configured, then start decoding its records.
                        let block_count = block.count;
                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
                            match codec.decompress(&block.data) {
                                Ok(decompressed) => decompressed,
                                Err(e) => return self.finish_with_error(e),
                            }
                        } else {
                            block.data
                        });

                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block: block_count,
                        };
                        continue;
                    }

                    // No block was produced even though bytes remain: the decoder
                    // cannot make progress.
                    if !data.is_empty() {
                        return self.finish_with_error(AvroError::General(
                            "BlockDecoder failed to make progress decoding Avro block".into(),
                        ));
                    }

                    // The fetched data ended exactly on a block boundary.
                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
                        self.finishing_partial_block = false;
                        self.start_flushing();
                        continue;
                    }

                    if self.finishing_partial_block {
                        return self.finish_with_error(AvroError::EOF(
                            "Unexpected EOF while reading last Avro block".into(),
                        ));
                    }

                    // The range ended mid-block: fetch the rest of this final
                    // block from past `range.end`. This is attempted only once;
                    // `finishing_partial_block` guards against looping.
                    self.finishing_partial_block = true;

                    // If the VLQ header is still incomplete the block length is
                    // unknown, so first fetch just enough bytes to finish the
                    // header (two VLQ integers of at most 10 bytes each).
                    if matches!(
                        self.block_decoder.state(),
                        BlockDecoderState::Count | BlockDecoderState::Size
                    ) {
                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);

                        if fetch_end == self.range.end {
                            return self.finish_with_error(AvroError::EOF(
                                "Unexpected EOF while reading Avro block header".into(),
                            ));
                        }

                        let range_to_fetch = self.range.end..fetch_end;
                        // Extend the tracked range so later calculations resume
                        // after these header bytes.
                        self.range.end = fetch_end;
                        let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                        self.reader_state = ReaderState::FetchingData {
                            future,
                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
                        };
                        continue;
                    }

                    // The header is parsed: fetch exactly the rest of the block.
                    let range_to_fetch = match self.remaining_block_range() {
                        Ok(range) => range,
                        Err(e) => return self.finish_with_error(e),
                    };

                    let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
                    };
                    continue;
                }
                ReaderState::ReadingBatches {
                    reader,
                    data,
                    mut block_data,
                    mut remaining_in_block,
                } => {
                    let (consumed, records_decoded) =
                        match self.decoder.decode_block(&block_data, remaining_in_block) {
                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
                            Err(e) => return self.finish_with_error(e),
                        };

                    remaining_in_block -= records_decoded;

                    if remaining_in_block == 0 {
                        if data.is_empty() {
                            // The fetched range is exhausted; flush what remains.
                            self.start_flushing();
                        } else {
                            // More fetched bytes remain: decode the next block.
                            self.reader_state = ReaderState::DecodingBlock { reader, data };
                        }
                    } else {
                        // The batch filled up mid-block; remember where decoding stopped.
                        block_data = block_data.slice(consumed..);
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block,
                        };
                    }

                    // Emit a batch as soon as it reaches the configured size.
                    if self.decoder.batch_is_full() {
                        return match self.decoder.flush() {
                            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
                            Ok(None) => self.finish_with_error(AvroError::General(
                                "Decoder reported a full batch, but flush returned None".into(),
                            )),
                            Err(e) => self.finish_with_error(e),
                        };
                    }
                }
                ReaderState::Flushing => {
                    return self.poll_flush();
                }
                ReaderState::Finished => {
                    // Restore the terminal state taken by `mem::replace` above.
                    self.reader_state = ReaderState::Finished;
                    return Poll::Ready(None);
                }
            }
        }
    }
}

impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
    type Item = Result<RecordBatch, ArrowError>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        self.read_next(cx).map_err(Into::into)
    }
}

#[cfg(all(test, feature = "object_store"))]
mod tests {
    use super::*;
    use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
    use arrow_array::cast::AsArray;
    use arrow_array::types::{Int32Type, Int64Type};
    use arrow_array::*;
    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
    use futures::{StreamExt, TryStreamExt};
    use object_store::local::LocalFileSystem;
    use object_store::path::Path;
    use object_store::{ObjectStore, ObjectStoreExt};
    use std::collections::HashMap;
    use std::sync::Arc;

    fn arrow_test_data(file: &str) -> String {
        let base =
            std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| "../testing/data".to_string());
        format!("{}/{}", base, file)
    }

    fn get_alltypes_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("tinyint_col", DataType::Int32, true),
            Field::new("smallint_col", DataType::Int32, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("date_string_col", DataType::Binary, true),
            Field::new("string_col", DataType::Binary, true),
            Field::new(
                "timestamp_col",
                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
                true,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
                "type": "record",
                "name": "topLevelRecord",
                "fields": [
                    {"name": "id", "type": ["int", "null"]},
                    {"name": "bool_col", "type": ["boolean", "null"]},
                    {"name": "tinyint_col", "type": ["int", "null"]},
                    {"name": "smallint_col", "type": ["int", "null"]},
                    {"name": "int_col", "type": ["int", "null"]},
                    {"name": "bigint_col", "type": ["long", "null"]},
                    {"name": "float_col", "type": ["float", "null"]},
                    {"name": "double_col", "type": ["double", "null"]},
                    {"name": "date_string_col", "type": ["bytes", "null"]},
                    {"name": "string_col", "type": ["bytes", "null"]},
                    {
                        "name": "timestamp_col",
                        "type": [
                            {"type": "long", "logicalType": "timestamp-micros"},
                            "null"
                        ]
                    }
                ]
            }"#
            .into(),
        )]));
        Arc::new(schema)
    }

    fn get_alltypes_with_nulls_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("string_col", DataType::Binary, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("bytes_col", DataType::Binary, true),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
                "type": "record",
                "name": "topLevelRecord",
                "fields": [
                    {"name": "string_col", "type": ["null", "string"], "default": null},
                    {"name": "int_col", "type": ["null", "int"], "default": null},
                    {"name": "bool_col", "type": ["null", "boolean"], "default": null},
                    {"name": "bigint_col", "type": ["null", "long"], "default": null},
                    {"name": "float_col", "type": ["null", "float"], "default": null},
                    {"name": "double_col", "type": ["null", "double"], "default": null},
                    {"name": "bytes_col", "type": ["null", "bytes"], "default": null}
                ]
            }"#
            .into(),
        )]));

        Arc::new(schema)
    }

    fn get_nested_records_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new(
                "f1",
                DataType::Struct(
                    vec![
                        Field::new("f1_1", DataType::Utf8, false),
                        Field::new("f1_2", DataType::Int32, false),
                        Field::new(
                            "f1_3",
                            DataType::Struct(
                                vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
                            ),
                            false,
                        ),
                    ]
                    .into(),
                ),
                false,
            ),
            Field::new(
                "f2",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Struct(
                        vec![
                            Field::new("f2_1", DataType::Boolean, false),
                            Field::new("f2_2", DataType::Float32, false),
                        ]
                        .into(),
                    ),
                    false,
                ))),
                false,
            ),
            Field::new(
                "f3",
                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
                true,
            ),
            Field::new(
                "f4",
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
                    true,
                ))),
                false,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
                "type": "record",
                "namespace": "ns1",
                "name": "record1",
                "fields": [
                    {
                        "name": "f1",
                        "type": {
                            "type": "record",
                            "namespace": "ns2",
                            "name": "record2",
                            "fields": [
                                {"name": "f1_1", "type": "string"},
                                {"name": "f1_2", "type": "int"},
                                {
                                    "name": "f1_3",
                                    "type": {
                                        "type": "record",
                                        "namespace": "ns3",
                                        "name": "record3",
                                        "fields": [{"name": "f1_3_1", "type": "double"}]
                                    }
                                }
                            ]
                        }
                    },
                    {
                        "name": "f2",
                        "type": {
                            "type": "array",
                            "items": {
                                "type": "record",
                                "namespace": "ns4",
                                "name": "record4",
                                "fields": [
                                    {"name": "f2_1", "type": "boolean"},
                                    {"name": "f2_2", "type": "float"}
                                ]
                            }
                        }
                    },
                    {
                        "name": "f3",
                        "type": [
                            "null",
                            {
                                "type": "record",
                                "namespace": "ns5",
                                "name": "record5",
                                "fields": [{"name": "f3_1", "type": "string"}]
                            }
                        ],
                        "default": null
                    },
                    {
                        "name": "f4",
                        "type": {
                            "type": "array",
                            "items": [
                                "null",
                                {
                                    "type": "record",
                                    "namespace": "ns6",
                                    "name": "record6",
                                    "fields": [{"name": "f4_1", "type": "long"}]
                                }
                            ]
                        }
                    }
                ]
            }"#
            .into(),
        )]));

        Arc::new(schema)
    }

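    /// Read `path` end to end with the async reader, applying the optional
    /// byte range, reader schema, and projection, and collect all batches.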
    async fn read_async_file(
        path: &str,
        batch_size: usize,
        range: Option<Range<u64>>,
        schema: Option<SchemaRef>,
        projection: Option<Vec<usize>>,
    ) -> Result<Vec<RecordBatch>, ArrowError> {
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(path).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);

        if let Some(s) = schema {
            let reader_schema = AvroSchema::try_from(s.as_ref())?;
            builder = builder.with_reader_schema(reader_schema);
        }

        if let Some(proj) = projection {
            builder = builder.with_projection(proj);
        }

        if let Some(range) = range {
            builder = builder.with_range(range);
        }

        let reader = builder.try_build().await?;
        reader.try_collect().await
    }

    #[tokio::test]
    async fn test_full_file_read() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 11);

        let id_array = batch
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(id_array.value(0), 4);
        assert_eq!(id_array.value(7), 1);
    }

    #[tokio::test]
    async fn test_small_batch_size() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 2, None, Some(schema), None)
            .await
            .unwrap();
        assert_eq!(batches.len(), 4);

        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 11);
    }

    #[tokio::test]
    async fn test_batch_size_one() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batches.len(), 8);
        assert_eq!(batch.num_rows(), 1);
    }

    #[tokio::test]
    async fn test_batch_size_larger_than_file() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 10000, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
    }

    #[tokio::test]
    async fn test_empty_range() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let range = 100..100;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_range_starting_at_zero() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let meta = store.head(&location).await.unwrap();

        let range = 0..meta.size;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
    }

    #[tokio::test]
    async fn test_range_after_header() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let meta = store.head(&location).await.unwrap();

        let range = 100..meta.size;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert!(batch.num_rows() > 0);
    }

    #[tokio::test]
    async fn test_range_no_sync_marker() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        // No sync marker falls inside this narrow range, so no block starts
        // here and no batches should be produced.
        let range = 50..150;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_range_starting_mid_file() {
        let file = arrow_test_data("avro/alltypes_plain.avro");

        // A slice in the middle of the file that owns no complete block.
        let range = 700..768;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        assert_eq!(batches.len(), 0);
    }

    #[tokio::test]
    async fn test_range_ending_at_file_size() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let meta = store.head(&location).await.unwrap();

        let range = 200..meta.size;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
    }

    #[tokio::test]
    async fn test_incomplete_block_requires_fetch() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        // The range ends mid-block, so the reader must fetch the remainder
        // of the final block from past the range end.
        let range = 0..1200;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8)
    }

    #[tokio::test]
    async fn test_partial_vlq_header_requires_fetch() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        // The range ends part-way through a block's VLQ header, forcing an
        // extra fetch just to finish parsing the header.
        let range = 16..676;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8)
    }

    #[cfg(feature = "snappy")]
    #[tokio::test]
    async fn test_snappy_compressed_with_range() {
        let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let meta = store.head(&location).await.unwrap();

        let range = 200..meta.size;
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert!(batch.num_rows() > 0);
    }

    #[tokio::test]
    async fn test_nulls() {
        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
        let schema = get_alltypes_with_nulls_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 1);
        for col in batch.columns() {
            assert!(col.is_null(0));
        }
    }

    #[tokio::test]
    async fn test_nested_records() {
        let file = arrow_test_data("avro/nested_records.avro");
        let schema = get_nested_records_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);
        assert!(batch.num_columns() > 0);
    }

    #[tokio::test]
    async fn test_stream_produces_multiple_batches() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let schema = get_alltypes_schema();
        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
        let reader = AsyncAvroFileReader::builder(
            file_reader,
            file_size,
            2, // batch_size: small enough to force several batches
        )
        .with_reader_schema(reader_schema)
        .try_build()
        .await
        .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        assert!(batches.len() > 1);
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 8);
    }

    #[tokio::test]
    async fn test_stream_early_termination() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();

        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let schema = get_alltypes_schema();
        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1)
            .with_reader_schema(reader_schema)
            .try_build()
            .await
            .unwrap();

        let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(first_batch.len(), 1);
        assert!(first_batch[0].num_rows() > 0);
    }

    #[tokio::test]
    async fn test_various_batch_sizes() {
        let file = arrow_test_data("avro/alltypes_plain.avro");

        for batch_size in [1, 2, 3, 5, 7, 11, 100] {
            let schema = get_alltypes_schema();
            let batches = read_async_file(&file, batch_size, None, Some(schema), None)
                .await
                .unwrap();
            let batch = &batches[0];

            assert_eq!(
                batch.num_rows(),
                batch_size.min(8),
                "Failed with batch_size={}",
                batch_size
            );
        }
    }

    #[tokio::test]
    async fn test_range_larger_than_file() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let meta = store.head(&location).await.unwrap();

        // The requested range extends past the end of the file; the whole
        // file should still be read.
        let range = 100..(meta.size + 1000);
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
    }

    #[tokio::test]
    async fn test_roundtrip_write_then_async_read() {
        use crate::writer::AvroWriter;
        use arrow_array::{Float64Array, StringArray};
        use std::fs::File;
        use std::io::BufWriter;
        use tempfile::tempdir;

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("count", DataType::Int64, false),
        ]));

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("roundtrip_test.avro");

        // Write two batches, then read the file back asynchronously.
        {
            let file = File::create(&file_path).unwrap();
            let writer = BufWriter::new(file);
            let mut avro_writer = AvroWriter::new(writer, schema.as_ref().clone()).unwrap();

            let batch1 = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(vec![1, 2, 3])),
                    Arc::new(StringArray::from(vec![
                        Some("alice"),
                        None,
                        Some("charlie"),
                    ])),
                    Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3), None])),
                    Arc::new(Int64Array::from(vec![10, 20, 30])),
                ],
            )
            .unwrap();
            avro_writer.write(&batch1).unwrap();

            let batch2 = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(vec![4, 5])),
                    Arc::new(StringArray::from(vec![Some("diana"), Some("eve")])),
                    Arc::new(Float64Array::from(vec![None, Some(88.0)])),
                    Arc::new(Int64Array::from(vec![40, 50])),
                ],
            )
            .unwrap();
            avro_writer.write(&batch2).unwrap();

            avro_writer.finish().unwrap();
        }

        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file_path).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        assert!(
            batches.len() > 1,
            "Expected multiple batches with batch_size=2"
        );

        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 5);

        // Concatenate so values can be checked in write order.
        let combined = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
        assert_eq!(combined.num_rows(), 5);
        assert_eq!(combined.num_columns(), 4);

        let id_array = combined
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);

        let name_col = combined.column(1);
        let name_array = name_col.as_string::<i32>();
        assert_eq!(name_array.value(0), "alice");
        assert!(name_col.is_null(1));
        assert_eq!(name_array.value(2), "charlie");

        let score_array = combined
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!(!score_array.is_null(0));
        assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
        assert!(score_array.is_null(2));
        assert!(score_array.is_null(3));
        assert!(!score_array.is_null(4));
        assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);

        let count_array = combined
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
    }

    #[tokio::test]
    async fn test_alltypes_no_schema_no_projection() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let batches = read_async_file(&file, 1024, None, None, None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 11);
        assert_eq!(batch.schema().field(0).name(), "id");
    }

    #[tokio::test]
    async fn test_alltypes_no_schema_with_projection() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0, 5]))
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.schema().field(0).name(), "tinyint_col");
        assert_eq!(batch.schema().field(1).name(), "id");
        assert_eq!(batch.schema().field(2).name(), "bigint_col");

        let tinyint_col = batch.column(0).as_primitive::<Int32Type>();
        assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]);

        let id = batch.column(1).as_primitive::<Int32Type>();
        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);

        let bigint_col = batch.column(2).as_primitive::<Int64Type>();
        assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]);
    }

    #[tokio::test]
    async fn test_alltypes_with_schema_no_projection() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 11);
    }

    #[tokio::test]
    async fn test_alltypes_with_schema_with_projection() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let schema = get_alltypes_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![1, 0]))
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(0).name(), "bool_col");
        assert_eq!(batch.schema().field(1).name(), "id");

        let bool_col = batch.column(0).as_boolean();
        assert!(bool_col.value(0));
        assert!(!bool_col.value(1));

        let id = batch.column(1).as_primitive::<Int32Type>();
        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
    }

    #[tokio::test]
    async fn test_nested_no_schema_no_projection() {
        let file = arrow_test_data("avro/nested_records.avro");
        let batches = read_async_file(&file, 1024, None, None, None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 4);
        assert_eq!(batch.schema().field(0).name(), "f1");
        assert_eq!(batch.schema().field(1).name(), "f2");
        assert_eq!(batch.schema().field(2).name(), "f3");
        assert_eq!(batch.schema().field(3).name(), "f4");
    }

    #[tokio::test]
    async fn test_nested_no_schema_with_projection() {
        let file = arrow_test_data("avro/nested_records.avro");
        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0]))
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(0).name(), "f3");
        assert_eq!(batch.schema().field(1).name(), "f1");
    }

    #[tokio::test]
    async fn test_nested_with_schema_no_projection() {
        let file = arrow_test_data("avro/nested_records.avro");
        let schema = get_nested_records_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), None)
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 4);
    }

    #[tokio::test]
    async fn test_nested_with_schema_with_projection() {
        let file = arrow_test_data("avro/nested_records.avro");
        let schema = get_nested_records_schema();
        let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![3, 1, 0]))
            .await
            .unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);
        assert_eq!(batch.num_columns(), 3);
        assert_eq!(batch.schema().field(0).name(), "f4");
        assert_eq!(batch.schema().field(1).name(), "f2");
        assert_eq!(batch.schema().field(2).name(), "f1");
    }

    #[tokio::test]
    async fn test_projection_error_out_of_bounds() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let err = read_async_file(&file, 1024, None, None, Some(vec![100]))
            .await
            .unwrap_err();
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("out of bounds"));
    }

    #[tokio::test]
    async fn test_projection_error_duplicate_index() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0]))
            .await
            .unwrap_err();
        assert!(matches!(err, ArrowError::AvroError(_)));
        assert!(err.to_string().contains("Duplicate projection index"));
    }

    #[tokio::test]
    async fn test_with_header_size_hint_small() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let schema = get_alltypes_schema();
        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();

        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_reader_schema(reader_schema)
            .with_header_size_hint(64)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 11);
    }

    #[tokio::test]
    async fn test_with_header_size_hint_large() {
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let schema = get_alltypes_schema();
        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();

        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_reader_schema(reader_schema)
            .with_header_size_hint(64 * 1024)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 8);
        assert_eq!(batch.num_columns(), 11);
    }

    #[tokio::test]
    async fn test_with_utf8_view_enabled() {
        let file = arrow_test_data("avro/nested_records.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);

        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_utf8_view(true)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);

        // With utf8_view enabled, the nested string field f1.f1_1 should be
        // read as Utf8View.
        let f1_col = batch.column(0);
        let f1_struct = f1_col.as_struct();
        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();

        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
    }

    #[tokio::test]
    async fn test_with_utf8_view_disabled() {
        let file = arrow_test_data("avro/nested_records.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);

        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_utf8_view(false)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_rows(), 2);

        // With utf8_view disabled, the same field stays plain Utf8.
        let f1_col = batch.column(0);
        let f1_struct = f1_col.as_struct();
        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();

        assert_eq!(f1_1_field.data_type(), &DataType::Utf8);
    }

    #[tokio::test]
    async fn test_with_strict_mode_disabled_allows_null_second() {
        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);

        // With strict_mode off, the file builds and reads without error.
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_strict_mode(false)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 1);
    }

    #[tokio::test]
    async fn test_with_strict_mode_enabled_rejects_null_second() {
        // alltypes_plain.avro declares unions like ["int", "null"]; strict
        // mode rejects unions with "null" in the second position.
        let file = arrow_test_data("avro/alltypes_plain.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);

        let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_strict_mode(true)
            .try_build()
            .await;

        match result {
            Ok(_) => panic!("Expected error for strict_mode with ['T', 'null'] union"),
            Err(err) => {
                assert!(
                    err.to_string().contains("disallowed in strict_mode"),
                    "Expected strict_mode error, got: {}",
                    err
                );
            }
        }
    }

    #[tokio::test]
    async fn test_with_strict_mode_enabled_valid_schema() {
        // nested_records.avro only uses ["null", T] unions, which strict
        // mode accepts.
        let file = arrow_test_data("avro/nested_records.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);

        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
            .with_strict_mode(true)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0].num_rows(), 2);
    }

    #[tokio::test]
    async fn test_builder_options_combined() {
        let file = arrow_test_data("avro/nested_records.avro");
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);

        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
            .with_header_size_hint(128)
            .with_utf8_view(true)
            .with_strict_mode(true)
            .with_projection(vec![0, 2])
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
        let batch = &batches[0];

        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.schema().field(0).name(), "f1");
        assert_eq!(batch.schema().field(1).name(), "f3");

        let f1_col = batch.column(0);
        let f1_struct = f1_col.as_struct();
        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
    }
}