arrow_avro/reader/async_reader/mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::compression::CompressionCodec;
19use crate::reader::Decoder;
20use crate::reader::block::{BlockDecoder, BlockDecoderState};
21use arrow_array::RecordBatch;
22use arrow_schema::ArrowError;
23use bytes::Bytes;
24use futures::future::BoxFuture;
25use futures::{FutureExt, Stream};
26use std::mem;
27use std::ops::Range;
28use std::pin::Pin;
29use std::task::{Context, Poll};
30
31mod async_file_reader;
32mod builder;
33
34pub use async_file_reader::AsyncFileReader;
35pub use builder::ReaderBuilder;
36
37#[cfg(feature = "object_store")]
38mod store;
39
40use crate::errors::AvroError;
41#[cfg(feature = "object_store")]
42pub use store::AvroObjectReader;
43
44enum FetchNextBehaviour {
45    /// Initial read: scan for sync marker, then move to decoding blocks
46    ReadSyncMarker,
47    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
48    DecodeVLQHeader,
49    /// Continue decoding the current block with the fetched data
50    ContinueDecoding,
51}
52
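/// State machine driving [`AsyncAvroFileReader`]. A typical pass through the file is:
/// `Idle` -> `FetchingData` -> `DecodingBlock` -> `ReadingBatches` (per block, looping
/// back to `DecodingBlock`) -> `Flushing` -> `Finished`, with extra `FetchingData`
/// round-trips whenever a block is split across the fetched range.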
53enum ReaderState<R> {
    /// Placeholder state used while transitioning between states, so the reader can be moved out of the previous state
55    InvalidState,
56    /// Initial state, fetch initial range
57    Idle { reader: R },
58    /// Fetching data from the reader
59    FetchingData {
60        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
61        next_behaviour: FetchNextBehaviour,
62    },
63    /// Decode a block in a loop until completion
64    DecodingBlock { data: Bytes, reader: R },
65    /// Output batches from a decoded block
66    ReadingBatches {
67        data: Bytes,
68        block_data: Bytes,
69        remaining_in_block: usize,
70        reader: R,
71    },
72    /// Successfully finished reading file contents; drain any remaining buffered records
73    /// from the decoder into (possibly partial) output batches.
74    Flushing,
    /// Terminal state; the stream yields `None` from here on
76    Finished,
77}
78
/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
/// It uses an [`AsyncFileReader`] to fetch data ranges as needed, starting by fetching the header,
/// then reading all the blocks in the provided range as follows:
/// 1. Reads and decodes data until the header is fully decoded.
/// 2. Searches from `range.start` for the first sync marker and starts with the following block.
///    (If `range.start` is less than the header length, the search starts at the header length minus the sync marker length.)
/// 3. Reads blocks sequentially, decoding them into `RecordBatch`es.
/// 4. If a block is incomplete (because the range ends mid-block), fetches the remaining bytes from the [`AsyncFileReader`].
/// 5. If no range was provided, reads the full file.
/// 6. If the range is empty, `file_size` is 0, or `range.end` is less than the header length, finishes immediately.
89///
90/// # Example
91///
92/// ```
93/// #[tokio::main(flavor = "current_thread")]
94/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
95/// use std::io::Cursor;
96/// use std::sync::Arc;
97/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
98/// use arrow_schema::{DataType, Field, Schema};
99/// use arrow_avro::reader::AsyncAvroFileReader;
100/// use arrow_avro::writer::AvroWriter;
101/// use futures::TryStreamExt;
102///
103/// // Build a minimal Arrow schema and batch
104/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
105/// let batch = RecordBatch::try_new(
106///     Arc::new(schema.clone()),
107///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
108/// )?;
109///
110/// // Write an Avro OCF to memory
111/// let buffer: Vec<u8> = Vec::new();
112/// let mut writer = AvroWriter::new(buffer, schema)?;
113/// writer.write(&batch)?;
114/// writer.finish()?;
115/// let bytes = writer.into_inner();
116///
117/// // Create an async reader from the in-memory bytes
118/// // `tokio::fs::File` also implements `AsyncFileReader` for reading from disk
119/// let file_size = bytes.len();
120/// let cursor = Cursor::new(bytes);
121/// let reader = AsyncAvroFileReader::builder(cursor, file_size as u64, 1024)
122///     .try_build()
123///     .await?;
124///
125/// // Consume the stream of RecordBatches
126/// let batches: Vec<RecordBatch> = reader.try_collect().await?;
127/// assert_eq!(batches.len(), 1);
128/// assert_eq!(batches[0].num_rows(), 3);
129/// Ok(())
130/// }
131/// ```
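///
/// A minimal sketch of range-limited reading, reusing the in-memory `bytes` and
/// `file_size` from the example above (blocks that begin within the range are
/// decoded, fetching past `range.end` if the last block is cut off):
///
/// ```ignore
/// let reader = AsyncAvroFileReader::builder(Cursor::new(bytes), file_size as u64, 1024)
///     .with_range(0..file_size as u64)
///     .try_build()
///     .await?;
/// let batches: Vec<RecordBatch> = reader.try_collect().await?;
/// ```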
132pub struct AsyncAvroFileReader<R> {
133    // Members required to fetch data
134    range: Range<u64>,
135    file_size: u64,
136
137    // Members required to actually decode and read data
138    decoder: Decoder,
139    block_decoder: BlockDecoder,
140    codec: Option<CompressionCodec>,
141    sync_marker: [u8; 16],
142
143    // Members keeping the current state of the reader
144    reader_state: ReaderState<R>,
145    finishing_partial_block: bool,
146}
147
148impl<R> AsyncAvroFileReader<R> {
    /// Returns a builder for a new [`Self`], allowing optional parameters to be configured.
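    ///
    /// A minimal sketch of configuring the optional parameters (the `file_reader`,
    /// `reader_schema`, and projection indices below are placeholders you would
    /// construct for your own file):
    ///
    /// ```ignore
    /// let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
    ///     .with_reader_schema(reader_schema) // decode using a reader schema
    ///     .with_projection(vec![1, 0])       // select and reorder columns
    ///     .with_range(0..file_size)          // restrict reading to a byte range
    ///     .try_build()
    ///     .await?;
    /// ```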
150    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
151        ReaderBuilder::new(reader, file_size, batch_size)
152    }
153
154    fn new(
155        range: Range<u64>,
156        file_size: u64,
157        decoder: Decoder,
158        codec: Option<CompressionCodec>,
159        sync_marker: [u8; 16],
160        reader_state: ReaderState<R>,
161    ) -> Self {
162        Self {
163            range,
164            file_size,
165
166            decoder,
167            block_decoder: Default::default(),
168            codec,
169            sync_marker,
170
171            reader_state,
172            finishing_partial_block: false,
173        }
174    }
175
176    /// Calculate the byte range needed to complete the current block.
177    /// Only valid when block_decoder is in Data or Sync state.
178    /// Returns the range to fetch, or an error if EOF would be reached.
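    /// For example, if the decoder is in the `Data` state with 100 bytes of block data
    /// still outstanding, the returned range is `range.end..range.end + 116`
    /// (the remaining block data plus the 16-byte sync marker).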
179    fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> {
180        let remaining = self.block_decoder.bytes_remaining() as u64
181            + match self.block_decoder.state() {
182                BlockDecoderState::Data => 16, // Include sync marker
183                BlockDecoderState::Sync => 0,
184                state => {
185                    return Err(AvroError::General(format!(
186                        "remaining_block_range called in unexpected state: {state:?}"
187                    )));
188                }
189            };
190
191        let fetch_end = self.range.end + remaining;
192        if fetch_end > self.file_size {
193            return Err(AvroError::EOF(
194                "Avro block requires more bytes than what exists in the file".into(),
195            ));
196        }
197
198        Ok(self.range.end..fetch_end)
199    }
200
201    /// Terminate the stream after returning this error once.
202    #[inline]
203    fn finish_with_error(
204        &mut self,
205        error: AvroError,
206    ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
207        self.reader_state = ReaderState::Finished;
208        Poll::Ready(Some(Err(error)))
209    }
210
211    #[inline]
212    fn start_flushing(&mut self) {
213        self.reader_state = ReaderState::Flushing;
214    }
215
216    /// Drain any remaining buffered records from the decoder.
217    #[inline]
218    fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> {
219        match self.decoder.flush() {
220            Ok(Some(batch)) => {
221                self.reader_state = ReaderState::Flushing;
222                Poll::Ready(Some(Ok(batch)))
223            }
224            Ok(None) => {
225                self.reader_state = ReaderState::Finished;
226                Poll::Ready(None)
227            }
228            Err(e) => self.finish_with_error(e),
229        }
230    }
231}
232
233impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
    // The `forbid(clippy::question_mark_used)` restriction on `read_next` shouldn't apply here,
    // since this code runs inside the boxed future, so it is factored out into a separate function.
236    async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> {
237        let data = reader.get_bytes(range).await?;
238        Ok((reader, data))
239    }
240
241    #[forbid(clippy::question_mark_used)]
242    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
243        loop {
244            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
245                ReaderState::Idle { reader } => {
246                    let range = self.range.clone();
247                    if range.start >= range.end {
248                        return self.finish_with_error(AvroError::InvalidArgument(format!(
249                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
250                            range.start, range.end, self.file_size
251                        )));
252                    }
253
254                    let future = Self::fetch_bytes(reader, range).boxed();
255                    self.reader_state = ReaderState::FetchingData {
256                        future,
257                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
258                    };
259                }
260                ReaderState::FetchingData {
261                    mut future,
262                    next_behaviour,
263                } => {
264                    let (reader, data_chunk) = match future.poll_unpin(cx) {
265                        Poll::Ready(Ok(data)) => data,
266                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
267                        Poll::Pending => {
268                            self.reader_state = ReaderState::FetchingData {
269                                future,
270                                next_behaviour,
271                            };
272                            return Poll::Pending;
273                        }
274                    };
275
276                    match next_behaviour {
277                        FetchNextBehaviour::ReadSyncMarker => {
278                            let sync_marker_pos = data_chunk
279                                .windows(16)
280                                .position(|slice| slice == self.sync_marker);
281                            let block_start = match sync_marker_pos {
282                                Some(pos) => pos + 16, // Move past the sync marker
283                                None => {
                                    // Sync marker not found: valid when the range is an arbitrary slice of the file containing no block boundary, so there is nothing to read.
285                                    self.reader_state = ReaderState::Finished;
286                                    return Poll::Ready(None);
287                                }
288                            };
289
290                            self.reader_state = ReaderState::DecodingBlock {
291                                reader,
292                                data: data_chunk.slice(block_start..),
293                            };
294                        }
295                        FetchNextBehaviour::DecodeVLQHeader => {
296                            let mut data = data_chunk;
297
298                            // Feed bytes one at a time until we reach Data state (VLQ header complete)
299                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
300                                if data.is_empty() {
301                                    return self.finish_with_error(AvroError::EOF(
302                                        "Unexpected EOF while reading Avro block header".into(),
303                                    ));
304                                }
305                                let consumed = match self.block_decoder.decode(&data[..1]) {
306                                    Ok(consumed) => consumed,
307                                    Err(e) => return self.finish_with_error(e),
308                                };
309                                if consumed == 0 {
310                                    return self.finish_with_error(AvroError::General(
311                                        "BlockDecoder failed to consume byte during VLQ header parsing"
312                                            .into(),
313                                    ));
314                                }
315                                data = data.slice(consumed..);
316                            }
317
318                            // Now we know the block size. Slice remaining data to what we need.
319                            let bytes_remaining = self.block_decoder.bytes_remaining();
320                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
321                            let consumed = match self.block_decoder.decode(&data_to_use) {
322                                Ok(consumed) => consumed,
323                                Err(e) => return self.finish_with_error(e),
324                            };
325                            if consumed != data_to_use.len() {
326                                return self.finish_with_error(AvroError::General(
327                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
328                                        .into(),
329                                ));
330                            }
331
332                            // May need more data to finish the block.
333                            let range_to_fetch = match self.remaining_block_range() {
334                                Ok(range) if range.is_empty() => {
335                                    // All bytes fetched, move to decoding block directly
336                                    self.reader_state = ReaderState::DecodingBlock {
337                                        reader,
338                                        data: Bytes::new(),
339                                    };
340                                    continue;
341                                }
342                                Ok(range) => range,
343                                Err(e) => return self.finish_with_error(e),
344                            };
345
346                            let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
347                            self.reader_state = ReaderState::FetchingData {
348                                future,
349                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
350                            };
351                            continue;
352                        }
353                        FetchNextBehaviour::ContinueDecoding => {
354                            self.reader_state = ReaderState::DecodingBlock {
355                                reader,
356                                data: data_chunk,
357                            };
358                        }
359                    }
360                }
361                ReaderState::InvalidState => {
362                    return self.finish_with_error(AvroError::General(
363                        "AsyncAvroFileReader in invalid state".into(),
364                    ));
365                }
366                ReaderState::DecodingBlock { reader, mut data } => {
367                    // Try to decode another block from the buffered reader.
368                    let consumed = match self.block_decoder.decode(&data) {
369                        Ok(consumed) => consumed,
370                        Err(e) => return self.finish_with_error(e),
371                    };
372                    data = data.slice(consumed..);
373
374                    // If we reached the end of the block, flush it, and move to read batches.
375                    if let Some(block) = self.block_decoder.flush() {
376                        // Successfully decoded a block.
377                        let block_count = block.count;
378                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
379                            match codec.decompress(&block.data) {
380                                Ok(decompressed) => decompressed,
381                                Err(e) => return self.finish_with_error(e),
382                            }
383                        } else {
384                            block.data
385                        });
386
387                        // Since we have an active block, move to reading batches
388                        self.reader_state = ReaderState::ReadingBatches {
389                            reader,
390                            data,
391                            block_data,
392                            remaining_in_block: block_count,
393                        };
394                        continue;
395                    }
396
                    // Data should always be fully consumed unless the decoder is Finished; if it wasn't, something went wrong
398                    if !data.is_empty() {
399                        return self.finish_with_error(AvroError::General(
400                            "BlockDecoder failed to make progress decoding Avro block".into(),
401                        ));
402                    }
403
404                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
                        // The block decoder has already been flushed above, so with no block produced we are done reading; drain the record decoder.
406                        self.finishing_partial_block = false;
407                        self.start_flushing();
408                        continue;
409                    }
410
                    // If we've already attempted the partial-block completion below and still
                    // can't decode a block, the file is truncated or corrupted.
413                    if self.finishing_partial_block {
414                        return self.finish_with_error(AvroError::EOF(
415                            "Unexpected EOF while reading last Avro block".into(),
416                        ));
417                    }
418
                    // Avro splitting case: the block is incomplete, so we need to:
                    // 1. Parse the block length so we know how much to read
                    // 2. Fetch more data from the reader
                    // 3. Build the block data from the remaining slice and the newly fetched data
                    // 4. Continue decoding until the end of the block
424                    self.finishing_partial_block = true;
425
426                    // Mid-block, but we don't know how many bytes are missing yet
427                    if matches!(
428                        self.block_decoder.state(),
429                        BlockDecoderState::Count | BlockDecoderState::Size
430                    ) {
431                        // Max VLQ header is 20 bytes (10 bytes each for count and size).
432                        // Fetch just enough to complete it.
433                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
434                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);
435
436                        // If there is nothing more to fetch, error out
437                        if fetch_end == self.range.end {
438                            return self.finish_with_error(AvroError::EOF(
439                                "Unexpected EOF while reading Avro block header".into(),
440                            ));
441                        }
442
443                        let range_to_fetch = self.range.end..fetch_end;
444                        self.range.end = fetch_end; // Track that we've fetched these bytes
445
446                        let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
447                        self.reader_state = ReaderState::FetchingData {
448                            future,
449                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
450                        };
451                        continue;
452                    }
453
                    // Otherwise, we're mid-block and know how many bytes remain to be fetched.
455                    let range_to_fetch = match self.remaining_block_range() {
456                        Ok(range) => range,
457                        Err(e) => return self.finish_with_error(e),
458                    };
459
460                    let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
461                    self.reader_state = ReaderState::FetchingData {
462                        future,
463                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
464                    };
465                    continue;
466                }
467                ReaderState::ReadingBatches {
468                    reader,
469                    data,
470                    mut block_data,
471                    mut remaining_in_block,
472                } => {
473                    let (consumed, records_decoded) =
474                        match self.decoder.decode_block(&block_data, remaining_in_block) {
475                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
476                            Err(e) => return self.finish_with_error(e),
477                        };
478
479                    remaining_in_block -= records_decoded;
480
481                    if remaining_in_block == 0 {
482                        if data.is_empty() {
483                            // No more data to read, drain remaining buffered records
484                            self.start_flushing();
485                        } else {
486                            // Finished this block, move to decode next block in the next iteration
487                            self.reader_state = ReaderState::DecodingBlock { reader, data };
488                        }
489                    } else {
490                        // Still more records to decode in this block, slice the already-read data and stay in this state
491                        block_data = block_data.slice(consumed..);
492                        self.reader_state = ReaderState::ReadingBatches {
493                            reader,
494                            data,
495                            block_data,
496                            remaining_in_block,
497                        };
498                    }
499
500                    // We have a full batch ready, emit it
501                    // (This is not mutually exclusive with the block being finished, so the state change is valid)
502                    if self.decoder.batch_is_full() {
503                        return match self.decoder.flush() {
504                            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
505                            Ok(None) => self.finish_with_error(AvroError::General(
506                                "Decoder reported a full batch, but flush returned None".into(),
507                            )),
508                            Err(e) => self.finish_with_error(e),
509                        };
510                    }
511                }
512                ReaderState::Flushing => {
513                    return self.poll_flush();
514                }
515                ReaderState::Finished => {
516                    // Terminal: once finished (including after an error), always yield None
517                    self.reader_state = ReaderState::Finished;
518                    return Poll::Ready(None);
519                }
520            }
521        }
522    }
523}
524
// To stay compatible with the stream item type expected across the Arrow ecosystem, this implementation returns `ArrowError`.
526impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
527    type Item = Result<RecordBatch, ArrowError>;
528
529    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
530        self.read_next(cx).map_err(Into::into)
531    }
532}
533
534#[cfg(all(test, feature = "object_store"))]
535mod tests {
536    use super::*;
537    use crate::schema::{AvroSchema, SCHEMA_METADATA_KEY};
538    use arrow_array::cast::AsArray;
539    use arrow_array::types::{Int32Type, Int64Type};
540    use arrow_array::*;
541    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
542    use futures::{StreamExt, TryStreamExt};
543    use object_store::local::LocalFileSystem;
544    use object_store::path::Path;
545    use object_store::{ObjectStore, ObjectStoreExt};
546    use std::collections::HashMap;
547    use std::sync::Arc;
548
549    fn arrow_test_data(file: &str) -> String {
550        let base =
551            std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| "../testing/data".to_string());
552        format!("{}/{}", base, file)
553    }
554
555    fn get_alltypes_schema() -> SchemaRef {
556        let schema = Schema::new(vec![
557            Field::new("id", DataType::Int32, true),
558            Field::new("bool_col", DataType::Boolean, true),
559            Field::new("tinyint_col", DataType::Int32, true),
560            Field::new("smallint_col", DataType::Int32, true),
561            Field::new("int_col", DataType::Int32, true),
562            Field::new("bigint_col", DataType::Int64, true),
563            Field::new("float_col", DataType::Float32, true),
564            Field::new("double_col", DataType::Float64, true),
565            Field::new("date_string_col", DataType::Binary, true),
566            Field::new("string_col", DataType::Binary, true),
567            Field::new(
568                "timestamp_col",
569                DataType::Timestamp(TimeUnit::Microsecond, Some("+00:00".into())),
570                true,
571            ),
572        ])
573        .with_metadata(HashMap::from([(
574            SCHEMA_METADATA_KEY.into(),
575            r#"{
576    "type": "record",
577    "name": "topLevelRecord",
578    "fields": [
579        {
580            "name": "id",
581            "type": [
582                "int",
583                "null"
584            ]
585        },
586        {
587            "name": "bool_col",
588            "type": [
589                "boolean",
590                "null"
591            ]
592        },
593        {
594            "name": "tinyint_col",
595            "type": [
596                "int",
597                "null"
598            ]
599        },
600        {
601            "name": "smallint_col",
602            "type": [
603                "int",
604                "null"
605            ]
606        },
607        {
608            "name": "int_col",
609            "type": [
610                "int",
611                "null"
612            ]
613        },
614        {
615            "name": "bigint_col",
616            "type": [
617                "long",
618                "null"
619            ]
620        },
621        {
622            "name": "float_col",
623            "type": [
624                "float",
625                "null"
626            ]
627        },
628        {
629            "name": "double_col",
630            "type": [
631                "double",
632                "null"
633            ]
634        },
635        {
636            "name": "date_string_col",
637            "type": [
638                "bytes",
639                "null"
640            ]
641        },
642        {
643            "name": "string_col",
644            "type": [
645                "bytes",
646                "null"
647            ]
648        },
649        {
650            "name": "timestamp_col",
651            "type": [
652                {
653                    "type": "long",
654                    "logicalType": "timestamp-micros"
655                },
656                "null"
657            ]
658        }
659    ]
660}
661"#
662            .into(),
663        )]));
664        Arc::new(schema)
665    }
666
667    fn get_alltypes_with_nulls_schema() -> SchemaRef {
668        let schema = Schema::new(vec![
669            Field::new("string_col", DataType::Binary, true),
670            Field::new("int_col", DataType::Int32, true),
671            Field::new("bool_col", DataType::Boolean, true),
672            Field::new("bigint_col", DataType::Int64, true),
673            Field::new("float_col", DataType::Float32, true),
674            Field::new("double_col", DataType::Float64, true),
675            Field::new("bytes_col", DataType::Binary, true),
676        ])
677        .with_metadata(HashMap::from([(
678            SCHEMA_METADATA_KEY.into(),
679            r#"{
680    "type": "record",
681    "name": "topLevelRecord",
682    "fields": [
683        {
684            "name": "string_col",
685            "type": [
686                "null",
687                "string"
688            ],
689            "default": null
690        },
691        {
692            "name": "int_col",
693            "type": [
694                "null",
695                "int"
696            ],
697            "default": null
698        },
699        {
700            "name": "bool_col",
701            "type": [
702                "null",
703                "boolean"
704            ],
705            "default": null
706        },
707        {
708            "name": "bigint_col",
709            "type": [
710                "null",
711                "long"
712            ],
713            "default": null
714        },
715        {
716            "name": "float_col",
717            "type": [
718                "null",
719                "float"
720            ],
721            "default": null
722        },
723        {
724            "name": "double_col",
725            "type": [
726                "null",
727                "double"
728            ],
729            "default": null
730        },
731        {
732            "name": "bytes_col",
733            "type": [
734                "null",
735                "bytes"
736            ],
737            "default": null
738        }
739    ]
740}"#
741            .into(),
742        )]));
743
744        Arc::new(schema)
745    }
746
747    fn get_nested_records_schema() -> SchemaRef {
748        let schema = Schema::new(vec![
749            Field::new(
750                "f1",
751                DataType::Struct(
752                    vec![
753                        Field::new("f1_1", DataType::Utf8, false),
754                        Field::new("f1_2", DataType::Int32, false),
755                        Field::new(
756                            "f1_3",
757                            DataType::Struct(
758                                vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
759                            ),
760                            false,
761                        ),
762                    ]
763                    .into(),
764                ),
765                false,
766            ),
767            Field::new(
768                "f2",
769                DataType::List(Arc::new(Field::new(
770                    "item",
771                    DataType::Struct(
772                        vec![
773                            Field::new("f2_1", DataType::Boolean, false),
774                            Field::new("f2_2", DataType::Float32, false),
775                        ]
776                        .into(),
777                    ),
778                    false,
779                ))),
780                false,
781            ),
782            Field::new(
783                "f3",
784                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
785                true,
786            ),
787            Field::new(
788                "f4",
789                DataType::List(Arc::new(Field::new(
790                    "item",
791                    DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
792                    true,
793                ))),
794                false,
795            ),
796        ])
797        .with_metadata(HashMap::from([(
798            SCHEMA_METADATA_KEY.into(),
799            r#"{
800    "type": "record",
801    "namespace": "ns1",
802    "name": "record1",
803    "fields": [
804        {
805            "name": "f1",
806            "type": {
807                "type": "record",
808                "namespace": "ns2",
809                "name": "record2",
810                "fields": [
811                    {
812                        "name": "f1_1",
813                        "type": "string"
814                    },
815                    {
816                        "name": "f1_2",
817                        "type": "int"
818                    },
819                    {
820                        "name": "f1_3",
821                        "type": {
822                            "type": "record",
823                            "namespace": "ns3",
824                            "name": "record3",
825                            "fields": [
826                                {
827                                    "name": "f1_3_1",
828                                    "type": "double"
829                                }
830                            ]
831                        }
832                    }
833                ]
834            }
835        },
836        {
837            "name": "f2",
838            "type": {
839                "type": "array",
840                "items": {
841                    "type": "record",
842                    "namespace": "ns4",
843                    "name": "record4",
844                    "fields": [
845                        {
846                            "name": "f2_1",
847                            "type": "boolean"
848                        },
849                        {
850                            "name": "f2_2",
851                            "type": "float"
852                        }
853                    ]
854                }
855            }
856        },
857        {
858            "name": "f3",
859            "type": [
860                "null",
861                {
862                    "type": "record",
863                    "namespace": "ns5",
864                    "name": "record5",
865                    "fields": [
866                        {
867                            "name": "f3_1",
868                            "type": "string"
869                        }
870                    ]
871                }
872            ],
873            "default": null
874        },
875        {
876            "name": "f4",
877            "type": {
878                "type": "array",
879                "items": [
880                    "null",
881                    {
882                        "type": "record",
883                        "namespace": "ns6",
884                        "name": "record6",
885                        "fields": [
886                            {
887                                "name": "f4_1",
888                                "type": "long"
889                            }
890                        ]
891                    }
892                ]
893            }
894        }
895    ]
896}
897"#
898            .into(),
899        )]));
900
901        Arc::new(schema)
902    }
903
904    async fn read_async_file(
905        path: &str,
906        batch_size: usize,
907        range: Option<Range<u64>>,
908        schema: Option<SchemaRef>,
909        projection: Option<Vec<usize>>,
910    ) -> Result<Vec<RecordBatch>, ArrowError> {
911        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
912        let location = Path::from_filesystem_path(path).unwrap();
913
914        let file_size = store.head(&location).await.unwrap().size;
915
916        let file_reader = AvroObjectReader::new(store, location);
917        let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);
918
919        if let Some(s) = schema {
920            let reader_schema = AvroSchema::try_from(s.as_ref())?;
921            builder = builder.with_reader_schema(reader_schema);
922        }
923
924        if let Some(proj) = projection {
925            builder = builder.with_projection(proj);
926        }
927
928        if let Some(range) = range {
929            builder = builder.with_range(range);
930        }
931
932        let reader = builder.try_build().await?;
933        reader.try_collect().await
934    }
935
936    #[tokio::test]
937    async fn test_full_file_read() {
938        let file = arrow_test_data("avro/alltypes_plain.avro");
939        let schema = get_alltypes_schema();
940        let batches = read_async_file(&file, 1024, None, Some(schema), None)
941            .await
942            .unwrap();
943        let batch = &batches[0];
944
945        assert_eq!(batch.num_rows(), 8);
946        assert_eq!(batch.num_columns(), 11);
947
948        let id_array = batch
949            .column(0)
950            .as_any()
951            .downcast_ref::<Int32Array>()
952            .unwrap();
953        assert_eq!(id_array.value(0), 4);
954        assert_eq!(id_array.value(7), 1);
955    }
956
957    #[tokio::test]
958    async fn test_small_batch_size() {
959        let file = arrow_test_data("avro/alltypes_plain.avro");
960        let schema = get_alltypes_schema();
961        let batches = read_async_file(&file, 2, None, Some(schema), None)
962            .await
963            .unwrap();
964        assert_eq!(batches.len(), 4);
965
966        let batch = &batches[0];
967
968        assert_eq!(batch.num_rows(), 2);
969        assert_eq!(batch.num_columns(), 11);
970    }
971
972    #[tokio::test]
973    async fn test_batch_size_one() {
974        let file = arrow_test_data("avro/alltypes_plain.avro");
975        let schema = get_alltypes_schema();
976        let batches = read_async_file(&file, 1, None, Some(schema), None)
977            .await
978            .unwrap();
979        let batch = &batches[0];
980
981        assert_eq!(batches.len(), 8);
982        assert_eq!(batch.num_rows(), 1);
983    }
984
985    #[tokio::test]
986    async fn test_batch_size_larger_than_file() {
987        let file = arrow_test_data("avro/alltypes_plain.avro");
988        let schema = get_alltypes_schema();
989        let batches = read_async_file(&file, 10000, None, Some(schema), None)
990            .await
991            .unwrap();
992        let batch = &batches[0];
993
994        assert_eq!(batch.num_rows(), 8);
995    }
996
997    #[tokio::test]
998    async fn test_empty_range() {
999        let file = arrow_test_data("avro/alltypes_plain.avro");
1000        let range = 100..100;
1001        let schema = get_alltypes_schema();
1002        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1003            .await
1004            .unwrap();
1005        assert_eq!(batches.len(), 0);
1006    }
1007
1008    #[tokio::test]
1009    async fn test_range_starting_at_zero() {
1010        // Tests that range starting at 0 correctly skips header
1011        let file = arrow_test_data("avro/alltypes_plain.avro");
1012        let store = Arc::new(LocalFileSystem::new());
1013        let location = Path::from_filesystem_path(&file).unwrap();
1014        let meta = store.head(&location).await.unwrap();
1015
1016        let range = 0..meta.size;
1017        let schema = get_alltypes_schema();
1018        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1019            .await
1020            .unwrap();
1021        let batch = &batches[0];
1022
1023        assert_eq!(batch.num_rows(), 8);
1024    }
1025
1026    #[tokio::test]
1027    async fn test_range_after_header() {
1028        let file = arrow_test_data("avro/alltypes_plain.avro");
1029        let store = Arc::new(LocalFileSystem::new());
1030        let location = Path::from_filesystem_path(&file).unwrap();
1031        let meta = store.head(&location).await.unwrap();
1032
1033        let range = 100..meta.size;
1034        let schema = get_alltypes_schema();
1035        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1036            .await
1037            .unwrap();
1038        let batch = &batches[0];
1039
1040        assert!(batch.num_rows() > 0);
1041    }
1042
1043    #[tokio::test]
1044    async fn test_range_no_sync_marker() {
1045        // Small range unlikely to contain sync marker
1046        let file = arrow_test_data("avro/alltypes_plain.avro");
1047        let range = 50..150;
1048        let schema = get_alltypes_schema();
1049        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1050            .await
1051            .unwrap();
1052        assert_eq!(batches.len(), 0);
1053    }
1054
1055    #[tokio::test]
1056    async fn test_range_starting_mid_file() {
1057        let file = arrow_test_data("avro/alltypes_plain.avro");
1058
1059        let range = 700..768; // Header ends at 675, so this should be mid-block
1060        let schema = get_alltypes_schema();
1061        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1062            .await
1063            .unwrap();
1064        assert_eq!(batches.len(), 0);
1065    }
1066
1067    #[tokio::test]
1068    async fn test_range_ending_at_file_size() {
1069        let file = arrow_test_data("avro/alltypes_plain.avro");
1070        let store = Arc::new(LocalFileSystem::new());
1071        let location = Path::from_filesystem_path(&file).unwrap();
1072        let meta = store.head(&location).await.unwrap();
1073
1074        let range = 200..meta.size;
1075        let schema = get_alltypes_schema();
1076        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1077            .await
1078            .unwrap();
1079        let batch = &batches[0];
1080
1081        assert_eq!(batch.num_rows(), 8);
1082    }
1083
1084    #[tokio::test]
1085    async fn test_incomplete_block_requires_fetch() {
        // Range ends mid-block, which should trigger the partial-block fetch logic
1087        let file = arrow_test_data("avro/alltypes_plain.avro");
1088        let range = 0..1200;
1089        let schema = get_alltypes_schema();
1090        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1091            .await
1092            .unwrap();
1093        let batch = &batches[0];
1094
1095        assert_eq!(batch.num_rows(), 8)
1096    }
1097
1098    #[tokio::test]
1099    async fn test_partial_vlq_header_requires_fetch() {
1100        // Range ends mid-VLQ header, triggering the Count|Size partial fetch logic.
1101        let file = arrow_test_data("avro/alltypes_plain.avro");
1102        let range = 16..676; // Header should end at 675
1103        let schema = get_alltypes_schema();
1104        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1105            .await
1106            .unwrap();
1107        let batch = &batches[0];
1108
1109        assert_eq!(batch.num_rows(), 8)
1110    }
1111
1112    #[cfg(feature = "snappy")]
1113    #[tokio::test]
1114    async fn test_snappy_compressed_with_range() {
1115        {
1116            let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
1117            let store = Arc::new(LocalFileSystem::new());
1118            let location = Path::from_filesystem_path(&file).unwrap();
1119            let meta = store.head(&location).await.unwrap();
1120
1121            let range = 200..meta.size;
1122            let schema = get_alltypes_schema();
1123            let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1124                .await
1125                .unwrap();
1126            let batch = &batches[0];
1127
1128            assert!(batch.num_rows() > 0);
1129        }
1130    }
1131
1132    #[tokio::test]
1133    async fn test_nulls() {
1134        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
1135        let schema = get_alltypes_with_nulls_schema();
1136        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1137            .await
1138            .unwrap();
1139        let batch = &batches[0];
1140
1141        assert_eq!(batch.num_rows(), 1);
1142        for col in batch.columns() {
1143            assert!(col.is_null(0));
1144        }
1145    }
1146
1147    #[tokio::test]
1148    async fn test_nested_records() {
1149        let file = arrow_test_data("avro/nested_records.avro");
1150        let schema = get_nested_records_schema();
1151        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1152            .await
1153            .unwrap();
1154        let batch = &batches[0];
1155
1156        assert_eq!(batch.num_rows(), 2);
1157        assert!(batch.num_columns() > 0);
1158    }
1159
1160    #[tokio::test]
1161    async fn test_stream_produces_multiple_batches() {
1162        let file = arrow_test_data("avro/alltypes_plain.avro");
1163        let store = Arc::new(LocalFileSystem::new());
1164        let location = Path::from_filesystem_path(&file).unwrap();
1165
1166        let file_size = store.head(&location).await.unwrap().size;
1167
1168        let file_reader = AvroObjectReader::new(store, location);
1169        let schema = get_alltypes_schema();
1170        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1171        let reader = AsyncAvroFileReader::builder(
1172            file_reader,
1173            file_size,
1174            2, // Small batch size to force multiple batches
1175        )
1176        .with_reader_schema(reader_schema)
1177        .try_build()
1178        .await
1179        .unwrap();
1180
1181        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1182
1183        assert!(batches.len() > 1);
1184        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1185        assert_eq!(total_rows, 8);
1186    }
1187
1188    #[tokio::test]
1189    async fn test_stream_early_termination() {
1190        let file = arrow_test_data("avro/alltypes_plain.avro");
1191        let store = Arc::new(LocalFileSystem::new());
1192        let location = Path::from_filesystem_path(&file).unwrap();
1193
1194        let file_size = store.head(&location).await.unwrap().size;
1195
1196        let file_reader = AvroObjectReader::new(store, location);
1197        let schema = get_alltypes_schema();
1198        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1199        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1)
1200            .with_reader_schema(reader_schema)
1201            .try_build()
1202            .await
1203            .unwrap();
1204
1205        let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap();
1206
1207        assert_eq!(first_batch.len(), 1);
1208        assert!(first_batch[0].num_rows() > 0);
1209    }
1210
1211    #[tokio::test]
1212    async fn test_various_batch_sizes() {
1213        let file = arrow_test_data("avro/alltypes_plain.avro");
1214
1215        for batch_size in [1, 2, 3, 5, 7, 11, 100] {
1216            let schema = get_alltypes_schema();
1217            let batches = read_async_file(&file, batch_size, None, Some(schema), None)
1218                .await
1219                .unwrap();
1220            let batch = &batches[0];
1221
            // The first batch should have `batch_size` rows, capped at the 8 rows in the file
1223            assert_eq!(
1224                batch.num_rows(),
1225                batch_size.min(8),
1226                "Failed with batch_size={}",
1227                batch_size
1228            );
1229        }
1230    }
1231
1232    #[tokio::test]
1233    async fn test_range_larger_than_file() {
1234        let file = arrow_test_data("avro/alltypes_plain.avro");
1235        let store = Arc::new(LocalFileSystem::new());
1236        let location = Path::from_filesystem_path(&file).unwrap();
1237        let meta = store.head(&location).await.unwrap();
1238
1239        // Range extends beyond file size
1240        let range = 100..(meta.size + 1000);
1241        let schema = get_alltypes_schema();
1242        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1243            .await
1244            .unwrap();
1245        let batch = &batches[0];
1246
1247        // Should clamp to file size
1248        assert_eq!(batch.num_rows(), 8);
1249    }
1250
1251    #[tokio::test]
1252    async fn test_roundtrip_write_then_async_read() {
1253        use crate::writer::AvroWriter;
1254        use arrow_array::{Float64Array, StringArray};
1255        use std::fs::File;
1256        use std::io::BufWriter;
1257        use tempfile::tempdir;
1258
1259        // Schema with nullable and non-nullable fields of various types
1260        let schema = Arc::new(Schema::new(vec![
1261            Field::new("id", DataType::Int32, false),
1262            Field::new("name", DataType::Utf8, true),
1263            Field::new("score", DataType::Float64, true),
1264            Field::new("count", DataType::Int64, false),
1265        ]));
1266
1267        let dir = tempdir().unwrap();
1268        let file_path = dir.path().join("roundtrip_test.avro");
1269
1270        // Write multiple batches with nulls
1271        {
1272            let file = File::create(&file_path).unwrap();
1273            let writer = BufWriter::new(file);
1274            let mut avro_writer = AvroWriter::new(writer, schema.as_ref().clone()).unwrap();
1275
1276            // First batch: 3 rows with some nulls
1277            let batch1 = RecordBatch::try_new(
1278                schema.clone(),
1279                vec![
1280                    Arc::new(Int32Array::from(vec![1, 2, 3])),
1281                    Arc::new(StringArray::from(vec![
1282                        Some("alice"),
1283                        None,
1284                        Some("charlie"),
1285                    ])),
1286                    Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3), None])),
1287                    Arc::new(Int64Array::from(vec![10, 20, 30])),
1288                ],
1289            )
1290            .unwrap();
1291            avro_writer.write(&batch1).unwrap();
1292
1293            // Second batch: 2 rows
1294            let batch2 = RecordBatch::try_new(
1295                schema.clone(),
1296                vec![
1297                    Arc::new(Int32Array::from(vec![4, 5])),
1298                    Arc::new(StringArray::from(vec![Some("diana"), Some("eve")])),
1299                    Arc::new(Float64Array::from(vec![None, Some(88.0)])),
1300                    Arc::new(Int64Array::from(vec![40, 50])),
1301                ],
1302            )
1303            .unwrap();
1304            avro_writer.write(&batch2).unwrap();
1305
1306            avro_writer.finish().unwrap();
1307        }
1308
1309        // Read back with small batch size to produce multiple output batches
1310        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1311        let location = Path::from_filesystem_path(&file_path).unwrap();
1312        let file_size = store.head(&location).await.unwrap().size;
1313
1314        let file_reader = AvroObjectReader::new(store, location);
1315        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
1316            .try_build()
1317            .await
1318            .unwrap();
1319
1320        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1321
1322        // Verify we got multiple output batches due to small batch_size
1323        assert!(
1324            batches.len() > 1,
1325            "Expected multiple batches with batch_size=2"
1326        );
1327
1328        // Verify total row count
1329        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1330        assert_eq!(total_rows, 5);
1331
1332        // Concatenate all batches to verify data
1333        let combined = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
1334        assert_eq!(combined.num_rows(), 5);
1335        assert_eq!(combined.num_columns(), 4);
1336
1337        // Check id column (non-nullable)
1338        let id_array = combined
1339            .column(0)
1340            .as_any()
1341            .downcast_ref::<Int32Array>()
1342            .unwrap();
1343        assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);
1344
1345        // Check name column (nullable) - verify nulls are preserved
1346        // Avro strings are read as Binary by default
1347        let name_col = combined.column(1);
1348        let name_array = name_col.as_string::<i32>();
1349        assert_eq!(name_array.value(0), "alice");
1350        assert!(name_col.is_null(1)); // second row has null name
1351        assert_eq!(name_array.value(2), "charlie");
1352
1353        // Check score column (nullable) - verify nulls are preserved
1354        let score_array = combined
1355            .column(2)
1356            .as_any()
1357            .downcast_ref::<Float64Array>()
1358            .unwrap();
1359        assert!(!score_array.is_null(0));
1360        assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
1361        assert!(score_array.is_null(2)); // third row has null score
1362        assert!(score_array.is_null(3)); // fourth row has null score
1363        assert!(!score_array.is_null(4));
1364        assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);
1365
1366        // Check count column (non-nullable)
1367        let count_array = combined
1368            .column(3)
1369            .as_any()
1370            .downcast_ref::<Int64Array>()
1371            .unwrap();
1372        assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
1373    }
1374
1375    #[tokio::test]
1376    async fn test_alltypes_no_schema_no_projection() {
1377        // No reader schema, no projection - uses writer schema from file
1378        let file = arrow_test_data("avro/alltypes_plain.avro");
1379        let batches = read_async_file(&file, 1024, None, None, None)
1380            .await
1381            .unwrap();
1382        let batch = &batches[0];
1383
1384        assert_eq!(batch.num_rows(), 8);
1385        assert_eq!(batch.num_columns(), 11);
1386        assert_eq!(batch.schema().field(0).name(), "id");
1387    }
1388
1389    #[tokio::test]
1390    async fn test_alltypes_no_schema_with_projection() {
1391        // No reader schema, with projection - project writer schema
1392        let file = arrow_test_data("avro/alltypes_plain.avro");
1393        // Project [tinyint_col, id, bigint_col] = indices [2, 0, 5]
1394        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0, 5]))
1395            .await
1396            .unwrap();
1397        let batch = &batches[0];
1398
1399        assert_eq!(batch.num_rows(), 8);
1400        assert_eq!(batch.num_columns(), 3);
1401        assert_eq!(batch.schema().field(0).name(), "tinyint_col");
1402        assert_eq!(batch.schema().field(1).name(), "id");
1403        assert_eq!(batch.schema().field(2).name(), "bigint_col");
1404
1405        // Verify data values
1406        let tinyint_col = batch.column(0).as_primitive::<Int32Type>();
1407        assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]);
1408
1409        let id = batch.column(1).as_primitive::<Int32Type>();
1410        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
1411
1412        let bigint_col = batch.column(2).as_primitive::<Int64Type>();
1413        assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]);
1414    }
1415
1416    #[tokio::test]
1417    async fn test_alltypes_with_schema_no_projection() {
1418        // With reader schema, no projection
1419        let file = arrow_test_data("avro/alltypes_plain.avro");
1420        let schema = get_alltypes_schema();
1421        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1422            .await
1423            .unwrap();
1424        let batch = &batches[0];
1425
1426        assert_eq!(batch.num_rows(), 8);
1427        assert_eq!(batch.num_columns(), 11);
1428    }
1429
1430    #[tokio::test]
1431    async fn test_alltypes_with_schema_with_projection() {
1432        // With reader schema, with projection
1433        let file = arrow_test_data("avro/alltypes_plain.avro");
1434        let schema = get_alltypes_schema();
1435        // Project [bool_col, id] = indices [1, 0]
1436        let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![1, 0]))
1437            .await
1438            .unwrap();
1439        let batch = &batches[0];
1440
1441        assert_eq!(batch.num_rows(), 8);
1442        assert_eq!(batch.num_columns(), 2);
1443        assert_eq!(batch.schema().field(0).name(), "bool_col");
1444        assert_eq!(batch.schema().field(1).name(), "id");
1445
1446        let bool_col = batch.column(0).as_boolean();
1447        assert!(bool_col.value(0));
1448        assert!(!bool_col.value(1));
1449
1450        let id = batch.column(1).as_primitive::<Int32Type>();
1451        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
1452    }
1453
1454    #[tokio::test]
1455    async fn test_nested_no_schema_no_projection() {
1456        // No reader schema, no projection
1457        let file = arrow_test_data("avro/nested_records.avro");
1458        let batches = read_async_file(&file, 1024, None, None, None)
1459            .await
1460            .unwrap();
1461        let batch = &batches[0];
1462
1463        assert_eq!(batch.num_rows(), 2);
1464        assert_eq!(batch.num_columns(), 4);
1465        assert_eq!(batch.schema().field(0).name(), "f1");
1466        assert_eq!(batch.schema().field(1).name(), "f2");
1467        assert_eq!(batch.schema().field(2).name(), "f3");
1468        assert_eq!(batch.schema().field(3).name(), "f4");
1469    }
1470
1471    #[tokio::test]
1472    async fn test_nested_no_schema_with_projection() {
1473        // No reader schema, with projection - reorder nested fields
1474        let file = arrow_test_data("avro/nested_records.avro");
1475        // Project [f3, f1] = indices [2, 0]
1476        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0]))
1477            .await
1478            .unwrap();
1479        let batch = &batches[0];
1480
1481        assert_eq!(batch.num_rows(), 2);
1482        assert_eq!(batch.num_columns(), 2);
1483        assert_eq!(batch.schema().field(0).name(), "f3");
1484        assert_eq!(batch.schema().field(1).name(), "f1");
1485    }
1486
1487    #[tokio::test]
1488    async fn test_nested_with_schema_no_projection() {
1489        // With reader schema, no projection
1490        let file = arrow_test_data("avro/nested_records.avro");
1491        let schema = get_nested_records_schema();
1492        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1493            .await
1494            .unwrap();
1495        let batch = &batches[0];
1496
1497        assert_eq!(batch.num_rows(), 2);
1498        assert_eq!(batch.num_columns(), 4);
1499    }
1500
1501    #[tokio::test]
1502    async fn test_nested_with_schema_with_projection() {
1503        // With reader schema, with projection
1504        let file = arrow_test_data("avro/nested_records.avro");
1505        let schema = get_nested_records_schema();
1506        // Project [f4, f2, f1] = indices [3, 1, 0]
1507        let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![3, 1, 0]))
1508            .await
1509            .unwrap();
1510        let batch = &batches[0];
1511
1512        assert_eq!(batch.num_rows(), 2);
1513        assert_eq!(batch.num_columns(), 3);
1514        assert_eq!(batch.schema().field(0).name(), "f4");
1515        assert_eq!(batch.schema().field(1).name(), "f2");
1516        assert_eq!(batch.schema().field(2).name(), "f1");
1517    }
1518
1519    #[tokio::test]
1520    async fn test_projection_error_out_of_bounds() {
1521        let file = arrow_test_data("avro/alltypes_plain.avro");
1522        // Index 100 is out of bounds for the 11-field schema
1523        let err = read_async_file(&file, 1024, None, None, Some(vec![100]))
1524            .await
1525            .unwrap_err();
1526        assert!(matches!(err, ArrowError::AvroError(_)));
1527        assert!(err.to_string().contains("out of bounds"));
1528    }
1529
1530    #[tokio::test]
1531    async fn test_projection_error_duplicate_index() {
1532        let file = arrow_test_data("avro/alltypes_plain.avro");
1533        // Duplicate index 0
1534        let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0]))
1535            .await
1536            .unwrap_err();
1537        assert!(matches!(err, ArrowError::AvroError(_)));
1538        assert!(err.to_string().contains("Duplicate projection index"));
1539    }
1540
1541    #[tokio::test]
1542    async fn test_with_header_size_hint_small() {
1543        // Use a very small header size hint to force multiple fetches
1544        let file = arrow_test_data("avro/alltypes_plain.avro");
1545        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1546        let location = Path::from_filesystem_path(&file).unwrap();
1547        let file_size = store.head(&location).await.unwrap().size;
1548
1549        let file_reader = AvroObjectReader::new(store, location);
1550        let schema = get_alltypes_schema();
1551        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1552
1553        // Use a tiny header hint (64 bytes); the actual header is much larger
1554        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1555            .with_reader_schema(reader_schema)
1556            .with_header_size_hint(64)
1557            .try_build()
1558            .await
1559            .unwrap();
1560
1561        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1562        let batch = &batches[0];
1563
1564        assert_eq!(batch.num_rows(), 8);
1565        assert_eq!(batch.num_columns(), 11);
1566    }
1567
1568    #[tokio::test]
1569    async fn test_with_header_size_hint_large() {
1570        // Use a larger header size hint than needed
1571        let file = arrow_test_data("avro/alltypes_plain.avro");
1572        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1573        let location = Path::from_filesystem_path(&file).unwrap();
1574        let file_size = store.head(&location).await.unwrap().size;
1575
1576        let file_reader = AvroObjectReader::new(store, location);
1577        let schema = get_alltypes_schema();
1578        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1579
1580        // Use a large header hint (64KB)
1581        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1582            .with_reader_schema(reader_schema)
1583            .with_header_size_hint(64 * 1024)
1584            .try_build()
1585            .await
1586            .unwrap();
1587
1588        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1589        let batch = &batches[0];
1590
1591        assert_eq!(batch.num_rows(), 8);
1592        assert_eq!(batch.num_columns(), 11);
1593    }
1594
1595    #[tokio::test]
1596    async fn test_with_utf8_view_enabled() {
1597        // Test that utf8_view produces StringViewArray instead of StringArray
1598        let file = arrow_test_data("avro/nested_records.avro");
1599        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1600        let location = Path::from_filesystem_path(&file).unwrap();
1601        let file_size = store.head(&location).await.unwrap().size;
1602
1603        let file_reader = AvroObjectReader::new(store, location);
1604
1605        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1606            .with_utf8_view(true)
1607            .try_build()
1608            .await
1609            .unwrap();
1610
1611        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1612        let batch = &batches[0];
1613
1614        assert_eq!(batch.num_rows(), 2);
1615
1616        // The f1 struct contains f1_1 which is a string field
1617        // With utf8_view enabled, it should be Utf8View type
1618        let f1_col = batch.column(0);
1619        let f1_struct = f1_col.as_struct();
1620        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1621
1622        // Check that the data type is Utf8View
1623        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
1624    }
1625
1626    #[tokio::test]
1627    async fn test_with_utf8_view_disabled() {
1628        // Test that without utf8_view, we get regular Utf8
1629        let file = arrow_test_data("avro/nested_records.avro");
1630        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1631        let location = Path::from_filesystem_path(&file).unwrap();
1632        let file_size = store.head(&location).await.unwrap().size;
1633
1634        let file_reader = AvroObjectReader::new(store, location);
1635
1636        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1637            .with_utf8_view(false)
1638            .try_build()
1639            .await
1640            .unwrap();
1641
1642        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1643        let batch = &batches[0];
1644
1645        assert_eq!(batch.num_rows(), 2);
1646
1647        // The f1 struct contains f1_1 which is a string field
1648        // Without utf8_view, it should be regular Utf8
1649        let f1_col = batch.column(0);
1650        let f1_struct = f1_col.as_struct();
1651        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1652
1653        assert_eq!(f1_1_field.data_type(), &DataType::Utf8);
1654    }
1655
1656    #[tokio::test]
1657    async fn test_with_strict_mode_disabled_allows_null_second() {
1658        // Test that with strict_mode disabled, unions of ['T', 'null'] are allowed
1659        // The alltypes_nulls_plain.avro file has unions with null second
1660        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
1661        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1662        let location = Path::from_filesystem_path(&file).unwrap();
1663        let file_size = store.head(&location).await.unwrap().size;
1664
1665        let file_reader = AvroObjectReader::new(store, location);
1666
1667        // Without strict mode, this should succeed
1668        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1669            .with_strict_mode(false)
1670            .try_build()
1671            .await
1672            .unwrap();
1673
1674        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1675        assert_eq!(batches.len(), 1);
1676        assert_eq!(batches[0].num_rows(), 1);
1677    }
1678
1679    #[tokio::test]
1680    async fn test_with_strict_mode_enabled_rejects_null_second() {
1681        // Test that with strict_mode enabled, unions of ['T', 'null'] are rejected
1682        // The alltypes_plain.avro file has unions like ["int", "null"] (null second)
1683        let file = arrow_test_data("avro/alltypes_plain.avro");
1684        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1685        let location = Path::from_filesystem_path(&file).unwrap();
1686        let file_size = store.head(&location).await.unwrap().size;
1687
1688        let file_reader = AvroObjectReader::new(store, location);
1689
1690        // With strict mode, this should fail because of ['T', 'null'] unions
1691        let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1692            .with_strict_mode(true)
1693            .try_build()
1694            .await;
1695
1696        match result {
1697            Ok(_) => panic!("Expected error for strict_mode with ['T', 'null'] union"),
1698            Err(err) => {
1699                assert!(
1700                    err.to_string().contains("disallowed in strict_mode"),
1701                    "Expected strict_mode error, got: {}",
1702                    err
1703                );
1704            }
1705        }
1706    }
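
    // Illustrative note (not taken from the test data files): the two union orderings
    // exercised by the strict_mode tests above, written as Avro schema JSON fragments:
    //
    //   {"name": "a", "type": ["null", "int"]}  // "null" first  -> accepted under strict_mode
    //   {"name": "b", "type": ["int", "null"]}  // "null" second -> rejected under strict_mode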
1707
1708    #[tokio::test]
1709    async fn test_with_strict_mode_enabled_valid_schema() {
1710        // Test that strict_mode works with schemas that have proper ['null', 'T'] unions
1711        // The nested_records.avro file has properly ordered unions
1712        let file = arrow_test_data("avro/nested_records.avro");
1713        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1714        let location = Path::from_filesystem_path(&file).unwrap();
1715        let file_size = store.head(&location).await.unwrap().size;
1716
1717        let file_reader = AvroObjectReader::new(store, location);
1718
1719        // With strict mode, properly ordered unions should still work
1720        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1721            .with_strict_mode(true)
1722            .try_build()
1723            .await
1724            .unwrap();
1725
1726        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1727        assert_eq!(batches.len(), 1);
1728        assert_eq!(batches[0].num_rows(), 2);
1729    }
1730
1731    #[tokio::test]
1732    async fn test_builder_options_combined() {
1733        // Test combining multiple builder options
1734        let file = arrow_test_data("avro/nested_records.avro");
1735        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1736        let location = Path::from_filesystem_path(&file).unwrap();
1737        let file_size = store.head(&location).await.unwrap().size;
1738
1739        let file_reader = AvroObjectReader::new(store, location);
1740
1741        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
1742            .with_header_size_hint(128)
1743            .with_utf8_view(true)
1744            .with_strict_mode(true)
1745            .with_projection(vec![0, 2]) // f1 and f3
1746            .try_build()
1747            .await
1748            .unwrap();
1749
1750        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1751        let batch = &batches[0];
1752
1753        // Should have 2 columns (f1 and f3) due to projection
1754        assert_eq!(batch.num_columns(), 2);
1755        assert_eq!(batch.schema().field(0).name(), "f1");
1756        assert_eq!(batch.schema().field(1).name(), "f3");
1757
1758        // Verify utf8_view is applied
1759        let f1_col = batch.column(0);
1760        let f1_struct = f1_col.as_struct();
1761        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1762        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
1763    }
1764}