arrow_avro/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Avro writer implementation for the `arrow-avro` crate.
//!
//! # Overview
//!
//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two output
//! formats are supported:
//!
//! * **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object Container File (OCF)**: a self‑describing
//!   file with header (schema JSON + metadata), optional compression, data blocks, and
//!   sync markers. See Avro 1.11.1 “Object Container Files.”
//!   <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **Single Object Encoding (SOE) Stream** (“datum” bytes) without
//!   any container framing. This is useful when the schema is known out‑of‑band (e.g.,
//!   via a schema registry) and you want minimal overhead.
//!
//! ## Which format should you use?
//!
//! * Use **OCF** when you need a portable, self‑contained file. The schema travels with
//!   the data, making it easy to read elsewhere.
//! * Use the **SOE stream** when your surrounding protocol supplies schema information
//!   (e.g., from a schema registry). The writer automatically adds the per‑record
//!   prefix (see the sketch after this list):
//!   - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) followed by
//!     an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body.
//!     See Avro 1.11.1 "Single object encoding".
//!     <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
//!   - **Confluent wire format**: Each record is prefixed with magic byte `0x00` followed by
//!     a **big‑endian** 4‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id(schema_id)`.
//!     <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
//!   - **Apicurio wire format**: Each record is prefixed with magic byte `0x00` followed by
//!     a **big‑endian** 8‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id64(schema_id)`.
//!     <https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
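//!
//! For example, a minimal sketch of selecting the Confluent-style prefix (the schema
//! id `42` below is a placeholder for whatever id your registry assigned):
//!
//! ```
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::schema::FingerprintStrategy;
//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
//! let writer = WriterBuilder::new(schema)
//!     .with_fingerprint_strategy(FingerprintStrategy::Id(42)) // 4-byte big-endian id prefix
//!     .build::<_, AvroSoeFormat>(Vec::<u8>::new())?;
//! # let _ = writer;
//! # Ok(()) }
//! ```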
//!
//! ## Choosing the Avro schema
//!
//! By default, the writer converts your Arrow schema to Avro (including a top‑level record
//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the
//! Arrow schema metadata under the `avro.schema` key before constructing the writer. The
//! builder will then use that schema instead of generating a new one.
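//!
//! A minimal sketch of supplying a hand-written Avro schema this way (the JSON and the
//! record name below are illustrative):
//!
//! ```
//! use std::collections::HashMap;
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::schema::SCHEMA_METADATA_KEY;
//! use arrow_avro::writer::AvroWriter;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let avro_json =
//!     r#"{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":"long"}]}"#;
//! let mut metadata = HashMap::new();
//! metadata.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
//! let schema = Schema::new_with_metadata(
//!     vec![Field::new("id", DataType::Int64, false)],
//!     metadata,
//! );
//! let writer = AvroWriter::new(Vec::<u8>::new(), schema)?; // uses `avro_json` verbatim
//! # let _ = writer;
//! # Ok(()) }
//! ```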
//!
//! ## Compression
//!
//! For OCF, you may enable a compression codec via `WriterBuilder::with_compression`. The
//! chosen codec is written into the file header and used for subsequent blocks. SOE stream
//! writing doesn’t apply container‑level compression.
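//!
//! A minimal sketch (assuming the corresponding codec feature, here `snappy`, is enabled):
//!
//! ```
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::compression::CompressionCodec;
//! use arrow_avro::writer::{WriterBuilder, format::AvroOcfFormat};
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
//! let writer = WriterBuilder::new(schema)
//!     .with_compression(Some(CompressionCodec::Snappy))
//!     .build::<_, AvroOcfFormat>(Vec::<u8>::new())?;
//! # let _ = writer;
//! # Ok(()) }
//! ```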
//!
//! ---
use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
use crate::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
use arrow_schema::{ArrowError, Schema};
use std::io::Write;
use std::sync::Arc;

/// Encodes `RecordBatch` into the Avro binary format.
mod encoder;
/// Logic for different Avro container file formats.
pub mod format;

/// Builder to configure and create a `Writer`.
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    schema: Schema,
    codec: Option<CompressionCodec>,
    capacity: usize,
    fingerprint_strategy: Option<FingerprintStrategy>,
}

impl WriterBuilder {
    /// Create a new builder with default settings.
    ///
    /// The Avro schema used for writing is determined as follows:
    /// 1) If the Arrow schema metadata contains `avro.schema` (see `SCHEMA_METADATA_KEY`),
    ///    that JSON is used verbatim.
    /// 2) Otherwise, the Arrow schema is converted to an Avro record schema.
    pub fn new(schema: Schema) -> Self {
        Self {
            schema,
            codec: None,
            capacity: 1024,
            fingerprint_strategy: None,
        }
    }

    /// Set the fingerprinting strategy for the stream writer.
    /// This determines the per-record prefix format.
    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
        self.fingerprint_strategy = Some(strategy);
        self
    }

    /// Change the compression codec.
    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
        self.codec = codec;
        self
    }

    /// Set the initial capacity (in bytes) of the buffer used to encode each OCF block.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }

    /// Create a new `Writer` with specified `AvroFormat` and builder options.
    /// Performs one‑time startup (header/stream init, encoder plan).
    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
    where
        W: Write,
        F: AvroFormat,
    {
        let mut format = F::default();
        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
            Some(json) => AvroSchema::new(json.clone()),
            None => AvroSchema::try_from(&self.schema)?,
        };
        let maybe_fingerprint = if F::NEEDS_PREFIX {
            match self.fingerprint_strategy {
                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(id)),
                Some(strategy) => {
                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
                }
                None => Some(
                    avro_schema
                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
                ),
            }
        } else {
            None
        };
        let mut md = self.schema.metadata().clone();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.clone().json_string,
        );
        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
        format.start_stream(&mut writer, &schema, self.codec)?;
        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
            .with_fingerprint(maybe_fingerprint)
            .build()?;
        Ok(Writer {
            writer,
            schema,
            format,
            compression: self.codec,
            capacity: self.capacity,
            encoder,
        })
    }
}

/// Generic Avro writer.
///
/// This type is generic over the output Write sink (`W`) and the Avro format (`F`).
/// You’ll usually use the concrete aliases:
///
/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
/// * **[`AvroStreamWriter`]** for **SOE** Avro streams
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    writer: W,
    schema: Arc<Schema>,
    format: F,
    compression: Option<CompressionCodec>,
    capacity: usize,
    encoder: RecordEncoder,
}

/// Alias for an Avro **Object Container File** writer.
///
/// ### Quickstart (runnable)
///
/// ```
/// use std::io::Cursor;
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroWriter;
/// use arrow_avro::reader::ReaderBuilder;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Writer schema: { id: long, name: string }
/// let writer_schema = Schema::new(vec![
///     Field::new("id", DataType::Int64, false),
///     Field::new("name", DataType::Utf8, false),
/// ]);
///
/// // Build a RecordBatch with two rows
/// let batch = RecordBatch::try_new(
///     Arc::new(writer_schema.clone()),
///     vec![
///         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
///         Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
///     ],
/// )?;
///
/// // Write an Avro **Object Container File** (OCF) to memory
/// let mut w = AvroWriter::new(Vec::<u8>::new(), writer_schema.clone())?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
///
/// // Build a Reader and decode the batch back
/// let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
/// let out = r.next().unwrap()?;
/// assert_eq!(out.num_rows(), 2);
/// # Ok(()) }
/// ```
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

/// Alias for an Avro **Single Object Encoding** stream writer.
///
/// ### Example
///
/// This writer automatically adds the appropriate per-record prefix (based on the
/// fingerprint strategy) before the Avro body of each record. The default is Single
/// Object Encoding (SOE) with a Rabin fingerprint.
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroStreamWriter;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // One‑column Arrow batch
/// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
/// )?;
///
/// // Write an Avro Single Object Encoding stream to a Vec<u8>
/// let sink: Vec<u8> = Vec::new();
/// let mut w = AvroStreamWriter::new(sink, schema)?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;

impl<W: Write> Writer<W, AvroOcfFormat> {
    /// Convenience constructor – same as [`WriterBuilder::build`] with `AvroOcfFormat`.
    ///
    /// ### Example
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::AvroWriter;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let batch = RecordBatch::try_new(
    ///     Arc::new(schema.clone()),
    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    /// )?;
    ///
    /// let buf: Vec<u8> = Vec::new();
    /// let mut w = AvroWriter::new(buf, schema)?;
    /// w.write(&batch)?;
    /// w.finish()?;
    /// let bytes = w.into_inner();
    /// assert!(!bytes.is_empty());
    /// # Ok(()) }
    /// ```
    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
    }

    /// Return a reference to the 16‑byte sync marker generated for this file.
    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
        self.format.sync_marker()
    }
}

impl<W: Write> Writer<W, AvroSoeFormat> {
    /// Convenience constructor to create a new [`AvroStreamWriter`].
    ///
    /// The resulting stream contains **Single Object Encodings** (no OCF header/sync).
    ///
    /// ### Example
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::AvroStreamWriter;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
    /// let batch = RecordBatch::try_new(
    ///     Arc::new(schema.clone()),
    ///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
    /// )?;
    ///
    /// let sink: Vec<u8> = Vec::new();
    /// let mut w = AvroStreamWriter::new(sink, schema)?;
    /// w.write(&batch)?;
    /// w.finish()?;
    /// let bytes = w.into_inner();
    /// assert!(!bytes.is_empty());
    /// # Ok(()) }
    /// ```
    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
    }
}

impl<W: Write, F: AvroFormat> Writer<W, F> {
    /// Serialize one [`RecordBatch`] to the output.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(ArrowError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
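        // OCF writers expose a sync marker and frame rows into blocks; SOE stream
        // writers encode prefixed records straight to the sink.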
        match self.format.sync_marker() {
            Some(&sync) => self.write_ocf_block(batch, &sync),
            None => self.write_stream(batch),
        }
    }

    /// A convenience method to write a slice of [`RecordBatch`].
    ///
    /// This is equivalent to calling `write` for each batch in the slice.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Flush remaining buffered data and (for OCF) ensure the header is present.
    pub fn finish(&mut self) -> Result<(), ArrowError> {
        self.writer
            .flush()
            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
    }

    /// Consume the writer, returning the underlying output object.
    pub fn into_inner(self) -> W {
        self.writer
    }

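    /// Writes one OCF data block: the row count and encoded byte length (both as
    /// zig-zag encoded longs), the optionally compressed row data, then the 16-byte
    /// sync marker.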
    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
        let mut buf = Vec::<u8>::with_capacity(self.capacity);
        self.encoder.encode(&mut buf, batch)?;
        let encoded = match self.compression {
            Some(codec) => codec.compress(&buf)?,
            None => buf,
        };
        write_long(&mut self.writer, batch.num_rows() as i64)?;
        write_long(&mut self.writer, encoded.len() as i64)?;
        self.writer
            .write_all(&encoded)
            .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
        self.writer
            .write_all(sync)
            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
        Ok(())
    }

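    /// Encodes each row, with its per-record prefix, directly into the output sink.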
    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
        self.encoder.encode(&mut self.writer, batch)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;
    use crate::reader::ReaderBuilder;
    use crate::schema::{AvroSchema, SchemaStore};
    use crate::test_util::arrow_test_data;
    use arrow::datatypes::TimeUnit;
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
    use arrow_array::types::{
        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
        TimestampMillisecondType, TimestampNanosecondType,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
        StringArray, StructArray, UnionArray,
    };
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::{Int16Array, Int64Array, RunArray};
    use arrow_schema::UnionMode;
    use arrow_schema::{DataType, Field, Schema};
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufReader, Cursor};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;

    fn files() -> impl Iterator<Item = &'static str> {
        [
            // TODO: avoid requiring snappy for this file
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }

    fn make_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Binary, false),
        ])
    }

    fn make_batch() -> RecordBatch {
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
        RecordBatch::try_new(
            Arc::new(make_schema()),
            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
        )
        .expect("failed to build test RecordBatch")
    }

    #[test]
    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        )?;
        let buf: Vec<u8> = Vec::new();
        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new(); // Rabin by default
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _fp = store.register(avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _consumed = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 2);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![10, 20]));
        Ok(())
    }

    #[test]
    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false), // non-nullable
            Field::new("name", DataType::Utf8, true), // nullable
        ]);
        let inner_struct_type = DataType::Struct(inner_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", inner_struct_type.clone(), true), // nullable struct
            Field::new("after", inner_struct_type.clone(), true),  // nullable struct
            Field::new("op", DataType::Utf8, false),               // non-nullable
        ]);
        let before_ids = Int32Array::from(vec![None, None]);
        let before_names = StringArray::from(vec![None::<&str>, None]);
        let before_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(before_ids) as ArrayRef,
                Arc::new(before_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![false, false])),
        );
        let after_ids = Int32Array::from(vec![1, 2]); // non-nullable, no nulls
        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
        let after_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(after_ids) as ArrayRef,
                Arc::new(after_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create test batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            let after_col = single_row.column(1);
            assert_eq!(
                after_col.null_count(),
                0,
                "after column should have no nulls in sliced row"
            );
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty(), "encoded output should not be empty");
    }

    #[test]
    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
        use arrow_array::{
            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
            TimestampMicrosecondArray,
        };
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("category", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), true),
            Field::new("stock_quantity", DataType::Int32, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
        ]);
        let row_struct_type = DataType::Struct(row_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", row_struct_type.clone(), true),
            Field::new("after", row_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_struct = StructArray::new_null(row_fields.clone(), 2);
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
        let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
        let timestamps = TimestampMicrosecondArray::from(vec![
            Some(1700000000000000i64),
            Some(1700000001000000i64),
        ]);
        let after_struct = StructArray::new(
            row_fields.clone(),
            vec![
                Arc::new(ids) as ArrayRef,
                Arc::new(names) as ArrayRef,
                Arc::new(categories) as ArrayRef,
                Arc::new(prices) as ArrayRef,
                Arc::new(quantities) as ArrayRef,
                Arc::new(timestamps) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create products batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        // Encode row by row
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty());
    }

    #[test]
    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
        use arrow_array::{
            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
        };
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let row_struct_dt = DataType::Struct(row_fields.clone());
        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
        let after: ArrayRef = Arc::new(StructArray::new(
            row_fields.clone(),
            vec![id_col, name_col],
            None,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("before", row_struct_dt.clone(), true),
            Field::new("after", row_struct_dt, true),
            Field::new("op", DataType::Utf8, false),
            Field::new("ts_ms", DataType::Int64, false),
        ]));
        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new(schema.as_ref().clone())
            .build::<_, AvroSoeFormat>(&mut buf)
            .unwrap();
        let single = batch.slice(0, 1);
        let res = writer.write(&single);
        assert!(
            res.is_ok(),
            "expected to encode successfully, got: {:?}",
            res.err()
        );
    }

    #[test]
    fn test_union_nonzero_type_ids() -> Result<(), ArrowError> {
        use arrow_array::UnionArray;
        use arrow_buffer::Buffer;
        use arrow_schema::UnionFields;
        let union_fields = UnionFields::try_new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        )
        .unwrap();
        let strings = StringArray::from(vec!["hello", "world"]);
        let ints = Int32Array::from(vec![10, 20, 30]);
        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
        let union_array = UnionArray::try_new(
            union_fields.clone(),
            type_ids.into(),
            Some(offsets.into()),
            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
        )?;
        let schema = Schema::new(vec![Field::new(
            "union_col",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(union_array) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        assert!(
            writer.write(&batch).is_ok(),
            "Expected no error from writing"
        );
        assert!(
            writer.finish().is_ok(),
            "Expected no error from finishing writer"
        );
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u32 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u64 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
        let batch = make_batch();
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
        let trailer = &out[out.len() - 16..];
        assert_eq!(trailer.len(), 16, "expected 16‑byte sync marker");
        Ok(())
    }

    #[test]
    fn test_schema_mismatch_yields_error() {
        let batch = make_batch();
        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
        let err = writer.write(&batch).unwrap_err();
        assert!(matches!(err, ArrowError::SchemaError(_)));
    }

    #[test]
    fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
        let batch1 = make_batch();
        let batch2 = make_batch();
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write_batches(&[&batch1, &batch2])?;
        writer.finish()?;
        let out = writer.into_inner();
        assert!(out.len() > 4, "combined batches produced tiny file");
        Ok(())
    }

    #[test]
    fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
        Ok(())
    }

    #[test]
    fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
        let mut buf = Vec::new();
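        // ZigZag encoding maps 0, -1, 1, -2 to 0x00, 0x01, 0x02, 0x03 before varint packing.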
        write_long(&mut buf, 0)?;
        write_long(&mut buf, -1)?;
        write_long(&mut buf, 1)?;
        write_long(&mut buf, -2)?;
        write_long(&mut buf, 2147483647)?;
        assert!(
            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
            "zig‑zag varint encodings incorrect: {buf:?}"
        );
        Ok(())
    }

    #[test]
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
        for rel in files() {
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {}",
                rel
            );
        }
        Ok(())
    }

    #[test]
    fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }

    #[test]
    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/simple_fixed.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build avro reader");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_file = File::create(tmp.path()).expect("create temp avro");
        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        drop(writer);
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, original);
        Ok(())
    }

    // Strict equality (schema + values) only when canonical extension types are enabled
    #[test]
    #[cfg(feature = "canonical_extension_types")]
    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, IntervalUnit};
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();
        let has_mdn = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Interval(IntervalUnit::MonthDayNano)
            )
        });
        assert!(
            has_mdn,
            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
        );
        let has_uuid_fixed = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
        assert!(
            has_uuid_fixed,
            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        // Write to an in‑memory OCF and read back
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, input);
        Ok(())
    }

    // Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError>
    {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        // Read input Avro (duration + uuid)
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        // Sanity checks: has MonthDayNano and a FixedSizeBinary(16)
        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Write to an in-memory OCF and read back
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // 1) Values must round-trip for both columns
        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        // 2) Schema expectation without extensions:
        //    uuid is written as named fixed(16), so reader attaches avro.name
        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        // 3) Duration remains Interval(MonthDayNano)
        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }

    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
    // writes it back out with the writer (hitting Map encoding paths), then reads it
    // again and asserts exact Arrow equivalence.
    #[test]
    // TODO: avoid requiring snappy for this file
    #[cfg(feature = "snappy")]
    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
        // Load source Avro with Map fields
        let path = arrow_test_data("avro/nonnullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nonnullable.impala.avro");
        // Collect all input batches and concatenate to a single RecordBatch
        let in_schema = reader.schema();
        // Sanity: ensure the file actually contains at least one Map field
        let has_map = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
        assert!(
            has_map,
            "expected at least one Map field in avro/nonnullable.impala.avro"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        // Write out using the OCF writer into an in-memory Vec<u8>
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        // Read the produced bytes back with the Reader
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        // Exact value fidelity (schema + data)
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    // TODO: avoid requiring snappy for these files
    #[cfg(feature = "snappy")]
    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
        // (file, resolve via ARROW_TEST_DATA?)
        let files: [(&str, bool); 8] = [
            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
        ];
        for (rel, in_test_data_dir) in files {
            // Resolve path the same way as reader::test_decimal
            let path: String = if in_test_data_dir {
                arrow_test_data(rel)
            } else {
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join(rel)
                    .to_string_lossy()
                    .into_owned()
            };
            // Read original file into a single RecordBatch for comparison
            let f_in = File::open(&path).expect("open input avro");
            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
            let in_schema = rdr.schema();
            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
            // Write it out with the OCF writer (no special compression)
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
            // Read back the file we just wrote and compare equality (schema + data)
            let f_rt = File::open(&out_path).expect("open roundtrip avro");
            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
            let rt_schema = rt_rdr.schema();
            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
        }
        Ok(())
    }

    #[test]
    fn test_named_types_complex_roundtrip() -> Result<(), ArrowError> {
        // 1. Read the new, more complex named references file.
        let path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
        let rdr_file = File::open(&path).expect("open test/data/named_types_complex.avro");
1237
1238        let reader = ReaderBuilder::new()
1239            .build(BufReader::new(rdr_file))
1240            .expect("build reader for named_types_complex.avro");
1241
1242        // 2. Concatenate all batches to one RecordBatch.
1243        let in_schema = reader.schema();
1244        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1245        let original =
1246            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1247
1248        // 3. Sanity Checks: Validate that all named types were reused correctly.
1249        {
1250            let arrow_schema = original.schema();
1251
1252            // --- A. Validate 'User' record reuse ---
1253            let author_field = arrow_schema.field_with_name("author")?;
1254            let author_type = author_field.data_type();
1255            let editors_field = arrow_schema.field_with_name("editors")?;
1256            let editors_item_type = match editors_field.data_type() {
1257                DataType::List(item_field) => item_field.data_type(),
1258                other => panic!("Editors field should be a List, but was {:?}", other),
1259            };
1260            assert_eq!(
1261                author_type, editors_item_type,
1262                "The DataType for the 'author' struct and the 'editors' list items must be identical"
1263            );
1264
1265            // --- B. Validate 'PostStatus' enum reuse ---
1266            let status_field = arrow_schema.field_with_name("status")?;
1267            let status_type = status_field.data_type();
1268            assert!(
1269                matches!(status_type, DataType::Dictionary(_, _)),
1270                "Status field should be a Dictionary (Enum)"
1271            );
1272
1273            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
1274            let prev_status_type = prev_status_field.data_type();
1275            assert_eq!(
1276                status_type, prev_status_type,
1277                "The DataType for 'status' and 'previous_status' enums must be identical"
1278            );
1279
1280            // --- C. Validate 'MD5' fixed reuse ---
1281            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
1282            let content_hash_type = content_hash_field.data_type();
1283            assert!(
1284                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
1285                "Content hash should be FixedSizeBinary(16)"
1286            );
1287
1288            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
1289            let thumb_hash_type = thumb_hash_field.data_type();
1290            assert_eq!(
1291                content_hash_type, thumb_hash_type,
1292                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
1293            );
1294        }
1295
1296        // 4. Write the data to an in-memory buffer.
1297        let buffer: Vec<u8> = Vec::new();
1298        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
1299        writer.write(&original)?;
1300        writer.finish()?;
1301        let bytes = writer.into_inner();
1302
1303        // 5. Read the data back and compare for exact equality.
1304        let rt_reader = ReaderBuilder::new()
1305            .build(Cursor::new(bytes))
1306            .expect("build reader for round-trip");
1307        let rt_schema = rt_reader.schema();
1308        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1309        let roundtrip =
1310            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1311
1312        assert_eq!(
1313            roundtrip, original,
1314            "Avro complex named types round-trip mismatch"
1315        );
1316
1317        Ok(())
1318    }
1319
1320    // Union Roundtrip Test Helpers
1321
1322    // Asserts that the `actual` schema is a semantically equivalent superset of the `expected` one.
1323    // This allows the `actual` schema to contain additional metadata keys
1324    // (`arrowUnionMode`, `arrowUnionTypeIds`, `avro.name`) that are added during an Arrow-to-Avro-to-Arrow
1325    // roundtrip, while ensuring no other information was lost or changed.
1326    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
1327        // Compare top-level schema metadata using the same superset logic.
1328        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");
1329
1330        // Compare fields.
1331        assert_eq!(
1332            expected.fields().len(),
1333            actual.fields().len(),
1334            "Schema must have the same number of fields"
1335        );
1336
1337        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
1338            assert_field_is_semantically_equivalent(expected_field, actual_field);
1339        }
1340    }
1341
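    /// Asserts that two fields are semantically equivalent: same name, same nullability,
    /// and (recursively) the same data type, with the `actual` field's metadata allowed
    /// to be a permitted superset of the `expected` field's metadata.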
    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
        let context = format!("Field '{}'", expected.name());

        assert_eq!(
            expected.name(),
            actual.name(),
            "{context}: names must match"
        );
        assert_eq!(
            expected.is_nullable(),
            actual.is_nullable(),
            "{context}: nullability must match"
        );

        // Recursively check the data types.
        assert_datatype_is_semantically_equivalent(
            expected.data_type(),
            actual.data_type(),
            &context,
        );

        // Check that metadata is a valid superset.
        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
    }

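    /// Recursively asserts that two data types are semantically equivalent, descending
    /// into list, map, struct, and union children via the field-level check above.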
    fn assert_datatype_is_semantically_equivalent(
        expected: &DataType,
        actual: &DataType,
        context: &str,
    ) {
        match (expected, actual) {
            (DataType::List(expected_field), DataType::List(actual_field))
            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
                assert_field_is_semantically_equivalent(expected_field, actual_field);
            }
            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: struct must have same number of fields"
                );
                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
                    assert_field_is_semantically_equivalent(ef, af);
                }
            }
            (
                DataType::Union(expected_fields, expected_mode),
                DataType::Union(actual_fields, actual_mode),
            ) => {
                assert_eq!(
                    expected_mode, actual_mode,
                    "{context}: union mode must match"
                );
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: union must have same number of variants"
                );
                for ((exp_id, exp_field), (act_id, act_field)) in
                    expected_fields.iter().zip(actual_fields.iter())
                {
                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
                    assert_field_is_semantically_equivalent(exp_field, act_field);
                }
            }
            _ => {
                assert_eq!(expected, actual, "{context}: data types must be identical");
            }
        }
    }

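    /// Asserts that two RecordBatches have the same shape and, column by column,
    /// identical data content (see `assert_array_data_is_identical`).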
    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
        assert_eq!(
            expected.num_columns(),
            actual.num_columns(),
            "RecordBatches must have the same number of columns"
        );
        assert_eq!(
            expected.num_rows(),
            actual.num_rows(),
            "RecordBatches must have the same number of rows"
        );

        for i in 0..expected.num_columns() {
            let context = format!("Column {i}");
            let expected_col = expected.column(i);
            let actual_col = actual.column(i);
            assert_array_data_is_identical(expected_col, actual_col, &context);
        }
    }

    /// Recursively asserts that the data content of two Arrays is identical.
    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
        assert_eq!(
            expected.nulls(),
            actual.nulls(),
            "{context}: null buffers must match"
        );
        assert_eq!(
            expected.len(),
            actual.len(),
            "{context}: array lengths must match"
        );

        match (expected.data_type(), actual.data_type()) {
            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();

                // Compare the type_ids buffer (always the first buffer).
                assert_eq!(
                    &expected.to_data().buffers()[0],
                    &actual.to_data().buffers()[0],
                    "{context}: union type_ids buffer mismatch"
                );

                // For dense unions, compare the value_offsets buffer (the second buffer).
                if expected.to_data().buffers().len() > 1 {
                    assert_eq!(
                        &expected.to_data().buffers()[1],
                        &actual.to_data().buffers()[1],
                        "{context}: union value_offsets buffer mismatch"
                    );
                }

                // Recursively compare children based on the fields in the DataType.
                for (type_id, _) in expected_fields.iter() {
                    let child_context = format!("{context} -> child variant {type_id}");
                    assert_array_data_is_identical(
                        expected_union.child(type_id),
                        actual_union.child(type_id),
                        &child_context,
                    );
                }
            }
            (DataType::Struct(_), DataType::Struct(_)) => {
                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
                for i in 0..expected_struct.num_columns() {
                    let child_context = format!("{context} -> struct child {i}");
                    assert_array_data_is_identical(
                        expected_struct.column(i),
                        actual_struct.column(i),
                        &child_context,
                    );
                }
            }
            // Fallback for primitive types and other types where buffer comparison is sufficient.
            _ => {
                assert_eq!(
                    expected.to_data().buffers(),
                    actual.to_data().buffers(),
                    "{context}: data buffers must match"
                );
            }
        }
    }

    /// Checks that `actual_meta` contains all of `expected_meta`, and any additional
    /// keys in `actual_meta` are from a permitted set.
    fn assert_metadata_is_superset(
        expected_meta: &HashMap<String, String>,
        actual_meta: &HashMap<String, String>,
        context: &str,
    ) {
        let allowed_additions: HashSet<&str> =
            vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
                .into_iter()
                .collect();
        for (key, expected_value) in expected_meta {
            match actual_meta.get(key) {
                Some(actual_value) => assert_eq!(
                    expected_value, actual_value,
                    "{context}: preserved metadata for key '{key}' must have the same value"
                ),
                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
            }
        }
        for key in actual_meta.keys() {
            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
            }
        }
    }

    #[test]
    fn test_union_roundtrip() -> Result<(), ArrowError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/union_fields.avro")
            .to_string_lossy()
            .into_owned();
        let rdr_file = File::open(&file_path).expect("open avro/union_fields.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for union_fields.avro");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // The writer appends metadata (e.g., `arrowUnionMode`) during the round-trip, so the
        // schemas cannot be compared directly; instead, compare them semantically, then compare the data.
        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());

        assert_batch_data_is_identical(&original, &round_trip);
        Ok(())
    }

    #[test]
    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
        // Read the known-good enum file (same as reader::test_simple)
        let path = arrow_test_data("avro/simple_enum.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for simple_enum.avro");
        // Concatenate all batches to one RecordBatch for a clean equality check
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
        let has_enum_dict = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
            )
        });
        assert!(
            has_enum_dict,
            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
        );
        // Write with OCF writer into memory using the reader-provided Arrow schema.
        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        // Read back and compare for exact equality (schema + data)
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
        Ok(())
    }

    #[test]
    fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> {
        let cap = 64 * 1024;
        let buffer = Vec::<u8>::new();
        let mut writer = WriterBuilder::new(make_schema())
            .with_capacity(cap)
            .build::<_, AvroOcfFormat>(buffer)?;
        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
        let batch = make_batch();
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
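        // Per the Avro spec, an OCF begins with the 4-byte magic: ASCII "Obj" then 0x01.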
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
        Ok(())
    }

    #[test]
    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), ArrowError> {
        use arrow_array::{ArrayRef, Int32Array};
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let cap = 8192;
        let mut writer = WriterBuilder::new(schema)
            .with_capacity(cap)
            .build::<_, AvroSoeFormat>(Vec::new())?;
        assert_eq!(writer.capacity, cap);
        writer.write(&batch)?;
        let _bytes = writer.into_inner();
        Ok(())
    }

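    // The tests below are gated on the `avro_custom_types` feature. With it enabled,
    // Duration time units and run-end-encoded layouts survive an OCF round-trip; the
    // `#[cfg(not(feature = "avro_custom_types"))]` variants further below exercise the
    // fallback behavior when the feature is disabled.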
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.name(), "run_ends");
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.name(), "values");
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got_ree = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                assert_eq!(got_ree, &ree);
            }
            other => panic!(
                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
                other
            ),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
    {
        let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
                assert_eq!(values_field.data_type(), &DataType::Utf8);
                assert!(
                    values_field.is_nullable(),
                    "REE 'values' child should be nullable"
                );
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int16Type>>()
                    .expect("RunArray<Int16Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
    -> Result<(), ArrowError> {
        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int64Type>>()
                    .expect("RunArray<Int64Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let offset = 1usize;
        let length = 6usize;
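        // Materialize the logical values of the window [offset, offset + length) by
        // resolving each logical index to its physical run via `get_physical_index`.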
        let base_values = base
            .values()
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("REE values as Int32Array");
        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
        for i in offset..offset + length {
            let phys = base.get_physical_index(i);
            let v = if base_values.is_null(phys) {
                None
            } else {
                Some(base_values.value(phys))
            };
            logical_window.push(v);
        }

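        // Run-length encodes a logical sequence into (run_ends, values) arrays suitable
        // for `RunArray::try_new`, collapsing adjacent equal values into a single run.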
        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
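                // Expand the run-end-encoded array back into its logical Int32 values so
                // the sliced input and round-tripped output can be compared element-wise.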
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a
                        .values()
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("REE values as Int32Array");
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

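    // Feature-off variants: without `avro_custom_types`, run-end-encoded columns are
    // flattened on write, so the round-tripped files yield plain arrays holding the
    // expanded logical values.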
    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(1),
            Some(1),
            Some(1),
            Some(2),
            Some(2),
            None,
            None,
            Some(3),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
    -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .expect("StringArray");
        let expected = arrow_array::StringArray::from(vec![
            Some("a"),
            Some("a"),
            None,
            None,
            None,
            Some("c"),
            Some("c"),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
    -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(999),
            Some(999),
            Some(999),
            Some(999),
            Some(-5),
            Some(-5),
            Some(-5),
            Some(-5),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = Int32Array::from(vec![2, 4, 6]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 6);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
        assert_eq!(got, &expected);
        Ok(())
    }

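    // The fixtures below are snappy-compressed OCF files, so reading them requires the
    // `snappy` feature; the writer re-emits the data with its defaults (no special
    // compression), as in the decimal round-trip test above.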
    #[test]
    // TODO: avoid requiring snappy for this file
    #[cfg(feature = "snappy")]
    fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/nullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nullable.impala.avro");
        let in_schema = reader.schema();
        assert!(
            in_schema.fields().iter().any(|f| f.is_nullable()),
            "expected at least one nullable field in avro/nullable.impala.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro data mismatch for nullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for datapage_v2.snappy.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for datapage_v2.snappy.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/single_nan.avro");
        let in_file = File::open(&path).expect("open avro/single_nan.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for single_nan.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/single_nan.avro"
        );
        Ok(())
    }

    #[test]
    // TODO: avoid requiring snappy for this file
    #[cfg(feature = "snappy")]
    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for dict-page-offset-zero.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
        let path = arrow_test_data("avro/repeated_no_annotation.avro");
        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for repeated_no_annotation.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip buffer");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
        );
        Ok(())
    }

    #[test]
    fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/nested_record_reuse.avro")
            .to_string_lossy()
            .into_owned();
        let in_file = File::open(&path).expect("open avro/nested_record_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for nested_record_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, input,
            "Round-trip batch mismatch for nested_record_reuse.avro"
        );
        Ok(())
    }

    #[test]
    fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
        let path =
            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
        let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
        let reader = ReaderBuilder::new()
            .build(std::io::BufReader::new(rdr_file))
            .expect("build reader for enum_reuse.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Avro enum type reuse round-trip mismatch"
        );
        Ok(())
    }

    #[test]
    fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/comprehensive_e2e.avro");
        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for comprehensive_e2e.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let sink: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip batch mismatch for comprehensive_e2e.avro"
        );
        Ok(())
    }

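    // The schema below exercises the date/time encoders. Per the Avro spec, these Arrow
    // types correspond to the `date`, `time-millis`, `time-micros`, and timestamp logical
    // types; the exact annotation chosen for each timestamp unit is a writer implementation
    // detail, so this test only asserts a lossless round-trip.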
    #[test]
    fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
        let schema = Schema::new(vec![
            Field::new("d32", DataType::Date32, false),
            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
            Field::new(
                "ts_ms",
                DataType::Timestamp(TimeUnit::Millisecond, None),
                false,
            ),
            Field::new(
                "ts_us",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                false,
            ),
            Field::new(
                "ts_ns",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                false,
            ),
        ]);
        let d32 = Date32Array::from(vec![0, 1, -1]);
        let t32_ms: PrimitiveArray<Time32MillisecondType> =
            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
        let t64_us: PrimitiveArray<Time64MicrosecondType> =
            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(d32) as ArrayRef,
                Arc::new(t32_ms) as ArrayRef,
                Arc::new(t64_us) as ArrayRef,
                Arc::new(ts_ms) as ArrayRef,
                Arc::new(ts_us) as ArrayRef,
                Arc::new(ts_ns) as ArrayRef,
            ],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(std::io::Cursor::new(bytes))
            .expect("build reader for round-trip of new time encoders");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, batch);
        Ok(())
    }
}