// arrow_avro/reader/async_reader/mod.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Asynchronous implementation of Avro file reader.
19//!
20//! This module provides [`AsyncAvroFileReader`], which supports reading and decoding
21//! the Avro OCF format from any source that implements [`AsyncFileReader`].
22
23use crate::compression::CompressionCodec;
24use crate::reader::Decoder;
25use crate::reader::block::{BlockDecoder, BlockDecoderState};
26use arrow_array::RecordBatch;
27use arrow_schema::{ArrowError, SchemaRef};
28use bytes::Bytes;
29use futures::future::BoxFuture;
30use futures::{FutureExt, Stream};
31use std::mem;
32use std::ops::Range;
33use std::pin::Pin;
34use std::task::{Context, Poll};
35
36mod async_file_reader;
37mod builder;
38
39pub use async_file_reader::AsyncFileReader;
40pub use builder::{ReaderBuilder, read_header_info};
41
42#[cfg(feature = "object_store")]
43mod store;
44
45use crate::errors::AvroError;
46#[cfg(feature = "object_store")]
47pub use store::AvroObjectReader;
48
/// Tells [`AsyncAvroFileReader`]'s polling loop how to interpret the bytes
/// that a pending fetch returns once it completes.
enum FetchNextBehaviour {
    /// Initial read: scan for sync marker, then move to decoding blocks
    ReadSyncMarker,
    /// Parse VLQ header bytes one at a time until Data state, then continue decoding
    DecodeVLQHeader,
    /// Continue decoding the current block with the fetched data
    ContinueDecoding,
}
57
/// State machine for [`AsyncAvroFileReader`].
///
/// Ownership of the underlying reader `R` (and of any in-flight fetch future)
/// moves between variants as polling alternates between fetching and decoding.
enum ReaderState<R> {
    /// Intermediate state to fix ownership issues: `read_next` swaps this in
    /// while it owns the real state; observing it at the top of the loop means
    /// a previous iteration failed to restore a valid state.
    InvalidState,
    /// Initial state, fetch initial range
    Idle { reader: R },
    /// Fetching data from the reader
    FetchingData {
        /// Boxed future resolving to the reader plus the fetched bytes.
        future: BoxFuture<'static, Result<(R, Bytes), AvroError>>,
        /// What to do with the bytes once the fetch completes.
        next_behaviour: FetchNextBehaviour,
    },
    /// Decode a block in a loop until completion
    DecodingBlock { data: Bytes, reader: R },
    /// Output batches from a decoded block
    ReadingBatches {
        /// Bytes not yet handed to the block decoder (start of the next block).
        data: Bytes,
        /// Payload of the current block (decompressed if a codec is set);
        /// sliced forward as records are consumed.
        block_data: Bytes,
        /// Records of this block not yet decoded into batches.
        remaining_in_block: usize,
        reader: R,
    },
    /// Successfully finished reading file contents; drain any remaining buffered records
    /// from the decoder into (possibly partial) output batches.
    Flushing,
    /// Terminal state: always yields `None` (also entered after yielding an error).
    Finished,
}
83
84/// An asynchronous Avro file reader that implements `Stream<Item = Result<RecordBatch, ArrowError>>`.
85/// This uses an [`AsyncFileReader`] to fetch data ranges as needed, starting with fetching the header,
86/// then reading all the blocks in the provided range where:
87/// 1. Reads and decodes data until the header is fully decoded.
88/// 2. Searching from `range.start` for the first sync marker, and starting with the following block.
89///    (If `range.start` is less than the header length, we start at the header length minus the sync marker bytes)
90/// 3. Reading blocks sequentially, decoding them into RecordBatches.
91/// 4. If a block is incomplete (due to range ending mid-block), fetching the remaining bytes from the [`AsyncFileReader`].
92/// 5. If no range was originally provided, reads the full file.
93/// 6. If the range is 0, file_size is 0, or `range.end` is less than the header length, finish immediately.
94///
95/// # Example
96///
97/// ```
98/// #[tokio::main(flavor = "current_thread")]
99/// async fn main() -> Result<(), Box<dyn std::error::Error>> {
100/// use std::io::Cursor;
101/// use std::sync::Arc;
102/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
103/// use arrow_schema::{DataType, Field, Schema};
104/// use arrow_avro::reader::AsyncAvroFileReader;
105/// use arrow_avro::writer::AvroWriter;
106/// use futures::TryStreamExt;
107///
108/// // Build a minimal Arrow schema and batch
109/// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
110/// let batch = RecordBatch::try_new(
111///     Arc::new(schema.clone()),
112///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
113/// )?;
114///
115/// // Write an Avro OCF to memory
116/// let buffer: Vec<u8> = Vec::new();
117/// let mut writer = AvroWriter::new(buffer, schema)?;
118/// writer.write(&batch)?;
119/// writer.finish()?;
120/// let bytes = writer.into_inner();
121///
122/// // Create an async reader from the in-memory bytes
123/// // `tokio::fs::File` also implements `AsyncFileReader` for reading from disk
124/// let file_size = bytes.len();
125/// let cursor = Cursor::new(bytes);
126/// let reader = AsyncAvroFileReader::builder(cursor, file_size as u64, 1024)
127///     .try_build()
128///     .await?;
129///
130/// // Consume the stream of RecordBatches
131/// let batches: Vec<RecordBatch> = reader.try_collect().await?;
132/// assert_eq!(batches.len(), 1);
133/// assert_eq!(batches[0].num_rows(), 3);
134/// Ok(())
135/// }
136/// ```
pub struct AsyncAvroFileReader<R> {
    // Members required to fetch data
    /// Byte range still to be fetched/processed; `range.end` is advanced when
    /// extra bytes are pulled to finish a block that straddles the range end.
    range: Range<u64>,
    /// Total size of the underlying file in bytes; fetches never go past it.
    file_size: u64,

    // Members required to actually decode and read data
    /// Decodes Avro record payloads into Arrow [`RecordBatch`]es.
    decoder: Decoder,
    /// Incrementally parses the OCF block framing (count, size, payload, sync).
    block_decoder: BlockDecoder,
    /// Optional compression codec — presumably declared in the file header;
    /// applied to each block payload before record decoding.
    codec: Option<CompressionCodec>,
    /// 16-byte sync marker used to locate block boundaries in fetched data.
    sync_marker: [u8; 16],

    // Members keeping the current state of the reader
    reader_state: ReaderState<R>,
    /// Set once we start fetching extra bytes to complete a partially fetched
    /// final block; guards against fetching forever on truncated files.
    finishing_partial_block: bool,
}
152
153impl<R> AsyncAvroFileReader<R> {
154    /// Returns a builder for a new [`Self`], allowing some optional parameters.
155    pub fn builder(reader: R, file_size: u64, batch_size: usize) -> ReaderBuilder<R> {
156        ReaderBuilder::new(reader, file_size, batch_size)
157    }
158
159    fn new(
160        range: Range<u64>,
161        file_size: u64,
162        decoder: Decoder,
163        codec: Option<CompressionCodec>,
164        sync_marker: [u8; 16],
165        reader_state: ReaderState<R>,
166    ) -> Self {
167        Self {
168            range,
169            file_size,
170
171            decoder,
172            block_decoder: Default::default(),
173            codec,
174            sync_marker,
175
176            reader_state,
177            finishing_partial_block: false,
178        }
179    }
180
181    /// Returns the Arrow schema for batches produced by this reader.
182    ///
183    /// The schema is determined by the writer schema in the file and the reader schema provided to the builder.
184    pub fn schema(&self) -> SchemaRef {
185        self.decoder.schema()
186    }
187
188    /// Calculate the byte range needed to complete the current block.
189    /// Only valid when block_decoder is in Data or Sync state.
190    /// Returns the range to fetch, or an error if EOF would be reached.
191    fn remaining_block_range(&self) -> Result<Range<u64>, AvroError> {
192        let remaining = self.block_decoder.bytes_remaining() as u64
193            + match self.block_decoder.state() {
194                BlockDecoderState::Data => 16, // Include sync marker
195                BlockDecoderState::Sync => 0,
196                state => {
197                    return Err(AvroError::General(format!(
198                        "remaining_block_range called in unexpected state: {state:?}"
199                    )));
200                }
201            };
202
203        let fetch_end = self.range.end + remaining;
204        if fetch_end > self.file_size {
205            return Err(AvroError::EOF(
206                "Avro block requires more bytes than what exists in the file".into(),
207            ));
208        }
209
210        Ok(self.range.end..fetch_end)
211    }
212
213    /// Terminate the stream after returning this error once.
214    #[inline]
215    fn finish_with_error(
216        &mut self,
217        error: AvroError,
218    ) -> Poll<Option<Result<RecordBatch, AvroError>>> {
219        self.reader_state = ReaderState::Finished;
220        Poll::Ready(Some(Err(error)))
221    }
222
223    #[inline]
224    fn start_flushing(&mut self) {
225        self.reader_state = ReaderState::Flushing;
226    }
227
228    /// Drain any remaining buffered records from the decoder.
229    #[inline]
230    fn poll_flush(&mut self) -> Poll<Option<Result<RecordBatch, AvroError>>> {
231        match self.decoder.flush() {
232            Ok(Some(batch)) => {
233                self.reader_state = ReaderState::Flushing;
234                Poll::Ready(Some(Ok(batch)))
235            }
236            Ok(None) => {
237                self.reader_state = ReaderState::Finished;
238                Poll::Ready(None)
239            }
240            Err(e) => self.finish_with_error(e),
241        }
242    }
243}
244
impl<R: AsyncFileReader + Unpin + 'static> AsyncAvroFileReader<R> {
    // The forbid question mark thing shouldn't apply here, as it is within the future,
    // so exported this to a separate function.
    /// Fetches `range` from `reader` and returns the reader back along with the
    /// data, so ownership can be threaded through the `'static` boxed future.
    async fn fetch_bytes(mut reader: R, range: Range<u64>) -> Result<(R, Bytes), AvroError> {
        let data = reader.get_bytes(range).await?;
        Ok((reader, data))
    }

    /// Advances the [`ReaderState`] machine until a batch, an error,
    /// end-of-stream, or `Pending` can be returned.
    ///
    /// `?` is forbidden here so errors go through [`Self::finish_with_error`],
    /// which makes the stream terminate after yielding the error once.
    #[forbid(clippy::question_mark_used)]
    fn read_next(&mut self, cx: &mut Context<'_>) -> Poll<Option<Result<RecordBatch, AvroError>>> {
        loop {
            // Move the current state out (leaving `InvalidState` behind); every
            // arm must install a valid successor state before looping/returning.
            match mem::replace(&mut self.reader_state, ReaderState::InvalidState) {
                ReaderState::Idle { reader } => {
                    let range = self.range.clone();
                    if range.start >= range.end {
                        return self.finish_with_error(AvroError::InvalidArgument(format!(
                            "Invalid range specified for Avro file: start {} >= end {}, file_size: {}",
                            range.start, range.end, self.file_size
                        )));
                    }

                    let future = Self::fetch_bytes(reader, range).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ReadSyncMarker,
                    };
                }
                ReaderState::FetchingData {
                    mut future,
                    next_behaviour,
                } => {
                    let (reader, data_chunk) = match future.poll_unpin(cx) {
                        Poll::Ready(Ok(data)) => data,
                        Poll::Ready(Err(e)) => return self.finish_with_error(e),
                        Poll::Pending => {
                            // Put the in-flight future back so the next poll resumes it.
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour,
                            };
                            return Poll::Pending;
                        }
                    };

                    match next_behaviour {
                        FetchNextBehaviour::ReadSyncMarker => {
                            let sync_marker_pos = data_chunk
                                .windows(16)
                                .position(|slice| slice == self.sync_marker);
                            let block_start = match sync_marker_pos {
                                Some(pos) => pos + 16, // Move past the sync marker
                                None => {
                                    // Sync marker not found, valid if we arbitrarily split the file at its end.
                                    self.reader_state = ReaderState::Finished;
                                    return Poll::Ready(None);
                                }
                            };

                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk.slice(block_start..),
                            };
                        }
                        FetchNextBehaviour::DecodeVLQHeader => {
                            let mut data = data_chunk;

                            // Feed bytes one at a time until we reach Data state (VLQ header complete)
                            while !matches!(self.block_decoder.state(), BlockDecoderState::Data) {
                                if data.is_empty() {
                                    return self.finish_with_error(AvroError::EOF(
                                        "Unexpected EOF while reading Avro block header".into(),
                                    ));
                                }
                                let consumed = match self.block_decoder.decode(&data[..1]) {
                                    Ok(consumed) => consumed,
                                    Err(e) => return self.finish_with_error(e),
                                };
                                if consumed == 0 {
                                    return self.finish_with_error(AvroError::General(
                                        "BlockDecoder failed to consume byte during VLQ header parsing"
                                            .into(),
                                    ));
                                }
                                data = data.slice(consumed..);
                            }

                            // Now we know the block size. Slice remaining data to what we need.
                            let bytes_remaining = self.block_decoder.bytes_remaining();
                            let data_to_use = data.slice(..data.len().min(bytes_remaining));
                            let consumed = match self.block_decoder.decode(&data_to_use) {
                                Ok(consumed) => consumed,
                                Err(e) => return self.finish_with_error(e),
                            };
                            if consumed != data_to_use.len() {
                                return self.finish_with_error(AvroError::General(
                                    "BlockDecoder failed to consume all bytes after VLQ header parsing"
                                        .into(),
                                ));
                            }

                            // May need more data to finish the block.
                            let range_to_fetch = match self.remaining_block_range() {
                                Ok(range) if range.is_empty() => {
                                    // All bytes fetched, move to decoding block directly
                                    self.reader_state = ReaderState::DecodingBlock {
                                        reader,
                                        data: Bytes::new(),
                                    };
                                    continue;
                                }
                                Ok(range) => range,
                                Err(e) => return self.finish_with_error(e),
                            };

                            let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                            self.reader_state = ReaderState::FetchingData {
                                future,
                                next_behaviour: FetchNextBehaviour::ContinueDecoding,
                            };
                            continue;
                        }
                        FetchNextBehaviour::ContinueDecoding => {
                            self.reader_state = ReaderState::DecodingBlock {
                                reader,
                                data: data_chunk,
                            };
                        }
                    }
                }
                ReaderState::InvalidState => {
                    // Only reachable if a previous iteration failed to restore a state.
                    return self.finish_with_error(AvroError::General(
                        "AsyncAvroFileReader in invalid state".into(),
                    ));
                }
                ReaderState::DecodingBlock { reader, mut data } => {
                    // Try to decode another block from the buffered reader.
                    let consumed = match self.block_decoder.decode(&data) {
                        Ok(consumed) => consumed,
                        Err(e) => return self.finish_with_error(e),
                    };
                    data = data.slice(consumed..);

                    // If we reached the end of the block, flush it, and move to read batches.
                    if let Some(block) = self.block_decoder.flush() {
                        // Successfully decoded a block.
                        let block_count = block.count;
                        let block_data = Bytes::from_owner(if let Some(ref codec) = self.codec {
                            match codec.decompress(&block.data) {
                                Ok(decompressed) => decompressed,
                                Err(e) => return self.finish_with_error(e),
                            }
                        } else {
                            block.data
                        });

                        // Since we have an active block, move to reading batches
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block: block_count,
                        };
                        continue;
                    }

                    // data should always be consumed unless Finished, if it wasn't, something went wrong
                    if !data.is_empty() {
                        return self.finish_with_error(AvroError::General(
                            "BlockDecoder failed to make progress decoding Avro block".into(),
                        ));
                    }

                    if matches!(self.block_decoder.state(), BlockDecoderState::Finished) {
                        // We've already flushed, so if no batch was produced, we are simply done.
                        self.finishing_partial_block = false;
                        self.start_flushing();
                        continue;
                    }

                    // If we've tried the following stage before, and still can't decode,
                    // this means the file is truncated or corrupted.
                    if self.finishing_partial_block {
                        return self.finish_with_error(AvroError::EOF(
                            "Unexpected EOF while reading last Avro block".into(),
                        ));
                    }

                    // Avro splitting case: block is incomplete, we need to:
                    // 1. Parse the length so we know how much to read
                    // 2. Fetch more data from the reader
                    // 3. Create a new block data from the remaining slice and the newly fetched data
                    // 4. Continue decoding until end of block
                    self.finishing_partial_block = true;

                    // Mid-block, but we don't know how many bytes are missing yet
                    if matches!(
                        self.block_decoder.state(),
                        BlockDecoderState::Count | BlockDecoderState::Size
                    ) {
                        // Max VLQ header is 20 bytes (10 bytes each for count and size).
                        // Fetch just enough to complete it.
                        const MAX_VLQ_HEADER_SIZE: u64 = 20;
                        let fetch_end = (self.range.end + MAX_VLQ_HEADER_SIZE).min(self.file_size);

                        // If there is nothing more to fetch, error out
                        if fetch_end == self.range.end {
                            return self.finish_with_error(AvroError::EOF(
                                "Unexpected EOF while reading Avro block header".into(),
                            ));
                        }

                        let range_to_fetch = self.range.end..fetch_end;
                        self.range.end = fetch_end; // Track that we've fetched these bytes

                        let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                        self.reader_state = ReaderState::FetchingData {
                            future,
                            next_behaviour: FetchNextBehaviour::DecodeVLQHeader,
                        };
                        continue;
                    }

                    // Otherwise, we're mid-block but know how many bytes are remaining to fetch.
                    let range_to_fetch = match self.remaining_block_range() {
                        Ok(range) => range,
                        Err(e) => return self.finish_with_error(e),
                    };

                    let future = Self::fetch_bytes(reader, range_to_fetch).boxed();
                    self.reader_state = ReaderState::FetchingData {
                        future,
                        next_behaviour: FetchNextBehaviour::ContinueDecoding,
                    };
                    continue;
                }
                ReaderState::ReadingBatches {
                    reader,
                    data,
                    mut block_data,
                    mut remaining_in_block,
                } => {
                    let (consumed, records_decoded) =
                        match self.decoder.decode_block(&block_data, remaining_in_block) {
                            Ok((consumed, records_decoded)) => (consumed, records_decoded),
                            Err(e) => return self.finish_with_error(e),
                        };

                    remaining_in_block -= records_decoded;

                    if remaining_in_block == 0 {
                        if data.is_empty() {
                            // No more data to read, drain remaining buffered records
                            self.start_flushing();
                        } else {
                            // Finished this block, move to decode next block in the next iteration
                            self.reader_state = ReaderState::DecodingBlock { reader, data };
                        }
                    } else {
                        // Still more records to decode in this block, slice the already-read data and stay in this state
                        block_data = block_data.slice(consumed..);
                        self.reader_state = ReaderState::ReadingBatches {
                            reader,
                            data,
                            block_data,
                            remaining_in_block,
                        };
                    }

                    // We have a full batch ready, emit it
                    // (This is not mutually exclusive with the block being finished, so the state change is valid)
                    if self.decoder.batch_is_full() {
                        return match self.decoder.flush() {
                            Ok(Some(batch)) => Poll::Ready(Some(Ok(batch))),
                            Ok(None) => self.finish_with_error(AvroError::General(
                                "Decoder reported a full batch, but flush returned None".into(),
                            )),
                            Err(e) => self.finish_with_error(e),
                        };
                    }
                }
                ReaderState::Flushing => {
                    return self.poll_flush();
                }
                ReaderState::Finished => {
                    // Terminal: once finished (including after an error), always yield None
                    self.reader_state = ReaderState::Finished;
                    return Poll::Ready(None);
                }
            }
        }
    }
}
536
537// To maintain compatibility with the expected stream results in the ecosystem, this returns ArrowError.
538impl<R: AsyncFileReader + Unpin + 'static> Stream for AsyncAvroFileReader<R> {
539    type Item = Result<RecordBatch, ArrowError>;
540
541    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
542        self.read_next(cx).map_err(Into::into)
543    }
544}
545
546#[cfg(all(test, feature = "object_store"))]
547mod tests {
548    use super::*;
549    use crate::codec::Tz;
550    use crate::schema::{
551        AVRO_NAME_METADATA_KEY, AVRO_NAMESPACE_METADATA_KEY, AvroSchema, SCHEMA_METADATA_KEY,
552    };
553    use arrow_array::cast::AsArray;
554    use arrow_array::types::{Int32Type, Int64Type};
555    use arrow_array::*;
556    use arrow_schema::{DataType, Field, Schema, SchemaRef, TimeUnit};
557    use futures::{StreamExt, TryStreamExt};
558    use object_store::local::LocalFileSystem;
559    use object_store::path::Path;
560    use object_store::{ObjectStore, ObjectStoreExt};
561    use std::collections::HashMap;
562    use std::sync::Arc;
563
564    fn arrow_test_data(file: &str) -> String {
565        let base =
566            std::env::var("ARROW_TEST_DATA").unwrap_or_else(|_| "../testing/data".to_string());
567        format!("{}/{}", base, file)
568    }
569
    /// Arrow schema for the `alltypes` test files with `timestamp_col`
    /// pinned to the `+00:00` (UTC) timezone.
    fn get_alltypes_schema() -> SchemaRef {
        get_alltypes_schema_with_tz("+00:00")
    }
573
    /// Builds the expected Arrow schema for the `alltypes` test files, with
    /// `timestamp_col` localized to `tz_id`.
    ///
    /// The corresponding Avro writer-schema JSON is attached to the schema
    /// metadata under `SCHEMA_METADATA_KEY`.
    // NOTE(review): the consumer of this metadata key isn't visible in this
    // module — presumably the reader/writer resolves against it; verify.
    fn get_alltypes_schema_with_tz(tz_id: &str) -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("tinyint_col", DataType::Int32, true),
            Field::new("smallint_col", DataType::Int32, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            // Avro `bytes` maps to Arrow Binary for the string-ish columns below.
            Field::new("date_string_col", DataType::Binary, true),
            Field::new("string_col", DataType::Binary, true),
            Field::new(
                "timestamp_col",
                DataType::Timestamp(TimeUnit::Microsecond, Some(tz_id.into())),
                true,
            ),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
    "type": "record",
    "name": "topLevelRecord",
    "fields": [
        {
            "name": "id",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "bool_col",
            "type": [
                "boolean",
                "null"
            ]
        },
        {
            "name": "tinyint_col",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "smallint_col",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "int_col",
            "type": [
                "int",
                "null"
            ]
        },
        {
            "name": "bigint_col",
            "type": [
                "long",
                "null"
            ]
        },
        {
            "name": "float_col",
            "type": [
                "float",
                "null"
            ]
        },
        {
            "name": "double_col",
            "type": [
                "double",
                "null"
            ]
        },
        {
            "name": "date_string_col",
            "type": [
                "bytes",
                "null"
            ]
        },
        {
            "name": "string_col",
            "type": [
                "bytes",
                "null"
            ]
        },
        {
            "name": "timestamp_col",
            "type": [
                {
                    "type": "long",
                    "logicalType": "timestamp-micros"
                },
                "null"
            ]
        }
    ]
}
"#
            .into(),
        )]));
        Arc::new(schema)
    }
685
    /// Builds the expected Arrow schema for the `alltypes` test file variant
    /// containing nulls: every column is a `["null", T]` union with a null
    /// default in the attached Avro JSON.
    fn get_alltypes_with_nulls_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            Field::new("string_col", DataType::Binary, true),
            Field::new("int_col", DataType::Int32, true),
            Field::new("bool_col", DataType::Boolean, true),
            Field::new("bigint_col", DataType::Int64, true),
            Field::new("float_col", DataType::Float32, true),
            Field::new("double_col", DataType::Float64, true),
            Field::new("bytes_col", DataType::Binary, true),
        ])
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
    "type": "record",
    "name": "topLevelRecord",
    "fields": [
        {
            "name": "string_col",
            "type": [
                "null",
                "string"
            ],
            "default": null
        },
        {
            "name": "int_col",
            "type": [
                "null",
                "int"
            ],
            "default": null
        },
        {
            "name": "bool_col",
            "type": [
                "null",
                "boolean"
            ],
            "default": null
        },
        {
            "name": "bigint_col",
            "type": [
                "null",
                "long"
            ],
            "default": null
        },
        {
            "name": "float_col",
            "type": [
                "null",
                "float"
            ],
            "default": null
        },
        {
            "name": "double_col",
            "type": [
                "null",
                "double"
            ],
            "default": null
        },
        {
            "name": "bytes_col",
            "type": [
                "null",
                "bytes"
            ],
            "default": null
        }
    ]
}"#
            .into(),
        )]));

        Arc::new(schema)
    }
765
    /// Builds the expected Arrow schema for the `avro/nested_records.avro`
    /// test file.
    ///
    /// Each record-typed field carries its original Avro name/namespace in
    /// field metadata, and the full writer-side Avro JSON schema is attached
    /// to the schema under `SCHEMA_METADATA_KEY` so it can be converted back
    /// to an `AvroSchema` by the reader builder.
    fn get_nested_records_schema() -> SchemaRef {
        let schema = Schema::new(vec![
            // f1: required struct (Avro record ns2.record2), containing a
            // nested required struct f1_3 (Avro record ns3.record3).
            Field::new(
                "f1",
                DataType::Struct(
                    vec![
                        Field::new("f1_1", DataType::Utf8, false),
                        Field::new("f1_2", DataType::Int32, false),
                        Field::new(
                            "f1_3",
                            DataType::Struct(
                                vec![Field::new("f1_3_1", DataType::Float64, false)].into(),
                            ),
                            false,
                        )
                        .with_metadata(HashMap::from([
                            (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns3".to_owned()),
                            (AVRO_NAME_METADATA_KEY.to_owned(), "record3".to_owned()),
                        ])),
                    ]
                    .into(),
                ),
                false,
            )
            .with_metadata(HashMap::from([
                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns2".to_owned()),
                (AVRO_NAME_METADATA_KEY.to_owned(), "record2".to_owned()),
            ])),
            // f2: required list of required structs (Avro array of
            // ns4.record4); the record metadata lives on the list item field.
            Field::new(
                "f2",
                DataType::List(Arc::new(
                    Field::new(
                        "item",
                        DataType::Struct(
                            vec![
                                Field::new("f2_1", DataType::Boolean, false),
                                Field::new("f2_2", DataType::Float32, false),
                            ]
                            .into(),
                        ),
                        false,
                    )
                    .with_metadata(HashMap::from([
                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns4".to_owned()),
                        (AVRO_NAME_METADATA_KEY.to_owned(), "record4".to_owned()),
                    ])),
                )),
                false,
            ),
            // f3: nullable struct — Avro union ["null", ns5.record5].
            Field::new(
                "f3",
                DataType::Struct(vec![Field::new("f3_1", DataType::Utf8, false)].into()),
                true,
            )
            .with_metadata(HashMap::from([
                (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns5".to_owned()),
                (AVRO_NAME_METADATA_KEY.to_owned(), "record5".to_owned()),
            ])),
            // f4: required list of nullable structs — Avro array whose items
            // are the union ["null", ns6.record6].
            Field::new(
                "f4",
                DataType::List(Arc::new(
                    Field::new(
                        "item",
                        DataType::Struct(vec![Field::new("f4_1", DataType::Int64, false)].into()),
                        true,
                    )
                    .with_metadata(HashMap::from([
                        (AVRO_NAMESPACE_METADATA_KEY.to_owned(), "ns6".to_owned()),
                        (AVRO_NAME_METADATA_KEY.to_owned(), "record6".to_owned()),
                    ])),
                )),
                false,
            ),
        ])
        // Attach the writer's Avro JSON schema verbatim; the async reader
        // builder converts this back into an AvroSchema for resolution.
        .with_metadata(HashMap::from([(
            SCHEMA_METADATA_KEY.into(),
            r#"{
    "type": "record",
    "namespace": "ns1",
    "name": "record1",
    "fields": [
        {
            "name": "f1",
            "type": {
                "type": "record",
                "namespace": "ns2",
                "name": "record2",
                "fields": [
                    {
                        "name": "f1_1",
                        "type": "string"
                    },
                    {
                        "name": "f1_2",
                        "type": "int"
                    },
                    {
                        "name": "f1_3",
                        "type": {
                            "type": "record",
                            "namespace": "ns3",
                            "name": "record3",
                            "fields": [
                                {
                                    "name": "f1_3_1",
                                    "type": "double"
                                }
                            ]
                        }
                    }
                ]
            }
        },
        {
            "name": "f2",
            "type": {
                "type": "array",
                "items": {
                    "type": "record",
                    "namespace": "ns4",
                    "name": "record4",
                    "fields": [
                        {
                            "name": "f2_1",
                            "type": "boolean"
                        },
                        {
                            "name": "f2_2",
                            "type": "float"
                        }
                    ]
                }
            }
        },
        {
            "name": "f3",
            "type": [
                "null",
                {
                    "type": "record",
                    "namespace": "ns5",
                    "name": "record5",
                    "fields": [
                        {
                            "name": "f3_1",
                            "type": "string"
                        }
                    ]
                }
            ],
            "default": null
        },
        {
            "name": "f4",
            "type": {
                "type": "array",
                "items": [
                    "null",
                    {
                        "type": "record",
                        "namespace": "ns6",
                        "name": "record6",
                        "fields": [
                            {
                                "name": "f4_1",
                                "type": "long"
                            }
                        ]
                    }
                ]
            }
        }
    ]
}
"#
            .into(),
        )]));

        Arc::new(schema)
    }
946
947    async fn read_async_file(
948        path: &str,
949        batch_size: usize,
950        range: Option<Range<u64>>,
951        schema: Option<SchemaRef>,
952        projection: Option<Vec<usize>>,
953    ) -> Result<Vec<RecordBatch>, ArrowError> {
954        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
955        let location = Path::from_filesystem_path(path).unwrap();
956
957        let file_size = store.head(&location).await.unwrap().size;
958
959        let file_reader = AvroObjectReader::new(store, location);
960        let mut builder = AsyncAvroFileReader::builder(file_reader, file_size, batch_size);
961
962        if let Some(s) = schema {
963            let reader_schema = AvroSchema::try_from(s.as_ref())?;
964            builder = builder.with_reader_schema(reader_schema);
965        }
966
967        if let Some(proj) = projection {
968            builder = builder.with_projection(proj);
969        }
970
971        if let Some(range) = range {
972            builder = builder.with_range(range);
973        }
974
975        let reader = builder.try_build().await?;
976        reader.try_collect().await
977    }
978
979    #[tokio::test]
980    async fn test_full_file_read() {
981        let file = arrow_test_data("avro/alltypes_plain.avro");
982        let schema = get_alltypes_schema();
983        let batches = read_async_file(&file, 1024, None, Some(schema), None)
984            .await
985            .unwrap();
986        let batch = &batches[0];
987
988        assert_eq!(batch.num_rows(), 8);
989        assert_eq!(batch.num_columns(), 11);
990
991        let id_array = batch
992            .column(0)
993            .as_any()
994            .downcast_ref::<Int32Array>()
995            .unwrap();
996        assert_eq!(id_array.value(0), 4);
997        assert_eq!(id_array.value(7), 1);
998    }
999
1000    #[tokio::test]
1001    async fn test_small_batch_size() {
1002        let file = arrow_test_data("avro/alltypes_plain.avro");
1003        let schema = get_alltypes_schema();
1004        let batches = read_async_file(&file, 2, None, Some(schema), None)
1005            .await
1006            .unwrap();
1007        assert_eq!(batches.len(), 4);
1008
1009        let batch = &batches[0];
1010
1011        assert_eq!(batch.num_rows(), 2);
1012        assert_eq!(batch.num_columns(), 11);
1013    }
1014
1015    #[tokio::test]
1016    async fn test_batch_size_one() {
1017        let file = arrow_test_data("avro/alltypes_plain.avro");
1018        let schema = get_alltypes_schema();
1019        let batches = read_async_file(&file, 1, None, Some(schema), None)
1020            .await
1021            .unwrap();
1022        let batch = &batches[0];
1023
1024        assert_eq!(batches.len(), 8);
1025        assert_eq!(batch.num_rows(), 1);
1026    }
1027
1028    #[tokio::test]
1029    async fn test_batch_size_larger_than_file() {
1030        let file = arrow_test_data("avro/alltypes_plain.avro");
1031        let schema = get_alltypes_schema();
1032        let batches = read_async_file(&file, 10000, None, Some(schema), None)
1033            .await
1034            .unwrap();
1035        let batch = &batches[0];
1036
1037        assert_eq!(batch.num_rows(), 8);
1038    }
1039
1040    #[tokio::test]
1041    async fn test_empty_range() {
1042        let file = arrow_test_data("avro/alltypes_plain.avro");
1043        let range = 100..100;
1044        let schema = get_alltypes_schema();
1045        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1046            .await
1047            .unwrap();
1048        assert_eq!(batches.len(), 0);
1049    }
1050
1051    #[tokio::test]
1052    async fn test_range_starting_at_zero() {
1053        // Tests that range starting at 0 correctly skips header
1054        let file = arrow_test_data("avro/alltypes_plain.avro");
1055        let store = Arc::new(LocalFileSystem::new());
1056        let location = Path::from_filesystem_path(&file).unwrap();
1057        let meta = store.head(&location).await.unwrap();
1058
1059        let range = 0..meta.size;
1060        let schema = get_alltypes_schema();
1061        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1062            .await
1063            .unwrap();
1064        let batch = &batches[0];
1065
1066        assert_eq!(batch.num_rows(), 8);
1067    }
1068
1069    #[tokio::test]
1070    async fn test_range_after_header() {
1071        let file = arrow_test_data("avro/alltypes_plain.avro");
1072        let store = Arc::new(LocalFileSystem::new());
1073        let location = Path::from_filesystem_path(&file).unwrap();
1074        let meta = store.head(&location).await.unwrap();
1075
1076        let range = 100..meta.size;
1077        let schema = get_alltypes_schema();
1078        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1079            .await
1080            .unwrap();
1081        let batch = &batches[0];
1082
1083        assert!(batch.num_rows() > 0);
1084    }
1085
1086    #[tokio::test]
1087    async fn test_range_no_sync_marker() {
1088        // Small range unlikely to contain sync marker
1089        let file = arrow_test_data("avro/alltypes_plain.avro");
1090        let range = 50..150;
1091        let schema = get_alltypes_schema();
1092        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1093            .await
1094            .unwrap();
1095        assert_eq!(batches.len(), 0);
1096    }
1097
1098    #[tokio::test]
1099    async fn test_range_starting_mid_file() {
1100        let file = arrow_test_data("avro/alltypes_plain.avro");
1101
1102        let range = 700..768; // Header ends at 675, so this should be mid-block
1103        let schema = get_alltypes_schema();
1104        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1105            .await
1106            .unwrap();
1107        assert_eq!(batches.len(), 0);
1108    }
1109
1110    #[tokio::test]
1111    async fn test_range_ending_at_file_size() {
1112        let file = arrow_test_data("avro/alltypes_plain.avro");
1113        let store = Arc::new(LocalFileSystem::new());
1114        let location = Path::from_filesystem_path(&file).unwrap();
1115        let meta = store.head(&location).await.unwrap();
1116
1117        let range = 200..meta.size;
1118        let schema = get_alltypes_schema();
1119        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1120            .await
1121            .unwrap();
1122        let batch = &batches[0];
1123
1124        assert_eq!(batch.num_rows(), 8);
1125    }
1126
1127    #[tokio::test]
1128    async fn test_incomplete_block_requires_fetch() {
1129        // Range ends mid-block, should trigger fetching_rem_block logic
1130        let file = arrow_test_data("avro/alltypes_plain.avro");
1131        let range = 0..1200;
1132        let schema = get_alltypes_schema();
1133        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1134            .await
1135            .unwrap();
1136        let batch = &batches[0];
1137
1138        assert_eq!(batch.num_rows(), 8)
1139    }
1140
1141    #[tokio::test]
1142    async fn test_partial_vlq_header_requires_fetch() {
1143        // Range ends mid-VLQ header, triggering the Count|Size partial fetch logic.
1144        let file = arrow_test_data("avro/alltypes_plain.avro");
1145        let range = 16..676; // Header should end at 675
1146        let schema = get_alltypes_schema();
1147        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1148            .await
1149            .unwrap();
1150        let batch = &batches[0];
1151
1152        assert_eq!(batch.num_rows(), 8)
1153    }
1154
1155    #[cfg(feature = "snappy")]
1156    #[tokio::test]
1157    async fn test_snappy_compressed_with_range() {
1158        {
1159            let file = arrow_test_data("avro/alltypes_plain.snappy.avro");
1160            let store = Arc::new(LocalFileSystem::new());
1161            let location = Path::from_filesystem_path(&file).unwrap();
1162            let meta = store.head(&location).await.unwrap();
1163
1164            let range = 200..meta.size;
1165            let schema = get_alltypes_schema();
1166            let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1167                .await
1168                .unwrap();
1169            let batch = &batches[0];
1170
1171            assert!(batch.num_rows() > 0);
1172        }
1173    }
1174
1175    #[tokio::test]
1176    async fn test_nulls() {
1177        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
1178        let schema = get_alltypes_with_nulls_schema();
1179        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1180            .await
1181            .unwrap();
1182        let batch = &batches[0];
1183
1184        assert_eq!(batch.num_rows(), 1);
1185        for col in batch.columns() {
1186            assert!(col.is_null(0));
1187        }
1188    }
1189
1190    #[tokio::test]
1191    async fn test_nested_records() {
1192        let file = arrow_test_data("avro/nested_records.avro");
1193        let schema = get_nested_records_schema();
1194        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1195            .await
1196            .unwrap();
1197        let batch = &batches[0];
1198
1199        assert_eq!(batch.num_rows(), 2);
1200        assert!(batch.num_columns() > 0);
1201    }
1202
1203    #[tokio::test]
1204    async fn test_stream_produces_multiple_batches() {
1205        let file = arrow_test_data("avro/alltypes_plain.avro");
1206        let store = Arc::new(LocalFileSystem::new());
1207        let location = Path::from_filesystem_path(&file).unwrap();
1208
1209        let file_size = store.head(&location).await.unwrap().size;
1210
1211        let file_reader = AvroObjectReader::new(store, location);
1212        let schema = get_alltypes_schema();
1213        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1214        let reader = AsyncAvroFileReader::builder(
1215            file_reader,
1216            file_size,
1217            2, // Small batch size to force multiple batches
1218        )
1219        .with_reader_schema(reader_schema)
1220        .try_build()
1221        .await
1222        .unwrap();
1223
1224        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1225
1226        assert!(batches.len() > 1);
1227        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
1228        assert_eq!(total_rows, 8);
1229    }
1230
1231    #[tokio::test]
1232    async fn test_stream_early_termination() {
1233        let file = arrow_test_data("avro/alltypes_plain.avro");
1234        let store = Arc::new(LocalFileSystem::new());
1235        let location = Path::from_filesystem_path(&file).unwrap();
1236
1237        let file_size = store.head(&location).await.unwrap().size;
1238
1239        let file_reader = AvroObjectReader::new(store, location);
1240        let schema = get_alltypes_schema();
1241        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1242        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1)
1243            .with_reader_schema(reader_schema)
1244            .try_build()
1245            .await
1246            .unwrap();
1247
1248        let first_batch = reader.take(1).try_collect::<Vec<_>>().await.unwrap();
1249
1250        assert_eq!(first_batch.len(), 1);
1251        assert!(first_batch[0].num_rows() > 0);
1252    }
1253
1254    #[tokio::test]
1255    async fn test_various_batch_sizes() {
1256        let file = arrow_test_data("avro/alltypes_plain.avro");
1257
1258        for batch_size in [1, 2, 3, 5, 7, 11, 100] {
1259            let schema = get_alltypes_schema();
1260            let batches = read_async_file(&file, batch_size, None, Some(schema), None)
1261                .await
1262                .unwrap();
1263            let batch = &batches[0];
1264
1265            // Size should be what was provided, to the limit of the batch in the file
1266            assert_eq!(
1267                batch.num_rows(),
1268                batch_size.min(8),
1269                "Failed with batch_size={}",
1270                batch_size
1271            );
1272        }
1273    }
1274
1275    #[tokio::test]
1276    async fn test_range_larger_than_file() {
1277        let file = arrow_test_data("avro/alltypes_plain.avro");
1278        let store = Arc::new(LocalFileSystem::new());
1279        let location = Path::from_filesystem_path(&file).unwrap();
1280        let meta = store.head(&location).await.unwrap();
1281
1282        // Range extends beyond file size
1283        let range = 100..(meta.size + 1000);
1284        let schema = get_alltypes_schema();
1285        let batches = read_async_file(&file, 1024, Some(range), Some(schema), None)
1286            .await
1287            .unwrap();
1288        let batch = &batches[0];
1289
1290        // Should clamp to file size
1291        assert_eq!(batch.num_rows(), 8);
1292    }
1293
1294    #[tokio::test]
1295    async fn test_builder_with_header_info() {
1296        let file = arrow_test_data("avro/alltypes_plain.avro");
1297        let store = Arc::new(LocalFileSystem::new());
1298        let location = Path::from_filesystem_path(&file).unwrap();
1299
1300        let file_size = store.head(&location).await.unwrap().size;
1301
1302        let mut file_reader = AvroObjectReader::new(store, location);
1303
1304        let header_info = read_header_info(&mut file_reader, file_size, None)
1305            .await
1306            .unwrap();
1307
1308        assert_eq!(header_info.header_len(), 675);
1309
1310        let writer_schema = header_info.writer_schema().unwrap();
1311        let expected_avro_json: serde_json::Value = serde_json::from_str(
1312            get_alltypes_schema()
1313                .metadata()
1314                .get(SCHEMA_METADATA_KEY)
1315                .unwrap(),
1316        )
1317        .unwrap();
1318        let actual_avro_json: serde_json::Value =
1319            serde_json::from_str(&writer_schema.json_string).unwrap();
1320        assert_eq!(actual_avro_json, expected_avro_json);
1321
1322        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1323            .build_with_header(header_info)
1324            .unwrap();
1325
1326        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1327
1328        let batch = &batches[0];
1329        assert_eq!(batch.num_rows(), 8)
1330    }
1331
    /// End-to-end roundtrip: write two batches (with nulls) via `AvroWriter`
    /// to a temp file, then read them back through the async object-store
    /// reader with a small batch size and verify values and null positions.
    #[tokio::test]
    async fn test_roundtrip_write_then_async_read() {
        use crate::writer::AvroWriter;
        use arrow_array::{Float64Array, StringArray};
        use std::fs::File;
        use std::io::BufWriter;
        use tempfile::tempdir;

        // Schema with nullable and non-nullable fields of various types
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("score", DataType::Float64, true),
            Field::new("count", DataType::Int64, false),
        ]));

        let dir = tempdir().unwrap();
        let file_path = dir.path().join("roundtrip_test.avro");

        // Write multiple batches with nulls. Scoped so the writer is dropped
        // (and the file fully flushed) before reading back.
        {
            let file = File::create(&file_path).unwrap();
            let writer = BufWriter::new(file);
            let mut avro_writer = AvroWriter::new(writer, schema.as_ref().clone()).unwrap();

            // First batch: 3 rows with some nulls
            let batch1 = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(vec![1, 2, 3])),
                    Arc::new(StringArray::from(vec![
                        Some("alice"),
                        None,
                        Some("charlie"),
                    ])),
                    Arc::new(Float64Array::from(vec![Some(95.5), Some(87.3), None])),
                    Arc::new(Int64Array::from(vec![10, 20, 30])),
                ],
            )
            .unwrap();
            avro_writer.write(&batch1).unwrap();

            // Second batch: 2 rows
            let batch2 = RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(Int32Array::from(vec![4, 5])),
                    Arc::new(StringArray::from(vec![Some("diana"), Some("eve")])),
                    Arc::new(Float64Array::from(vec![None, Some(88.0)])),
                    Arc::new(Int64Array::from(vec![40, 50])),
                ],
            )
            .unwrap();
            avro_writer.write(&batch2).unwrap();

            avro_writer.finish().unwrap();
        }

        // Read back with small batch size to produce multiple output batches
        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
        let location = Path::from_filesystem_path(&file_path).unwrap();
        let file_size = store.head(&location).await.unwrap().size;

        let file_reader = AvroObjectReader::new(store, location);
        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
            .try_build()
            .await
            .unwrap();

        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();

        // Verify we got multiple output batches due to small batch_size
        assert!(
            batches.len() > 1,
            "Expected multiple batches with batch_size=2"
        );

        // Verify total row count
        let total_rows: usize = batches.iter().map(|b| b.num_rows()).sum();
        assert_eq!(total_rows, 5);

        // Concatenate all batches to verify data
        let combined = arrow::compute::concat_batches(&batches[0].schema(), &batches).unwrap();
        assert_eq!(combined.num_rows(), 5);
        assert_eq!(combined.num_columns(), 4);

        // Check id column (non-nullable)
        let id_array = combined
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert_eq!(id_array.values(), &[1, 2, 3, 4, 5]);

        // Check name column (nullable) - verify nulls are preserved.
        // The Arrow Utf8 field round-trips through Avro string and is read
        // back here as a StringArray (as_string succeeds below).
        let name_col = combined.column(1);
        let name_array = name_col.as_string::<i32>();
        assert_eq!(name_array.value(0), "alice");
        assert!(name_col.is_null(1)); // second row has null name
        assert_eq!(name_array.value(2), "charlie");

        // Check score column (nullable) - verify nulls are preserved
        let score_array = combined
            .column(2)
            .as_any()
            .downcast_ref::<Float64Array>()
            .unwrap();
        assert!(!score_array.is_null(0));
        assert!((score_array.value(0) - 95.5).abs() < f64::EPSILON);
        assert!(score_array.is_null(2)); // third row has null score
        assert!(score_array.is_null(3)); // fourth row has null score
        assert!(!score_array.is_null(4));
        assert!((score_array.value(4) - 88.0).abs() < f64::EPSILON);

        // Check count column (non-nullable)
        let count_array = combined
            .column(3)
            .as_any()
            .downcast_ref::<Int64Array>()
            .unwrap();
        assert_eq!(count_array.values(), &[10, 20, 30, 40, 50]);
    }
1455
1456    #[tokio::test]
1457    async fn test_alltypes_no_schema_no_projection() {
1458        // No reader schema, no projection - uses writer schema from file
1459        let file = arrow_test_data("avro/alltypes_plain.avro");
1460        let batches = read_async_file(&file, 1024, None, None, None)
1461            .await
1462            .unwrap();
1463        let batch = &batches[0];
1464
1465        assert_eq!(batch.num_rows(), 8);
1466        assert_eq!(batch.num_columns(), 11);
1467        assert_eq!(batch.schema().field(0).name(), "id");
1468    }
1469
1470    #[tokio::test]
1471    async fn test_alltypes_no_schema_with_projection() {
1472        // No reader schema, with projection - project writer schema
1473        let file = arrow_test_data("avro/alltypes_plain.avro");
1474        // Project [tinyint_col, id, bigint_col] = indices [2, 0, 5]
1475        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0, 5]))
1476            .await
1477            .unwrap();
1478        let batch = &batches[0];
1479
1480        assert_eq!(batch.num_rows(), 8);
1481        assert_eq!(batch.num_columns(), 3);
1482        assert_eq!(batch.schema().field(0).name(), "tinyint_col");
1483        assert_eq!(batch.schema().field(1).name(), "id");
1484        assert_eq!(batch.schema().field(2).name(), "bigint_col");
1485
1486        // Verify data values
1487        let tinyint_col = batch.column(0).as_primitive::<Int32Type>();
1488        assert_eq!(tinyint_col.values(), &[0, 1, 0, 1, 0, 1, 0, 1]);
1489
1490        let id = batch.column(1).as_primitive::<Int32Type>();
1491        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
1492
1493        let bigint_col = batch.column(2).as_primitive::<Int64Type>();
1494        assert_eq!(bigint_col.values(), &[0, 10, 0, 10, 0, 10, 0, 10]);
1495    }
1496
1497    #[tokio::test]
1498    async fn test_alltypes_with_schema_no_projection() {
1499        // With reader schema, no projection
1500        let file = arrow_test_data("avro/alltypes_plain.avro");
1501        let schema = get_alltypes_schema();
1502        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1503            .await
1504            .unwrap();
1505        let batch = &batches[0];
1506
1507        assert_eq!(batch.num_rows(), 8);
1508        assert_eq!(batch.num_columns(), 11);
1509    }
1510
1511    #[tokio::test]
1512    async fn test_alltypes_with_schema_with_projection() {
1513        // With reader schema, with projection
1514        let file = arrow_test_data("avro/alltypes_plain.avro");
1515        let schema = get_alltypes_schema();
1516        // Project [bool_col, id] = indices [1, 0]
1517        let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![1, 0]))
1518            .await
1519            .unwrap();
1520        let batch = &batches[0];
1521
1522        assert_eq!(batch.num_rows(), 8);
1523        assert_eq!(batch.num_columns(), 2);
1524        assert_eq!(batch.schema().field(0).name(), "bool_col");
1525        assert_eq!(batch.schema().field(1).name(), "id");
1526
1527        let bool_col = batch.column(0).as_boolean();
1528        assert!(bool_col.value(0));
1529        assert!(!bool_col.value(1));
1530
1531        let id = batch.column(1).as_primitive::<Int32Type>();
1532        assert_eq!(id.values(), &[4, 5, 6, 7, 2, 3, 0, 1]);
1533    }
1534
1535    #[tokio::test]
1536    async fn test_nested_no_schema_no_projection() {
1537        // No reader schema, no projection
1538        let file = arrow_test_data("avro/nested_records.avro");
1539        let batches = read_async_file(&file, 1024, None, None, None)
1540            .await
1541            .unwrap();
1542        let batch = &batches[0];
1543
1544        assert_eq!(batch.num_rows(), 2);
1545        assert_eq!(batch.num_columns(), 4);
1546        assert_eq!(batch.schema().field(0).name(), "f1");
1547        assert_eq!(batch.schema().field(1).name(), "f2");
1548        assert_eq!(batch.schema().field(2).name(), "f3");
1549        assert_eq!(batch.schema().field(3).name(), "f4");
1550    }
1551
1552    #[tokio::test]
1553    async fn test_nested_no_schema_with_projection() {
1554        // No reader schema, with projection - reorder nested fields
1555        let file = arrow_test_data("avro/nested_records.avro");
1556        // Project [f3, f1] = indices [2, 0]
1557        let batches = read_async_file(&file, 1024, None, None, Some(vec![2, 0]))
1558            .await
1559            .unwrap();
1560        let batch = &batches[0];
1561
1562        assert_eq!(batch.num_rows(), 2);
1563        assert_eq!(batch.num_columns(), 2);
1564        assert_eq!(batch.schema().field(0).name(), "f3");
1565        assert_eq!(batch.schema().field(1).name(), "f1");
1566    }
1567
1568    #[tokio::test]
1569    async fn test_nested_with_schema_no_projection() {
1570        // With reader schema, no projection
1571        let file = arrow_test_data("avro/nested_records.avro");
1572        let schema = get_nested_records_schema();
1573        let batches = read_async_file(&file, 1024, None, Some(schema), None)
1574            .await
1575            .unwrap();
1576        let batch = &batches[0];
1577
1578        assert_eq!(batch.num_rows(), 2);
1579        assert_eq!(batch.num_columns(), 4);
1580    }
1581
1582    #[tokio::test]
1583    async fn test_nested_with_schema_with_projection() {
1584        // With reader schema, with projection
1585        let file = arrow_test_data("avro/nested_records.avro");
1586        let schema = get_nested_records_schema();
1587        // Project [f4, f2, f1] = indices [3, 1, 0]
1588        let batches = read_async_file(&file, 1024, None, Some(schema), Some(vec![3, 1, 0]))
1589            .await
1590            .unwrap();
1591        let batch = &batches[0];
1592
1593        assert_eq!(batch.num_rows(), 2);
1594        assert_eq!(batch.num_columns(), 3);
1595        assert_eq!(batch.schema().field(0).name(), "f4");
1596        assert_eq!(batch.schema().field(1).name(), "f2");
1597        assert_eq!(batch.schema().field(2).name(), "f1");
1598    }
1599
1600    #[tokio::test]
1601    async fn test_projection_error_out_of_bounds() {
1602        let file = arrow_test_data("avro/alltypes_plain.avro");
1603        // Index 100 is out of bounds for the 11-field schema
1604        let err = read_async_file(&file, 1024, None, None, Some(vec![100]))
1605            .await
1606            .unwrap_err();
1607        assert!(matches!(err, ArrowError::AvroError(_)));
1608        assert!(err.to_string().contains("out of bounds"));
1609    }
1610
1611    #[tokio::test]
1612    async fn test_projection_error_duplicate_index() {
1613        let file = arrow_test_data("avro/alltypes_plain.avro");
1614        // Duplicate index 0
1615        let err = read_async_file(&file, 1024, None, None, Some(vec![0, 0]))
1616            .await
1617            .unwrap_err();
1618        assert!(matches!(err, ArrowError::AvroError(_)));
1619        assert!(err.to_string().contains("Duplicate projection index"));
1620    }
1621
1622    #[tokio::test]
1623    async fn test_arrow_schema_from_reader_no_reader_schema() {
1624        let file = arrow_test_data("avro/alltypes_plain.avro");
1625        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1626        let location = Path::from_filesystem_path(&file).unwrap();
1627        let file_size = store.head(&location).await.unwrap().size;
1628
1629        let file_reader = AvroObjectReader::new(store, location);
1630        let expected_schema = get_alltypes_schema()
1631            .as_ref()
1632            .clone()
1633            .with_metadata(Default::default());
1634
1635        // Build reader without providing reader schema - should use writer schema from file
1636        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1637            .try_build()
1638            .await
1639            .unwrap();
1640
1641        assert_eq!(reader.schema().as_ref(), &expected_schema);
1642
1643        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1644        let batch = &batches[0];
1645
1646        assert_eq!(batch.schema().as_ref(), &expected_schema);
1647    }
1648
1649    #[tokio::test]
1650    async fn test_arrow_schema_from_reader_with_reader_schema() {
1651        let file = arrow_test_data("avro/alltypes_plain.avro");
1652        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1653        let location = Path::from_filesystem_path(&file).unwrap();
1654        let file_size = store.head(&location).await.unwrap().size;
1655
1656        let file_reader = AvroObjectReader::new(store, location);
1657        let schema = get_alltypes_schema()
1658            .project(&[0, 1, 7])
1659            .unwrap()
1660            .with_metadata(Default::default());
1661        let reader_schema = AvroSchema::try_from(&schema).unwrap();
1662        let expected_schema = schema.clone();
1663
1664        // Build reader with provided reader schema - must apply the projection
1665        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1666            .with_reader_schema(reader_schema)
1667            .try_build()
1668            .await
1669            .unwrap();
1670
1671        assert_eq!(reader.schema().as_ref(), &expected_schema);
1672
1673        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1674        let batch = &batches[0];
1675
1676        assert_eq!(batch.schema().as_ref(), &expected_schema);
1677    }
1678
1679    #[tokio::test]
1680    async fn test_arrow_schema_from_reader_nested_records() {
1681        let file = arrow_test_data("avro/nested_records.avro");
1682        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1683        let location = Path::from_filesystem_path(&file).unwrap();
1684        let file_size = store.head(&location).await.unwrap().size;
1685
1686        let file_reader = AvroObjectReader::new(store, location);
1687
1688        // The schema produced by the reader should match the expected schema,
1689        // attaching Avro type name metadata to fields of record and list types.
1690        let expected_schema = get_nested_records_schema()
1691            .as_ref()
1692            .clone()
1693            .with_metadata(Default::default());
1694
1695        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1696            .try_build()
1697            .await
1698            .unwrap();
1699
1700        assert_eq!(reader.schema().as_ref(), &expected_schema);
1701
1702        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1703        let batch = &batches[0];
1704
1705        assert_eq!(batch.schema().as_ref(), &expected_schema);
1706    }
1707
1708    #[tokio::test]
1709    async fn test_with_header_size_hint_small() {
1710        // Use a very small header size hint to force multiple fetches
1711        let file = arrow_test_data("avro/alltypes_plain.avro");
1712        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1713        let location = Path::from_filesystem_path(&file).unwrap();
1714        let file_size = store.head(&location).await.unwrap().size;
1715
1716        let file_reader = AvroObjectReader::new(store, location);
1717        let schema = get_alltypes_schema();
1718        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1719
1720        // Use a tiny header hint (64 bytes) - header is much larger
1721        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1722            .with_reader_schema(reader_schema)
1723            .with_header_size_hint(64)
1724            .try_build()
1725            .await
1726            .unwrap();
1727
1728        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1729        let batch = &batches[0];
1730
1731        assert_eq!(batch.num_rows(), 8);
1732        assert_eq!(batch.num_columns(), 11);
1733    }
1734
1735    #[tokio::test]
1736    async fn test_with_header_size_hint_large() {
1737        // Use a larger header size hint than needed
1738        let file = arrow_test_data("avro/alltypes_plain.avro");
1739        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1740        let location = Path::from_filesystem_path(&file).unwrap();
1741        let file_size = store.head(&location).await.unwrap().size;
1742
1743        let file_reader = AvroObjectReader::new(store, location);
1744        let schema = get_alltypes_schema();
1745        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1746
1747        // Use a large header hint (64KB)
1748        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1749            .with_reader_schema(reader_schema)
1750            .with_header_size_hint(64 * 1024)
1751            .try_build()
1752            .await
1753            .unwrap();
1754
1755        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1756        let batch = &batches[0];
1757
1758        assert_eq!(batch.num_rows(), 8);
1759        assert_eq!(batch.num_columns(), 11);
1760    }
1761
1762    #[tokio::test]
1763    async fn test_with_tz_utc() {
1764        let file = arrow_test_data("avro/alltypes_plain.avro");
1765        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1766        let location = Path::from_filesystem_path(&file).unwrap();
1767        let file_size = store.head(&location).await.unwrap().size;
1768
1769        let file_reader = AvroObjectReader::new(store, location);
1770        let schema = get_alltypes_schema_with_tz("UTC");
1771        let reader_schema = AvroSchema::try_from(schema.as_ref()).unwrap();
1772
1773        // Specify the time zone ID of "UTC" for timestamp fields with time zone.
1774        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1775            .with_reader_schema(reader_schema)
1776            .with_tz(Tz::Utc)
1777            .try_build()
1778            .await
1779            .unwrap();
1780
1781        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1782        let batch = &batches[0];
1783
1784        assert_eq!(batch.num_columns(), 11);
1785
1786        let schema = batch.schema();
1787        let ts_field = schema.field_with_name("timestamp_col").unwrap();
1788        assert!(
1789            matches!(
1790                ts_field.data_type(),
1791                DataType::Timestamp(TimeUnit::Microsecond, Some(tz)) if tz.as_ref() == "UTC"
1792            ),
1793            "expected Timestamp(Microsecond, Some(\"UTC\")), got {:?}",
1794            ts_field.data_type()
1795        );
1796    }
1797
1798    #[tokio::test]
1799    async fn test_with_utf8_view_enabled() {
1800        // Test that utf8_view produces StringViewArray instead of StringArray
1801        let file = arrow_test_data("avro/nested_records.avro");
1802        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1803        let location = Path::from_filesystem_path(&file).unwrap();
1804        let file_size = store.head(&location).await.unwrap().size;
1805
1806        let file_reader = AvroObjectReader::new(store, location);
1807
1808        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1809            .with_utf8_view(true)
1810            .try_build()
1811            .await
1812            .unwrap();
1813
1814        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1815        let batch = &batches[0];
1816
1817        assert_eq!(batch.num_rows(), 2);
1818
1819        // The f1 struct contains f1_1 which is a string field
1820        // With utf8_view enabled, it should be Utf8View type
1821        let f1_col = batch.column(0);
1822        let f1_struct = f1_col.as_struct();
1823        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1824
1825        // Check that the data type is Utf8View
1826        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
1827    }
1828
1829    #[tokio::test]
1830    async fn test_with_utf8_view_disabled() {
1831        // Test that without utf8_view, we get regular Utf8
1832        let file = arrow_test_data("avro/nested_records.avro");
1833        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1834        let location = Path::from_filesystem_path(&file).unwrap();
1835        let file_size = store.head(&location).await.unwrap().size;
1836
1837        let file_reader = AvroObjectReader::new(store, location);
1838
1839        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1840            .with_utf8_view(false)
1841            .try_build()
1842            .await
1843            .unwrap();
1844
1845        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1846        let batch = &batches[0];
1847
1848        assert_eq!(batch.num_rows(), 2);
1849
1850        // The f1 struct contains f1_1 which is a string field
1851        // Without utf8_view, it should be regular Utf8
1852        let f1_col = batch.column(0);
1853        let f1_struct = f1_col.as_struct();
1854        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1855
1856        assert_eq!(f1_1_field.data_type(), &DataType::Utf8);
1857    }
1858
1859    #[tokio::test]
1860    async fn test_with_strict_mode_disabled_allows_null_second() {
1861        // Test that with strict_mode disabled, unions of ['T', 'null'] are allowed
1862        // The alltypes_nulls_plain.avro file has unions with null second
1863        let file = arrow_test_data("avro/alltypes_nulls_plain.avro");
1864        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1865        let location = Path::from_filesystem_path(&file).unwrap();
1866        let file_size = store.head(&location).await.unwrap().size;
1867
1868        let file_reader = AvroObjectReader::new(store, location);
1869
1870        // Without strict mode, this should succeed
1871        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1872            .with_strict_mode(false)
1873            .try_build()
1874            .await
1875            .unwrap();
1876
1877        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1878        assert_eq!(batches.len(), 1);
1879        assert_eq!(batches[0].num_rows(), 1);
1880    }
1881
1882    #[tokio::test]
1883    async fn test_with_strict_mode_enabled_rejects_null_second() {
1884        // Test that with strict_mode enabled, unions of ['T', 'null'] are rejected
1885        // The alltypes_plain.avro file has unions like ["int", "null"] (null second)
1886        let file = arrow_test_data("avro/alltypes_plain.avro");
1887        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1888        let location = Path::from_filesystem_path(&file).unwrap();
1889        let file_size = store.head(&location).await.unwrap().size;
1890
1891        let file_reader = AvroObjectReader::new(store, location);
1892
1893        // With strict mode, this should fail because of ['T', 'null'] unions
1894        let result = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1895            .with_strict_mode(true)
1896            .try_build()
1897            .await;
1898
1899        match result {
1900            Ok(_) => panic!("Expected error for strict_mode with ['T', 'null'] union"),
1901            Err(err) => {
1902                assert!(
1903                    err.to_string().contains("disallowed in strict_mode"),
1904                    "Expected strict_mode error, got: {}",
1905                    err
1906                );
1907            }
1908        }
1909    }
1910
1911    #[tokio::test]
1912    async fn test_with_strict_mode_enabled_valid_schema() {
1913        // Test that strict_mode works with schemas that have proper ['null', 'T'] unions
1914        // The nested_records.avro file has properly ordered unions
1915        let file = arrow_test_data("avro/nested_records.avro");
1916        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1917        let location = Path::from_filesystem_path(&file).unwrap();
1918        let file_size = store.head(&location).await.unwrap().size;
1919
1920        let file_reader = AvroObjectReader::new(store, location);
1921
1922        // With strict mode, properly ordered unions should still work
1923        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 1024)
1924            .with_strict_mode(true)
1925            .try_build()
1926            .await
1927            .unwrap();
1928
1929        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1930        assert_eq!(batches.len(), 1);
1931        assert_eq!(batches[0].num_rows(), 2);
1932    }
1933
1934    #[tokio::test]
1935    async fn test_builder_options_combined() {
1936        // Test combining multiple builder options
1937        let file = arrow_test_data("avro/nested_records.avro");
1938        let store: Arc<dyn ObjectStore> = Arc::new(LocalFileSystem::new());
1939        let location = Path::from_filesystem_path(&file).unwrap();
1940        let file_size = store.head(&location).await.unwrap().size;
1941
1942        let file_reader = AvroObjectReader::new(store, location);
1943
1944        let reader = AsyncAvroFileReader::builder(file_reader, file_size, 2)
1945            .with_header_size_hint(128)
1946            .with_utf8_view(true)
1947            .with_strict_mode(true)
1948            .with_projection(vec![0, 2]) // f1 and f3
1949            .try_build()
1950            .await
1951            .unwrap();
1952
1953        let batches: Vec<RecordBatch> = reader.try_collect().await.unwrap();
1954        let batch = &batches[0];
1955
1956        // Should have 2 columns (f1 and f3) due to projection
1957        assert_eq!(batch.num_columns(), 2);
1958        assert_eq!(batch.schema().field(0).name(), "f1");
1959        assert_eq!(batch.schema().field(1).name(), "f3");
1960
1961        // Verify utf8_view is applied
1962        let f1_col = batch.column(0);
1963        let f1_struct = f1_col.as_struct();
1964        let f1_1_field = f1_struct.column_by_name("f1_1").unwrap();
1965        assert_eq!(f1_1_field.data_type(), &DataType::Utf8View);
1966    }
1967}