arrow_avro/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro writer implementation for the `arrow-avro` crate.
19//!
20//! # Overview
21//!
22//! Use this module to serialize Arrow `RecordBatch` values into Avro. Two output
23//! formats are supported:
24//!
25//! * **[`AvroWriter`](crate::writer::AvroWriter)** — writes an **Object Container File (OCF)**: a self‑describing
26//!   file with header (schema JSON + metadata), optional compression, data blocks, and
27//!   sync markers. See Avro 1.11.1 “Object Container Files.”
28//!   <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
//! * **[`AvroStreamWriter`](crate::writer::AvroStreamWriter)** — writes a **Single Object Encoding (SOE) Stream** (“datum” bytes) without
//!   any container framing. This is useful when the schema is known out‑of‑band (e.g.,
//!   via a registry) and you want minimal overhead.
32//!
33//! ## Which format should you use?
34//!
35//! * Use **OCF** when you need a portable, self‑contained file. The schema travels with
36//!   the data, making it easy to read elsewhere.
//! * Use the **SOE stream** when your surrounding protocol supplies schema information
//!   (e.g., a schema registry). The writer automatically adds one of the following
//!   per‑record prefixes, depending on the fingerprint strategy:
39//!   - **SOE**: Each record is prefixed with the 2-byte header (`0xC3 0x01`) followed by
40//!     an 8‑byte little‑endian CRC‑64‑AVRO fingerprint, then the Avro body.
41//!     See Avro 1.11.1 "Single object encoding".
42//!     <https://avro.apache.org/docs/1.11.1/specification/#single-object-encoding>
43//!   - **Confluent wire format**: Each record is prefixed with magic byte `0x00` followed by
44//!     a **big‑endian** 4‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id(schema_id)`.
45//!     <https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format>
46//!   - **Apicurio wire format**: Each record is prefixed with magic byte `0x00` followed by
47//!     a **big‑endian** 8‑byte schema ID, then the Avro body. Use `FingerprintStrategy::Id64(schema_id)`.
48//!     <https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry>
49//!
50//! ## Choosing the Avro schema
51//!
52//! By default, the writer converts your Arrow schema to Avro (including a top‑level record
53//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the
54//! Arrow schema metadata under the `avro.schema` key before constructing the writer. The
55//! builder will use that schema instead of generating a new one (unless `strip_metadata` is
56//! set to true in the options).
57//!
58//! ## Compression
59//!
60//! For OCF, you may enable a compression codec via `WriterBuilder::with_compression`. The
61//! chosen codec is written into the file header and used for subsequent blocks. SOE stream
62//! writing doesn’t apply container‑level compression.
63//!
64//! ---
65use crate::codec::AvroFieldBuilder;
66use crate::compression::CompressionCodec;
67use crate::schema::{
68    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
69};
70use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
71use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
72use arrow_array::RecordBatch;
73use arrow_schema::{ArrowError, Schema};
74use std::io::Write;
75use std::sync::Arc;
76
77/// Encodes `RecordBatch` into the Avro binary format.
78mod encoder;
/// Output framing for Avro data: the OCF container format and the SOE stream format.
80pub mod format;
81
/// Builder to configure and create a `Writer`.
///
/// Holds the Arrow schema plus the optional settings (compression codec, buffer
/// capacity, fingerprint strategy) that are consumed by `WriterBuilder::build`.
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    /// Arrow schema of the batches to be written; its metadata may carry an
    /// explicit Avro schema JSON under `SCHEMA_METADATA_KEY`.
    schema: Schema,
    /// Compression codec handed to the format at startup and applied to OCF
    /// block payloads; `None` leaves blocks uncompressed.
    codec: Option<CompressionCodec>,
    /// Initial byte capacity of the buffer each OCF block is encoded into
    /// (1024 unless overridden).
    capacity: usize,
    /// Strategy that selects the per-record prefix for prefix-writing formats;
    /// `None` falls back to a Rabin fingerprint in `build`.
    fingerprint_strategy: Option<FingerprintStrategy>,
}
90
91impl WriterBuilder {
92    /// Create a new builder with default settings.
93    ///
94    /// The Avro schema used for writing is determined as follows:
95    /// 1) If the Arrow schema metadata contains `avro::schema` (see `SCHEMA_METADATA_KEY`),
96    ///    that JSON is used verbatim.
97    /// 2) Otherwise, the Arrow schema is converted to an Avro record schema.
98    pub fn new(schema: Schema) -> Self {
99        Self {
100            schema,
101            codec: None,
102            capacity: 1024,
103            fingerprint_strategy: None,
104        }
105    }
106
107    /// Set the fingerprinting strategy for the stream writer.
108    /// This determines the per-record prefix format.
109    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
110        self.fingerprint_strategy = Some(strategy);
111        self
112    }
113
114    /// Change the compression codec.
115    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
116        self.codec = codec;
117        self
118    }
119
120    /// Sets the capacity for the given object and returns the modified instance.
121    pub fn with_capacity(mut self, capacity: usize) -> Self {
122        self.capacity = capacity;
123        self
124    }
125
126    /// Create a new `Writer` with specified `AvroFormat` and builder options.
127    /// Performs one‑time startup (header/stream init, encoder plan).
128    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, ArrowError>
129    where
130        W: Write,
131        F: AvroFormat,
132    {
133        let mut format = F::default();
134        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
135            Some(json) => AvroSchema::new(json.clone()),
136            None => AvroSchema::try_from(&self.schema)?,
137        };
138        let maybe_fingerprint = if F::NEEDS_PREFIX {
139            match self.fingerprint_strategy {
140                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(id)),
141                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(id)),
142                Some(strategy) => {
143                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(strategy))?)
144                }
145                None => Some(
146                    avro_schema
147                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
148                ),
149            }
150        } else {
151            None
152        };
153        let mut md = self.schema.metadata().clone();
154        md.insert(
155            SCHEMA_METADATA_KEY.to_string(),
156            avro_schema.clone().json_string,
157        );
158        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
159        format.start_stream(&mut writer, &schema, self.codec)?;
160        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
161        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
162            .with_fingerprint(maybe_fingerprint)
163            .build()?;
164        Ok(Writer {
165            writer,
166            schema,
167            format,
168            compression: self.codec,
169            capacity: self.capacity,
170            encoder,
171        })
172    }
173}
174
/// Generic Avro writer.
///
/// This type is generic over the output Write sink (`W`) and the Avro format (`F`).
/// You’ll usually use the concrete aliases:
///
/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
/// * **[`AvroStreamWriter`]** for **SOE** Avro streams
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    /// Output sink that receives all encoded bytes.
    writer: W,
    /// Arrow schema (with the effective Avro schema JSON in its metadata); the
    /// fields of every written batch are compared against it.
    schema: Arc<Schema>,
    /// Format state; when it exposes a sync marker, batches are written as OCF blocks.
    format: F,
    /// Codec applied to each OCF block payload, if any.
    compression: Option<CompressionCodec>,
    /// Initial byte capacity of the per-block encode buffer.
    capacity: usize,
    /// Encoder that turns a `RecordBatch` into Avro bytes.
    encoder: RecordEncoder,
}
191
/// Alias for an Avro **Object Container File** writer.
///
/// Each call to `write` appends one data block followed by the file's sync marker.
///
/// ### Quickstart (runnable)
///
/// ```
/// use std::io::Cursor;
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroWriter;
/// use arrow_avro::reader::ReaderBuilder;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Writer schema: { id: long, name: string }
/// let writer_schema = Schema::new(vec![
///     Field::new("id", DataType::Int64, false),
///     Field::new("name", DataType::Utf8, false),
/// ]);
///
/// // Build a RecordBatch with two rows
/// let batch = RecordBatch::try_new(
///     Arc::new(writer_schema.clone()),
///     vec![
///         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
///         Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
///     ],
/// )?;
///
/// // Write an Avro **Object Container File** (OCF) to memory
/// let mut w = AvroWriter::new(Vec::<u8>::new(), writer_schema.clone())?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
///
/// // Build a Reader and decode the batch back
/// let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
/// let out = r.next().unwrap()?;
/// assert_eq!(out.num_rows(), 2);
/// # Ok(()) }
/// ```
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
233
/// Alias for an Avro **Single Object Encoding** stream writer.
///
/// This writer automatically adds the appropriate per-record prefix (based on the
/// fingerprint strategy) before the Avro body of each record. The default is Single
/// Object Encoding (SOE) with a Rabin fingerprint.
///
/// ### Example
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroStreamWriter;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // One‑column Arrow batch
/// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
/// )?;
///
/// // Write an Avro Single Object Encoding stream to a Vec<u8>
/// let sink: Vec<u8> = Vec::new();
/// let mut w = AvroStreamWriter::new(sink, schema)?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;
266
267impl<W: Write> Writer<W, AvroOcfFormat> {
268    /// Convenience constructor – same as [`WriterBuilder::build`] with `AvroOcfFormat`.
269    ///
270    /// ### Example
271    ///
272    /// ```
273    /// use std::sync::Arc;
274    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
275    /// use arrow_schema::{DataType, Field, Schema};
276    /// use arrow_avro::writer::AvroWriter;
277    ///
278    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
279    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
280    /// let batch = RecordBatch::try_new(
281    ///     Arc::new(schema.clone()),
282    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
283    /// )?;
284    ///
285    /// let buf: Vec<u8> = Vec::new();
286    /// let mut w = AvroWriter::new(buf, schema)?;
287    /// w.write(&batch)?;
288    /// w.finish()?;
289    /// let bytes = w.into_inner();
290    /// assert!(!bytes.is_empty());
291    /// # Ok(()) }
292    /// ```
293    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
294        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
295    }
296
297    /// Return a reference to the 16‑byte sync marker generated for this file.
298    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
299        self.format.sync_marker()
300    }
301}
302
303impl<W: Write> Writer<W, AvroSoeFormat> {
304    /// Convenience constructor to create a new [`AvroStreamWriter`].
305    ///
306    /// The resulting stream contains **Single Object Encodings** (no OCF header/sync).
307    ///
308    /// ### Example
309    ///
310    /// ```
311    /// use std::sync::Arc;
312    /// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
313    /// use arrow_schema::{DataType, Field, Schema};
314    /// use arrow_avro::writer::AvroStreamWriter;
315    ///
316    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
317    /// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
318    /// let batch = RecordBatch::try_new(
319    ///     Arc::new(schema.clone()),
320    ///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
321    /// )?;
322    ///
323    /// let sink: Vec<u8> = Vec::new();
324    /// let mut w = AvroStreamWriter::new(sink, schema)?;
325    /// w.write(&batch)?;
326    /// w.finish()?;
327    /// let bytes = w.into_inner();
328    /// assert!(!bytes.is_empty());
329    /// # Ok(()) }
330    /// ```
331    pub fn new(writer: W, schema: Schema) -> Result<Self, ArrowError> {
332        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
333    }
334}
335
336impl<W: Write, F: AvroFormat> Writer<W, F> {
337    /// Serialize one [`RecordBatch`] to the output.
338    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
339        if batch.schema().fields() != self.schema.fields() {
340            return Err(ArrowError::SchemaError(
341                "Schema of RecordBatch differs from Writer schema".to_string(),
342            ));
343        }
344        match self.format.sync_marker() {
345            Some(&sync) => self.write_ocf_block(batch, &sync),
346            None => self.write_stream(batch),
347        }
348    }
349
350    /// A convenience method to write a slice of [`RecordBatch`].
351    ///
352    /// This is equivalent to calling `write` for each batch in the slice.
353    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), ArrowError> {
354        for b in batches {
355            self.write(b)?;
356        }
357        Ok(())
358    }
359
360    /// Flush remaining buffered data and (for OCF) ensure the header is present.
361    pub fn finish(&mut self) -> Result<(), ArrowError> {
362        self.writer
363            .flush()
364            .map_err(|e| ArrowError::IoError(format!("Error flushing writer: {e}"), e))
365    }
366
367    /// Consume the writer, returning the underlying output object.
368    pub fn into_inner(self) -> W {
369        self.writer
370    }
371
372    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), ArrowError> {
373        let mut buf = Vec::<u8>::with_capacity(self.capacity);
374        self.encoder.encode(&mut buf, batch)?;
375        let encoded = match self.compression {
376            Some(codec) => codec.compress(&buf)?,
377            None => buf,
378        };
379        write_long(&mut self.writer, batch.num_rows() as i64)?;
380        write_long(&mut self.writer, encoded.len() as i64)?;
381        self.writer
382            .write_all(&encoded)
383            .map_err(|e| ArrowError::IoError(format!("Error writing Avro block: {e}"), e))?;
384        self.writer
385            .write_all(sync)
386            .map_err(|e| ArrowError::IoError(format!("Error writing Avro sync: {e}"), e))?;
387        Ok(())
388    }
389
390    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), ArrowError> {
391        self.encoder.encode(&mut self.writer, batch)?;
392        Ok(())
393    }
394}
395
396#[cfg(test)]
397mod tests {
398    use super::*;
399    use crate::compression::CompressionCodec;
400    use crate::reader::ReaderBuilder;
401    use crate::schema::{AvroSchema, SchemaStore};
402    use crate::test_util::arrow_test_data;
403    use arrow::datatypes::TimeUnit;
404    #[cfg(feature = "avro_custom_types")]
405    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
406    use arrow_array::types::{
407        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
408        TimestampMillisecondType, TimestampNanosecondType,
409    };
410    use arrow_array::{
411        Array, ArrayRef, BinaryArray, Date32Array, Int32Array, PrimitiveArray, RecordBatch,
412        StructArray, UnionArray,
413    };
414    #[cfg(feature = "avro_custom_types")]
415    use arrow_array::{Int16Array, Int64Array, RunArray, StringArray};
416    #[cfg(not(feature = "avro_custom_types"))]
417    use arrow_schema::{DataType, Field, Schema};
418    #[cfg(feature = "avro_custom_types")]
419    use arrow_schema::{DataType, Field, Schema};
420    use std::collections::HashMap;
421    use std::collections::HashSet;
422    use std::fs::File;
423    use std::io::{BufReader, Cursor};
424    use std::path::PathBuf;
425    use std::sync::Arc;
426    use tempfile::NamedTempFile;
427
    /// Relative paths of the `alltypes_plain` OCF fixtures used by the round-trip
    /// test (each is resolved through `arrow_test_data` by the caller).
    ///
    /// Every entry is compiled in only when its cargo feature is enabled, so the
    /// iterator is empty when none of the listed features are active.
    fn files() -> impl Iterator<Item = &'static str> {
        [
            // TODO: avoid requiring snappy for this file
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }
444
445    fn make_schema() -> Schema {
446        Schema::new(vec![
447            Field::new("id", DataType::Int32, false),
448            Field::new("name", DataType::Binary, false),
449        ])
450    }
451
452    fn make_batch() -> RecordBatch {
453        let ids = Int32Array::from(vec![1, 2, 3]);
454        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
455        RecordBatch::try_new(
456            Arc::new(make_schema()),
457            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
458        )
459        .expect("failed to build test RecordBatch")
460    }
461
462    #[test]
463    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), ArrowError> {
464        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
465        let batch = RecordBatch::try_new(
466            Arc::new(schema.clone()),
467            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
468        )?;
469        let buf: Vec<u8> = Vec::new();
470        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
471        writer.write(&batch)?;
472        let encoded = writer.into_inner();
473        let mut store = SchemaStore::new(); // Rabin by default
474        let avro_schema = AvroSchema::try_from(&schema)?;
475        let _fp = store.register(avro_schema)?;
476        let mut decoder = ReaderBuilder::new()
477            .with_writer_schema_store(store)
478            .build_decoder()?;
479        let _consumed = decoder.decode(&encoded)?;
480        let decoded = decoder
481            .flush()?
482            .expect("expected at least one batch from decoder");
483        assert_eq!(decoded.num_columns(), 1);
484        assert_eq!(decoded.num_rows(), 2);
485        let col = decoded
486            .column(0)
487            .as_any()
488            .downcast_ref::<Int32Array>()
489            .expect("int column");
490        assert_eq!(col, &Int32Array::from(vec![10, 20]));
491        Ok(())
492    }
493
494    #[test]
495    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), ArrowError> {
496        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
497        let batch = RecordBatch::try_new(
498            Arc::new(schema.clone()),
499            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
500        )?;
501        let schema_id: u32 = 42;
502        let mut writer = WriterBuilder::new(schema.clone())
503            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
504            .build::<_, AvroSoeFormat>(Vec::new())?;
505        writer.write(&batch)?;
506        let encoded = writer.into_inner();
507        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
508        let avro_schema = AvroSchema::try_from(&schema)?;
509        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
510        let mut decoder = ReaderBuilder::new()
511            .with_writer_schema_store(store)
512            .build_decoder()?;
513        let _ = decoder.decode(&encoded)?;
514        let decoded = decoder
515            .flush()?
516            .expect("expected at least one batch from decoder");
517        assert_eq!(decoded.num_columns(), 1);
518        assert_eq!(decoded.num_rows(), 3);
519        let col = decoded
520            .column(0)
521            .as_any()
522            .downcast_ref::<Int32Array>()
523            .expect("int column");
524        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
525        Ok(())
526    }
527
528    #[test]
529    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), ArrowError> {
530        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
531        let batch = RecordBatch::try_new(
532            Arc::new(schema.clone()),
533            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
534        )?;
535        let schema_id: u64 = 42;
536        let mut writer = WriterBuilder::new(schema.clone())
537            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
538            .build::<_, AvroSoeFormat>(Vec::new())?;
539        writer.write(&batch)?;
540        let encoded = writer.into_inner();
541        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
542        let avro_schema = AvroSchema::try_from(&schema)?;
543        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
544        let mut decoder = ReaderBuilder::new()
545            .with_writer_schema_store(store)
546            .build_decoder()?;
547        let _ = decoder.decode(&encoded)?;
548        let decoded = decoder
549            .flush()?
550            .expect("expected at least one batch from decoder");
551        assert_eq!(decoded.num_columns(), 1);
552        assert_eq!(decoded.num_rows(), 3);
553        let col = decoded
554            .column(0)
555            .as_any()
556            .downcast_ref::<Int32Array>()
557            .expect("int column");
558        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
559        Ok(())
560    }
561
562    #[test]
563    fn test_ocf_writer_generates_header_and_sync() -> Result<(), ArrowError> {
564        let batch = make_batch();
565        let buffer: Vec<u8> = Vec::new();
566        let mut writer = AvroWriter::new(buffer, make_schema())?;
567        writer.write(&batch)?;
568        writer.finish()?;
569        let out = writer.into_inner();
570        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
571        let trailer = &out[out.len() - 16..];
572        assert_eq!(trailer.len(), 16, "expected 16‑byte sync marker");
573        Ok(())
574    }
575
576    #[test]
577    fn test_schema_mismatch_yields_error() {
578        let batch = make_batch();
579        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
580        let buffer = Vec::<u8>::new();
581        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
582        let err = writer.write(&batch).unwrap_err();
583        assert!(matches!(err, ArrowError::SchemaError(_)));
584    }
585
586    #[test]
587    fn test_write_batches_accumulates_multiple() -> Result<(), ArrowError> {
588        let batch1 = make_batch();
589        let batch2 = make_batch();
590        let buffer = Vec::<u8>::new();
591        let mut writer = AvroWriter::new(buffer, make_schema())?;
592        writer.write_batches(&[&batch1, &batch2])?;
593        writer.finish()?;
594        let out = writer.into_inner();
595        assert!(out.len() > 4, "combined batches produced tiny file");
596        Ok(())
597    }
598
599    #[test]
600    fn test_finish_without_write_adds_header() -> Result<(), ArrowError> {
601        let buffer = Vec::<u8>::new();
602        let mut writer = AvroWriter::new(buffer, make_schema())?;
603        writer.finish()?;
604        let out = writer.into_inner();
605        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
606        Ok(())
607    }
608
609    #[test]
610    fn test_write_long_encodes_zigzag_varint() -> Result<(), ArrowError> {
611        let mut buf = Vec::new();
612        write_long(&mut buf, 0)?;
613        write_long(&mut buf, -1)?;
614        write_long(&mut buf, 1)?;
615        write_long(&mut buf, -2)?;
616        write_long(&mut buf, 2147483647)?;
617        assert!(
618            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
619            "zig‑zag varint encodings incorrect: {buf:?}"
620        );
621        Ok(())
622    }
623
624    #[test]
625    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), ArrowError> {
626        for rel in files() {
627            let path = arrow_test_data(rel);
628            let rdr_file = File::open(&path).expect("open input avro");
629            let reader = ReaderBuilder::new()
630                .build(BufReader::new(rdr_file))
631                .expect("build reader");
632            let schema = reader.schema();
633            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
634            let original =
635                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
636            let tmp = NamedTempFile::new().expect("create temp file");
637            let out_path = tmp.into_temp_path();
638            let out_file = File::create(&out_path).expect("create temp avro");
639            let codec = if rel.contains(".snappy.") {
640                Some(CompressionCodec::Snappy)
641            } else if rel.contains(".zstandard.") {
642                Some(CompressionCodec::ZStandard)
643            } else if rel.contains(".bzip2.") {
644                Some(CompressionCodec::Bzip2)
645            } else if rel.contains(".xz.") {
646                Some(CompressionCodec::Xz)
647            } else {
648                None
649            };
650            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
651                .with_compression(codec)
652                .build::<_, AvroOcfFormat>(out_file)?;
653            writer.write(&original)?;
654            writer.finish()?;
655            drop(writer);
656            let rt_file = File::open(&out_path).expect("open roundtrip avro");
657            let rt_reader = ReaderBuilder::new()
658                .build(BufReader::new(rt_file))
659                .expect("build roundtrip reader");
660            let rt_schema = rt_reader.schema();
661            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
662            let roundtrip =
663                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
664            assert_eq!(
665                roundtrip, original,
666                "Round-trip batch mismatch for file: {}",
667                rel
668            );
669        }
670        Ok(())
671    }
672
673    #[test]
674    fn test_roundtrip_nested_records_writer() -> Result<(), ArrowError> {
675        let path = arrow_test_data("avro/nested_records.avro");
676        let rdr_file = File::open(&path).expect("open nested_records.avro");
677        let reader = ReaderBuilder::new()
678            .build(BufReader::new(rdr_file))
679            .expect("build reader for nested_records.avro");
680        let schema = reader.schema();
681        let batches = reader.collect::<Result<Vec<_>, _>>()?;
682        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
683        let tmp = NamedTempFile::new().expect("create temp file");
684        let out_path = tmp.into_temp_path();
685        {
686            let out_file = File::create(&out_path).expect("create output avro");
687            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
688            writer.write(&original)?;
689            writer.finish()?;
690        }
691        let rt_file = File::open(&out_path).expect("open round_trip avro");
692        let rt_reader = ReaderBuilder::new()
693            .build(BufReader::new(rt_file))
694            .expect("build round_trip reader");
695        let rt_schema = rt_reader.schema();
696        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
697        let round_trip =
698            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
699        assert_eq!(
700            round_trip, original,
701            "Round-trip batch mismatch for nested_records.avro"
702        );
703        Ok(())
704    }
705
706    #[test]
707    #[cfg(feature = "snappy")]
708    fn test_roundtrip_nested_lists_writer() -> Result<(), ArrowError> {
709        let path = arrow_test_data("avro/nested_lists.snappy.avro");
710        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
711        let reader = ReaderBuilder::new()
712            .build(BufReader::new(rdr_file))
713            .expect("build reader for nested_lists.snappy.avro");
714        let schema = reader.schema();
715        let batches = reader.collect::<Result<Vec<_>, _>>()?;
716        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
717        let tmp = NamedTempFile::new().expect("create temp file");
718        let out_path = tmp.into_temp_path();
719        {
720            let out_file = File::create(&out_path).expect("create output avro");
721            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
722                .with_compression(Some(CompressionCodec::Snappy))
723                .build::<_, AvroOcfFormat>(out_file)?;
724            writer.write(&original)?;
725            writer.finish()?;
726        }
727        let rt_file = File::open(&out_path).expect("open round_trip avro");
728        let rt_reader = ReaderBuilder::new()
729            .build(BufReader::new(rt_file))
730            .expect("build round_trip reader");
731        let rt_schema = rt_reader.schema();
732        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
733        let round_trip =
734            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
735        assert_eq!(
736            round_trip, original,
737            "Round-trip batch mismatch for nested_lists.snappy.avro"
738        );
739        Ok(())
740    }
741
742    #[test]
743    fn test_round_trip_simple_fixed_ocf() -> Result<(), ArrowError> {
744        let path = arrow_test_data("avro/simple_fixed.avro");
745        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
746        let reader = ReaderBuilder::new()
747            .build(BufReader::new(rdr_file))
748            .expect("build avro reader");
749        let schema = reader.schema();
750        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
751        let original =
752            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
753        let tmp = NamedTempFile::new().expect("create temp file");
754        let out_file = File::create(tmp.path()).expect("create temp avro");
755        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
756        writer.write(&original)?;
757        writer.finish()?;
758        drop(writer);
759        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
760        let rt_reader = ReaderBuilder::new()
761            .build(BufReader::new(rt_file))
762            .expect("build round_trip reader");
763        let rt_schema = rt_reader.schema();
764        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
765        let round_trip =
766            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
767        assert_eq!(round_trip, original);
768        Ok(())
769    }
770
771    // Strict equality (schema + values) only when canonical extension types are enabled
772    #[test]
773    #[cfg(feature = "canonical_extension_types")]
774    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), ArrowError> {
775        use arrow_schema::{DataType, IntervalUnit};
776        let in_file =
777            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
778        let reader = ReaderBuilder::new()
779            .build(BufReader::new(in_file))
780            .expect("build reader for duration_uuid.avro");
781        let in_schema = reader.schema();
782        let has_mdn = in_schema.fields().iter().any(|f| {
783            matches!(
784                f.data_type(),
785                DataType::Interval(IntervalUnit::MonthDayNano)
786            )
787        });
788        assert!(
789            has_mdn,
790            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
791        );
792        let has_uuid_fixed = in_schema
793            .fields()
794            .iter()
795            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
796        assert!(
797            has_uuid_fixed,
798            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
799        );
800        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
801        let input =
802            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
803        // Write to an in‑memory OCF and read back
804        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
805        writer.write(&input)?;
806        writer.finish()?;
807        let bytes = writer.into_inner();
808        let rt_reader = ReaderBuilder::new()
809            .build(Cursor::new(bytes))
810            .expect("build round_trip reader");
811        let rt_schema = rt_reader.schema();
812        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
813        let round_trip =
814            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
815        assert_eq!(round_trip, input);
816        Ok(())
817    }
818
819    // Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
820    #[test]
821    #[cfg(not(feature = "canonical_extension_types"))]
822    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), ArrowError>
823    {
824        use arrow::datatypes::{DataType, IntervalUnit};
825        use std::io::BufReader;
826
827        // Read input Avro (duration + uuid)
828        let in_file =
829            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
830        let reader = ReaderBuilder::new()
831            .build(BufReader::new(in_file))
832            .expect("build reader for duration_uuid.avro");
833        let in_schema = reader.schema();
834
835        // Sanity checks: has MonthDayNano and a FixedSizeBinary(16)
836        assert!(
837            in_schema.fields().iter().any(|f| {
838                matches!(
839                    f.data_type(),
840                    DataType::Interval(IntervalUnit::MonthDayNano)
841                )
842            }),
843            "expected at least one Interval(MonthDayNano) field"
844        );
845        assert!(
846            in_schema
847                .fields()
848                .iter()
849                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
850            "expected a FixedSizeBinary(16) field (uuid)"
851        );
852
853        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
854        let input =
855            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
856
857        // Write to a temp OCF and read back
858        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
859        writer.write(&input)?;
860        writer.finish()?;
861        let bytes = writer.into_inner();
862        let rt_reader = ReaderBuilder::new()
863            .build(Cursor::new(bytes))
864            .expect("build round_trip reader");
865        let rt_schema = rt_reader.schema();
866        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
867        let round_trip =
868            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
869
870        // 1) Values must round-trip for both columns
871        assert_eq!(
872            round_trip.column(0),
873            input.column(0),
874            "duration column values differ"
875        );
876        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");
877
878        // 2) Schema expectation without extensions:
879        //    uuid is written as named fixed(16), so reader attaches avro.name
880        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
881        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
882        assert_eq!(
883            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
884            Some("uuid"),
885            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
886        );
887
888        // 3) Duration remains Interval(MonthDayNano)
889        let dur_rt = rt_schema.field_with_name("duration_field")?;
890        assert!(matches!(
891            dur_rt.data_type(),
892            DataType::Interval(IntervalUnit::MonthDayNano)
893        ));
894
895        Ok(())
896    }
897
898    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
899    // writes it back out with the writer (hitting Map encoding paths), then reads it
900    // again and asserts exact Arrow equivalence.
901    #[test]
902    // TODO: avoid requiring snappy for this file
903    #[cfg(feature = "snappy")]
904    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), ArrowError> {
905        // Load source Avro with Map fields
906        let path = arrow_test_data("avro/nonnullable.impala.avro");
907        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
908        let reader = ReaderBuilder::new()
909            .build(BufReader::new(rdr_file))
910            .expect("build reader for nonnullable.impala.avro");
911        // Collect all input batches and concatenate to a single RecordBatch
912        let in_schema = reader.schema();
913        // Sanity: ensure the file actually contains at least one Map field
914        let has_map = in_schema
915            .fields()
916            .iter()
917            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
918        assert!(
919            has_map,
920            "expected at least one Map field in avro/nonnullable.impala.avro"
921        );
922
923        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
924        let original =
925            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
926        // Write out using the OCF writer into an in-memory Vec<u8>
927        let buffer = Vec::<u8>::new();
928        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
929        writer.write(&original)?;
930        writer.finish()?;
931        let out_bytes = writer.into_inner();
932        // Read the produced bytes back with the Reader
933        let rt_reader = ReaderBuilder::new()
934            .build(Cursor::new(out_bytes))
935            .expect("build reader for round-tripped in-memory OCF");
936        let rt_schema = rt_reader.schema();
937        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
938        let roundtrip =
939            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
940        // Exact value fidelity (schema + data)
941        assert_eq!(
942            roundtrip, original,
943            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
944        );
945        Ok(())
946    }
947
948    #[test]
949    // TODO: avoid requiring snappy for these files
950    #[cfg(feature = "snappy")]
951    fn test_roundtrip_decimals_via_writer() -> Result<(), ArrowError> {
952        // (file, resolve via ARROW_TEST_DATA?)
953        let files: [(&str, bool); 8] = [
954            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
955            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
956            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
957            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
958            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
959            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
960            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
961            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
962        ];
963        for (rel, in_test_data_dir) in files {
964            // Resolve path the same way as reader::test_decimal
965            let path: String = if in_test_data_dir {
966                arrow_test_data(rel)
967            } else {
968                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
969                    .join(rel)
970                    .to_string_lossy()
971                    .into_owned()
972            };
973            // Read original file into a single RecordBatch for comparison
974            let f_in = File::open(&path).expect("open input avro");
975            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
976            let in_schema = rdr.schema();
977            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
978            let original =
979                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
980            // Write it out with the OCF writer (no special compression)
981            let tmp = NamedTempFile::new().expect("create temp file");
982            let out_path = tmp.into_temp_path();
983            let out_file = File::create(&out_path).expect("create temp avro");
984            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
985            writer.write(&original)?;
986            writer.finish()?;
987            // Read back the file we just wrote and compare equality (schema + data)
988            let f_rt = File::open(&out_path).expect("open roundtrip avro");
989            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
990            let rt_schema = rt_rdr.schema();
991            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
992            let roundtrip =
993                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
994            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
995        }
996        Ok(())
997    }
998
999    #[test]
1000    fn test_named_types_complex_roundtrip() -> Result<(), ArrowError> {
1001        // 1. Read the new, more complex named references file.
1002        let path =
1003            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
1004        let rdr_file = File::open(&path).expect("open avro/named_types_complex.avro");
1005
1006        let reader = ReaderBuilder::new()
1007            .build(BufReader::new(rdr_file))
1008            .expect("build reader for named_types_complex.avro");
1009
1010        // 2. Concatenate all batches to one RecordBatch.
1011        let in_schema = reader.schema();
1012        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1013        let original =
1014            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1015
1016        // 3. Sanity Checks: Validate that all named types were reused correctly.
1017        {
1018            let arrow_schema = original.schema();
1019
1020            // --- A. Validate 'User' record reuse ---
1021            let author_field = arrow_schema.field_with_name("author")?;
1022            let author_type = author_field.data_type();
1023            let editors_field = arrow_schema.field_with_name("editors")?;
1024            let editors_item_type = match editors_field.data_type() {
1025                DataType::List(item_field) => item_field.data_type(),
1026                other => panic!("Editors field should be a List, but was {:?}", other),
1027            };
1028            assert_eq!(
1029                author_type, editors_item_type,
1030                "The DataType for the 'author' struct and the 'editors' list items must be identical"
1031            );
1032
1033            // --- B. Validate 'PostStatus' enum reuse ---
1034            let status_field = arrow_schema.field_with_name("status")?;
1035            let status_type = status_field.data_type();
1036            assert!(
1037                matches!(status_type, DataType::Dictionary(_, _)),
1038                "Status field should be a Dictionary (Enum)"
1039            );
1040
1041            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
1042            let prev_status_type = prev_status_field.data_type();
1043            assert_eq!(
1044                status_type, prev_status_type,
1045                "The DataType for 'status' and 'previous_status' enums must be identical"
1046            );
1047
1048            // --- C. Validate 'MD5' fixed reuse ---
1049            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
1050            let content_hash_type = content_hash_field.data_type();
1051            assert!(
1052                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
1053                "Content hash should be FixedSizeBinary(16)"
1054            );
1055
1056            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
1057            let thumb_hash_type = thumb_hash_field.data_type();
1058            assert_eq!(
1059                content_hash_type, thumb_hash_type,
1060                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
1061            );
1062        }
1063
1064        // 4. Write the data to an in-memory buffer.
1065        let buffer: Vec<u8> = Vec::new();
1066        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
1067        writer.write(&original)?;
1068        writer.finish()?;
1069        let bytes = writer.into_inner();
1070
1071        // 5. Read the data back and compare for exact equality.
1072        let rt_reader = ReaderBuilder::new()
1073            .build(Cursor::new(bytes))
1074            .expect("build reader for round-trip");
1075        let rt_schema = rt_reader.schema();
1076        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1077        let roundtrip =
1078            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1079
1080        assert_eq!(
1081            roundtrip, original,
1082            "Avro complex named types round-trip mismatch"
1083        );
1084
1085        Ok(())
1086    }
1087
1088    // Union Roundtrip Test Helpers
1089
1090    // Asserts that the `actual` schema is a semantically equivalent superset of the `expected` one.
1091    // This allows the `actual` schema to contain additional metadata keys
1092    // (`arrowUnionMode`, `arrowUnionTypeIds`, `avro.name`) that are added during an Arrow-to-Avro-to-Arrow
1093    // roundtrip, while ensuring no other information was lost or changed.
1094    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
1095        // Compare top-level schema metadata using the same superset logic.
1096        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");
1097
1098        // Compare fields.
1099        assert_eq!(
1100            expected.fields().len(),
1101            actual.fields().len(),
1102            "Schema must have the same number of fields"
1103        );
1104
1105        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
1106            assert_field_is_semantically_equivalent(expected_field, actual_field);
1107        }
1108    }
1109
1110    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
1111        let context = format!("Field '{}'", expected.name());
1112
1113        assert_eq!(
1114            expected.name(),
1115            actual.name(),
1116            "{context}: names must match"
1117        );
1118        assert_eq!(
1119            expected.is_nullable(),
1120            actual.is_nullable(),
1121            "{context}: nullability must match"
1122        );
1123
1124        // Recursively check the data types.
1125        assert_datatype_is_semantically_equivalent(
1126            expected.data_type(),
1127            actual.data_type(),
1128            &context,
1129        );
1130
1131        // Check that metadata is a valid superset.
1132        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
1133    }
1134
1135    fn assert_datatype_is_semantically_equivalent(
1136        expected: &DataType,
1137        actual: &DataType,
1138        context: &str,
1139    ) {
1140        match (expected, actual) {
1141            (DataType::List(expected_field), DataType::List(actual_field))
1142            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
1143            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
1144                assert_field_is_semantically_equivalent(expected_field, actual_field);
1145            }
1146            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
1147                assert_eq!(
1148                    expected_fields.len(),
1149                    actual_fields.len(),
1150                    "{context}: struct must have same number of fields"
1151                );
1152                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
1153                    assert_field_is_semantically_equivalent(ef, af);
1154                }
1155            }
1156            (
1157                DataType::Union(expected_fields, expected_mode),
1158                DataType::Union(actual_fields, actual_mode),
1159            ) => {
1160                assert_eq!(
1161                    expected_mode, actual_mode,
1162                    "{context}: union mode must match"
1163                );
1164                assert_eq!(
1165                    expected_fields.len(),
1166                    actual_fields.len(),
1167                    "{context}: union must have same number of variants"
1168                );
1169                for ((exp_id, exp_field), (act_id, act_field)) in
1170                    expected_fields.iter().zip(actual_fields.iter())
1171                {
1172                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
1173                    assert_field_is_semantically_equivalent(exp_field, act_field);
1174                }
1175            }
1176            _ => {
1177                assert_eq!(expected, actual, "{context}: data types must be identical");
1178            }
1179        }
1180    }
1181
1182    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
1183        assert_eq!(
1184            expected.num_columns(),
1185            actual.num_columns(),
1186            "RecordBatches must have the same number of columns"
1187        );
1188        assert_eq!(
1189            expected.num_rows(),
1190            actual.num_rows(),
1191            "RecordBatches must have the same number of rows"
1192        );
1193
1194        for i in 0..expected.num_columns() {
1195            let context = format!("Column {i}");
1196            let expected_col = expected.column(i);
1197            let actual_col = actual.column(i);
1198            assert_array_data_is_identical(expected_col, actual_col, &context);
1199        }
1200    }
1201
1202    /// Recursively asserts that the data content of two Arrays is identical.
1203    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
1204        assert_eq!(
1205            expected.nulls(),
1206            actual.nulls(),
1207            "{context}: null buffers must match"
1208        );
1209        assert_eq!(
1210            expected.len(),
1211            actual.len(),
1212            "{context}: array lengths must match"
1213        );
1214
1215        match (expected.data_type(), actual.data_type()) {
1216            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
1217                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
1218                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();
1219
1220                // Compare the type_ids buffer (always the first buffer).
1221                assert_eq!(
1222                    &expected.to_data().buffers()[0],
1223                    &actual.to_data().buffers()[0],
1224                    "{context}: union type_ids buffer mismatch"
1225                );
1226
1227                // For dense unions, compare the value_offsets buffer (the second buffer).
1228                if expected.to_data().buffers().len() > 1 {
1229                    assert_eq!(
1230                        &expected.to_data().buffers()[1],
1231                        &actual.to_data().buffers()[1],
1232                        "{context}: union value_offsets buffer mismatch"
1233                    );
1234                }
1235
1236                // Recursively compare children based on the fields in the DataType.
1237                for (type_id, _) in expected_fields.iter() {
1238                    let child_context = format!("{context} -> child variant {type_id}");
1239                    assert_array_data_is_identical(
1240                        expected_union.child(type_id),
1241                        actual_union.child(type_id),
1242                        &child_context,
1243                    );
1244                }
1245            }
1246            (DataType::Struct(_), DataType::Struct(_)) => {
1247                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
1248                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
1249                for i in 0..expected_struct.num_columns() {
1250                    let child_context = format!("{context} -> struct child {i}");
1251                    assert_array_data_is_identical(
1252                        expected_struct.column(i),
1253                        actual_struct.column(i),
1254                        &child_context,
1255                    );
1256                }
1257            }
1258            // Fallback for primitive types and other types where buffer comparison is sufficient.
1259            _ => {
1260                assert_eq!(
1261                    expected.to_data().buffers(),
1262                    actual.to_data().buffers(),
1263                    "{context}: data buffers must match"
1264                );
1265            }
1266        }
1267    }
1268
1269    /// Checks that `actual_meta` contains all of `expected_meta`, and any additional
1270    /// keys in `actual_meta` are from a permitted set.
1271    fn assert_metadata_is_superset(
1272        expected_meta: &HashMap<String, String>,
1273        actual_meta: &HashMap<String, String>,
1274        context: &str,
1275    ) {
1276        let allowed_additions: HashSet<&str> =
1277            vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
1278                .into_iter()
1279                .collect();
1280        for (key, expected_value) in expected_meta {
1281            match actual_meta.get(key) {
1282                Some(actual_value) => assert_eq!(
1283                    expected_value, actual_value,
1284                    "{context}: preserved metadata for key '{key}' must have the same value"
1285                ),
1286                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
1287            }
1288        }
1289        for key in actual_meta.keys() {
1290            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
1291                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
1292            }
1293        }
1294    }
1295
1296    #[test]
1297    fn test_union_roundtrip() -> Result<(), ArrowError> {
1298        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1299            .join("test/data/union_fields.avro")
1300            .to_string_lossy()
1301            .into_owned();
1302        let rdr_file = File::open(&file_path).expect("open avro/union_fields.avro");
1303        let reader = ReaderBuilder::new()
1304            .build(BufReader::new(rdr_file))
1305            .expect("build reader for union_fields.avro");
1306        let schema = reader.schema();
1307        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1308        let original =
1309            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
1310        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
1311        writer.write(&original)?;
1312        writer.finish()?;
1313        let bytes = writer.into_inner();
1314        let rt_reader = ReaderBuilder::new()
1315            .build(Cursor::new(bytes))
1316            .expect("build round_trip reader");
1317        let rt_schema = rt_reader.schema();
1318        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1319        let round_trip =
1320            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1321
1322        // The nature of the crate is such that metadata gets appended during the roundtrip,
1323        // so we can't compare the schemas directly. Instead, we semantically compare the schemas and data.
1324        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());
1325
1326        assert_batch_data_is_identical(&original, &round_trip);
1327        Ok(())
1328    }
1329
1330    #[test]
1331    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), ArrowError> {
1332        // Read the known-good enum file (same as reader::test_simple)
1333        let path = arrow_test_data("avro/simple_enum.avro");
1334        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
1335        let reader = ReaderBuilder::new()
1336            .build(BufReader::new(rdr_file))
1337            .expect("build reader for simple_enum.avro");
1338        // Concatenate all batches to one RecordBatch for a clean equality check
1339        let in_schema = reader.schema();
1340        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1341        let original =
1342            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1343        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
1344        let has_enum_dict = in_schema.fields().iter().any(|f| {
1345            matches!(
1346                f.data_type(),
1347                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
1348            )
1349        });
1350        assert!(
1351            has_enum_dict,
1352            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
1353        );
1354        // Write with OCF writer into memory using the reader-provided Arrow schema.
1355        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
1356        let buffer: Vec<u8> = Vec::new();
1357        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
1358        writer.write(&original)?;
1359        writer.finish()?;
1360        let bytes = writer.into_inner();
1361        // Read back and compare for exact equality (schema + data)
1362        let rt_reader = ReaderBuilder::new()
1363            .build(Cursor::new(bytes))
1364            .expect("reader for round-trip");
1365        let rt_schema = rt_reader.schema();
1366        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1367        let roundtrip =
1368            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1369        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
1370        Ok(())
1371    }
1372
1373    #[test]
1374    fn test_builder_propagates_capacity_to_writer() -> Result<(), ArrowError> {
1375        let cap = 64 * 1024;
1376        let buffer = Vec::<u8>::new();
1377        let mut writer = WriterBuilder::new(make_schema())
1378            .with_capacity(cap)
1379            .build::<_, AvroOcfFormat>(buffer)?;
1380        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
1381        let batch = make_batch();
1382        writer.write(&batch)?;
1383        writer.finish()?;
1384        let out = writer.into_inner();
1385        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
1386        Ok(())
1387    }
1388
1389    #[test]
1390    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), ArrowError> {
1391        use arrow_array::{ArrayRef, Int32Array};
1392        use arrow_schema::{DataType, Field, Schema};
1393        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1394        let batch = RecordBatch::try_new(
1395            Arc::new(schema.clone()),
1396            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
1397        )?;
1398        let cap = 8192;
1399        let mut writer = WriterBuilder::new(schema)
1400            .with_capacity(cap)
1401            .build::<_, AvroSoeFormat>(Vec::new())?;
1402        assert_eq!(writer.capacity, cap);
1403        writer.write(&batch)?;
1404        let _bytes = writer.into_inner();
1405        Ok(())
1406    }
1407
1408    #[cfg(feature = "avro_custom_types")]
1409    #[test]
1410    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), ArrowError> {
1411        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1412            .join("test/data/duration_logical_types.avro")
1413            .to_string_lossy()
1414            .into_owned();
1415
1416        let in_file = File::open(&file_path)
1417            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));
1418
1419        let reader = ReaderBuilder::new()
1420            .build(BufReader::new(in_file))
1421            .expect("build reader for duration_logical_types.avro");
1422        let in_schema = reader.schema();
1423
1424        let expected_units: HashSet<TimeUnit> = [
1425            TimeUnit::Nanosecond,
1426            TimeUnit::Microsecond,
1427            TimeUnit::Millisecond,
1428            TimeUnit::Second,
1429        ]
1430        .into_iter()
1431        .collect();
1432
1433        let found_units: HashSet<TimeUnit> = in_schema
1434            .fields()
1435            .iter()
1436            .filter_map(|f| match f.data_type() {
1437                DataType::Duration(unit) => Some(*unit),
1438                _ => None,
1439            })
1440            .collect();
1441
1442        assert_eq!(
1443            found_units, expected_units,
1444            "Expected to find all four Duration TimeUnits in the schema from the initial read"
1445        );
1446
1447        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1448        let input =
1449            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1450
1451        let tmp = NamedTempFile::new().expect("create temp file");
1452        {
1453            let out_file = File::create(tmp.path()).expect("create temp avro");
1454            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
1455            writer.write(&input)?;
1456            writer.finish()?;
1457        }
1458
1459        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
1460        let rt_reader = ReaderBuilder::new()
1461            .build(BufReader::new(rt_file))
1462            .expect("build round_trip reader");
1463        let rt_schema = rt_reader.schema();
1464        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1465        let round_trip =
1466            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1467
1468        assert_eq!(round_trip, input);
1469        Ok(())
1470    }
1471
1472    #[cfg(feature = "avro_custom_types")]
1473    #[test]
1474    fn test_run_end_encoded_roundtrip_writer() -> Result<(), ArrowError> {
1475        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
1476        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
1477        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
1478        let field = Field::new("x", ree.data_type().clone(), true);
1479        let schema = Schema::new(vec![field]);
1480        let batch = RecordBatch::try_new(
1481            Arc::new(schema.clone()),
1482            vec![Arc::new(ree.clone()) as ArrayRef],
1483        )?;
1484        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1485        writer.write(&batch)?;
1486        writer.finish()?;
1487        let bytes = writer.into_inner();
1488        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1489        let out_schema = reader.schema();
1490        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1491        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1492        assert_eq!(out.num_columns(), 1);
1493        assert_eq!(out.num_rows(), 8);
1494        match out.schema().field(0).data_type() {
1495            DataType::RunEndEncoded(run_ends_field, values_field) => {
1496                assert_eq!(run_ends_field.name(), "run_ends");
1497                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
1498                assert_eq!(values_field.name(), "values");
1499                assert_eq!(values_field.data_type(), &DataType::Int32);
1500                assert!(values_field.is_nullable());
1501                let got_ree = out
1502                    .column(0)
1503                    .as_any()
1504                    .downcast_ref::<RunArray<Int32Type>>()
1505                    .expect("RunArray<Int32Type>");
1506                assert_eq!(got_ree, &ree);
1507            }
1508            other => panic!(
1509                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
1510                other
1511            ),
1512        }
1513        Ok(())
1514    }
1515
1516    #[cfg(feature = "avro_custom_types")]
1517    #[test]
1518    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), ArrowError>
1519    {
1520        let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
1521        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
1522        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
1523        let field = Field::new("s", ree.data_type().clone(), true);
1524        let schema = Schema::new(vec![field]);
1525        let batch = RecordBatch::try_new(
1526            Arc::new(schema.clone()),
1527            vec![Arc::new(ree.clone()) as ArrayRef],
1528        )?;
1529        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1530        writer.write(&batch)?;
1531        writer.finish()?;
1532        let bytes = writer.into_inner();
1533        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1534        let out_schema = reader.schema();
1535        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1536        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1537        assert_eq!(out.num_columns(), 1);
1538        assert_eq!(out.num_rows(), 7);
1539        match out.schema().field(0).data_type() {
1540            DataType::RunEndEncoded(run_ends_field, values_field) => {
1541                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
1542                assert_eq!(values_field.data_type(), &DataType::Utf8);
1543                assert!(
1544                    values_field.is_nullable(),
1545                    "REE 'values' child should be nullable"
1546                );
1547                let got = out
1548                    .column(0)
1549                    .as_any()
1550                    .downcast_ref::<RunArray<Int16Type>>()
1551                    .expect("RunArray<Int16Type>");
1552                assert_eq!(got, &ree);
1553            }
1554            other => panic!("Unexpected DataType: {:?}", other),
1555        }
1556        Ok(())
1557    }
1558
1559    #[cfg(feature = "avro_custom_types")]
1560    #[test]
1561    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer()
1562    -> Result<(), ArrowError> {
1563        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
1564        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
1565        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
1566        let field = Field::new("y", ree.data_type().clone(), true);
1567        let schema = Schema::new(vec![field]);
1568        let batch = RecordBatch::try_new(
1569            Arc::new(schema.clone()),
1570            vec![Arc::new(ree.clone()) as ArrayRef],
1571        )?;
1572        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1573        writer.write(&batch)?;
1574        writer.finish()?;
1575        let bytes = writer.into_inner();
1576        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1577        let out_schema = reader.schema();
1578        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1579        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1580        assert_eq!(out.num_columns(), 1);
1581        assert_eq!(out.num_rows(), 8);
1582        match out.schema().field(0).data_type() {
1583            DataType::RunEndEncoded(run_ends_field, values_field) => {
1584                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
1585                assert_eq!(values_field.data_type(), &DataType::Int32);
1586                assert!(values_field.is_nullable());
1587                let got = out
1588                    .column(0)
1589                    .as_any()
1590                    .downcast_ref::<RunArray<Int64Type>>()
1591                    .expect("RunArray<Int64Type>");
1592                assert_eq!(got, &ree);
1593            }
1594            other => panic!("Unexpected DataType for REE column: {:?}", other),
1595        }
1596        Ok(())
1597    }
1598
1599    #[cfg(feature = "avro_custom_types")]
1600    #[test]
1601    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), ArrowError> {
1602        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
1603        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
1604        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
1605        let offset = 1usize;
1606        let length = 6usize;
1607        let base_values = base
1608            .values()
1609            .as_any()
1610            .downcast_ref::<Int32Array>()
1611            .expect("REE values as Int32Array");
1612        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
1613        for i in offset..offset + length {
1614            let phys = base.get_physical_index(i);
1615            let v = if base_values.is_null(phys) {
1616                None
1617            } else {
1618                Some(base_values.value(phys))
1619            };
1620            logical_window.push(v);
1621        }
1622
1623        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
1624            if vals.is_empty() {
1625                return (Int32Array::new_null(0), Int32Array::new_null(0));
1626            }
1627            let mut run_ends_out: Vec<i32> = Vec::new();
1628            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
1629            let mut cur = vals[0];
1630            let mut len = 1i32;
1631            for v in &vals[1..] {
1632                if *v == cur {
1633                    len += 1;
1634                } else {
1635                    let last_end = run_ends_out.last().copied().unwrap_or(0);
1636                    run_ends_out.push(last_end + len);
1637                    run_vals_out.push(cur);
1638                    cur = *v;
1639                    len = 1;
1640                }
1641            }
1642            let last_end = run_ends_out.last().copied().unwrap_or(0);
1643            run_ends_out.push(last_end + len);
1644            run_vals_out.push(cur);
1645            (
1646                Int32Array::from(run_ends_out),
1647                Int32Array::from(run_vals_out),
1648            )
1649        }
1650        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
1651        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
1652        let field = Field::new("x", owned_slice.data_type().clone(), true);
1653        let schema = Schema::new(vec![field]);
1654        let batch = RecordBatch::try_new(
1655            Arc::new(schema.clone()),
1656            vec![Arc::new(owned_slice.clone()) as ArrayRef],
1657        )?;
1658        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1659        writer.write(&batch)?;
1660        writer.finish()?;
1661        let bytes = writer.into_inner();
1662        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1663        let out_schema = reader.schema();
1664        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1665        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1666        assert_eq!(out.num_columns(), 1);
1667        assert_eq!(out.num_rows(), length);
1668        match out.schema().field(0).data_type() {
1669            DataType::RunEndEncoded(run_ends_field, values_field) => {
1670                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
1671                assert_eq!(values_field.data_type(), &DataType::Int32);
1672                assert!(values_field.is_nullable());
1673                let got = out
1674                    .column(0)
1675                    .as_any()
1676                    .downcast_ref::<RunArray<Int32Type>>()
1677                    .expect("RunArray<Int32Type>");
1678                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
1679                    let vals = a
1680                        .values()
1681                        .as_any()
1682                        .downcast_ref::<Int32Array>()
1683                        .expect("REE values as Int32Array");
1684                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
1685                    for i in 0..a.len() {
1686                        let phys = a.get_physical_index(i);
1687                        out.push(if vals.is_null(phys) {
1688                            None
1689                        } else {
1690                            Some(vals.value(phys))
1691                        });
1692                    }
1693                    Int32Array::from(out)
1694                }
1695                let got_logical = expand_ree_to_int32(got);
1696                let expected_logical = Int32Array::from(logical_window);
1697                assert_eq!(
1698                    got_logical, expected_logical,
1699                    "Logical values differ after REE slice round-trip"
1700                );
1701            }
1702            other => panic!("Unexpected DataType for REE column: {:?}", other),
1703        }
1704        Ok(())
1705    }
1706
1707    #[cfg(not(feature = "avro_custom_types"))]
1708    #[test]
1709    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
1710        use arrow_schema::{DataType, Field, Schema};
1711        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
1712        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
1713        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
1714            &run_ends,
1715            &run_values,
1716        )?;
1717        let field = Field::new("x", ree.data_type().clone(), true);
1718        let schema = Schema::new(vec![field]);
1719        let batch =
1720            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
1721        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1722        writer.write(&batch)?;
1723        writer.finish()?;
1724        let bytes = writer.into_inner();
1725        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1726        let out_schema = reader.schema();
1727        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1728        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1729        assert_eq!(out.num_columns(), 1);
1730        assert_eq!(out.num_rows(), 8);
1731        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
1732        let got = out
1733            .column(0)
1734            .as_any()
1735            .downcast_ref::<Int32Array>()
1736            .expect("Int32Array");
1737        let expected = Int32Array::from(vec![
1738            Some(1),
1739            Some(1),
1740            Some(1),
1741            Some(2),
1742            Some(2),
1743            None,
1744            None,
1745            Some(3),
1746        ]);
1747        assert_eq!(got, &expected);
1748        Ok(())
1749    }
1750
1751    #[cfg(not(feature = "avro_custom_types"))]
1752    #[test]
1753    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
1754    -> Result<(), ArrowError> {
1755        use arrow_schema::{DataType, Field, Schema};
1756        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
1757        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
1758        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
1759            &run_ends,
1760            &run_values,
1761        )?;
1762        let field = Field::new("s", ree.data_type().clone(), true);
1763        let schema = Schema::new(vec![field]);
1764        let batch =
1765            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
1766        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1767        writer.write(&batch)?;
1768        writer.finish()?;
1769        let bytes = writer.into_inner();
1770        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1771        let out_schema = reader.schema();
1772        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1773        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1774        assert_eq!(out.num_columns(), 1);
1775        assert_eq!(out.num_rows(), 7);
1776        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
1777        let got = out
1778            .column(0)
1779            .as_any()
1780            .downcast_ref::<arrow_array::StringArray>()
1781            .expect("StringArray");
1782        let expected = arrow_array::StringArray::from(vec![
1783            Some("a"),
1784            Some("a"),
1785            None,
1786            None,
1787            None,
1788            Some("c"),
1789            Some("c"),
1790        ]);
1791        assert_eq!(got, &expected);
1792        Ok(())
1793    }
1794
1795    #[cfg(not(feature = "avro_custom_types"))]
1796    #[test]
1797    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
1798    -> Result<(), ArrowError> {
1799        use arrow_schema::{DataType, Field, Schema};
1800        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
1801        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
1802        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
1803            &run_ends,
1804            &run_values,
1805        )?;
1806        let field = Field::new("y", ree.data_type().clone(), true);
1807        let schema = Schema::new(vec![field]);
1808        let batch =
1809            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
1810        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1811        writer.write(&batch)?;
1812        writer.finish()?;
1813        let bytes = writer.into_inner();
1814        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1815        let out_schema = reader.schema();
1816        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1817        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1818        assert_eq!(out.num_columns(), 1);
1819        assert_eq!(out.num_rows(), 8);
1820        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
1821        let got = out
1822            .column(0)
1823            .as_any()
1824            .downcast_ref::<Int32Array>()
1825            .expect("Int32Array");
1826        let expected = Int32Array::from(vec![
1827            Some(999),
1828            Some(999),
1829            Some(999),
1830            Some(999),
1831            Some(-5),
1832            Some(-5),
1833            Some(-5),
1834            Some(-5),
1835        ]);
1836        assert_eq!(got, &expected);
1837        Ok(())
1838    }
1839
1840    #[cfg(not(feature = "avro_custom_types"))]
1841    #[test]
1842    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), ArrowError> {
1843        use arrow_schema::{DataType, Field, Schema};
1844        let run_ends = Int32Array::from(vec![2, 4, 6]);
1845        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
1846        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
1847            &run_ends,
1848            &run_values,
1849        )?;
1850        let field = Field::new("x", ree.data_type().clone(), true);
1851        let schema = Schema::new(vec![field]);
1852        let batch =
1853            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
1854        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1855        writer.write(&batch)?;
1856        writer.finish()?;
1857        let bytes = writer.into_inner();
1858        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
1859        let out_schema = reader.schema();
1860        let batches = reader.collect::<Result<Vec<_>, _>>()?;
1861        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
1862        assert_eq!(out.num_columns(), 1);
1863        assert_eq!(out.num_rows(), 6);
1864        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
1865        let got = out
1866            .column(0)
1867            .as_any()
1868            .downcast_ref::<Int32Array>()
1869            .expect("Int32Array");
1870        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
1871        assert_eq!(got, &expected);
1872        Ok(())
1873    }
1874
1875    #[test]
1876    // TODO: avoid requiring snappy for this file
1877    #[cfg(feature = "snappy")]
1878    fn test_nullable_impala_roundtrip() -> Result<(), ArrowError> {
1879        let path = arrow_test_data("avro/nullable.impala.avro");
1880        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
1881        let reader = ReaderBuilder::new()
1882            .build(BufReader::new(rdr_file))
1883            .expect("build reader for nullable.impala.avro");
1884        let in_schema = reader.schema();
1885        assert!(
1886            in_schema.fields().iter().any(|f| f.is_nullable()),
1887            "expected at least one nullable field in avro/nullable.impala.avro"
1888        );
1889        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1890        let original =
1891            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1892        let buffer: Vec<u8> = Vec::new();
1893        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
1894        writer.write(&original)?;
1895        writer.finish()?;
1896        let out_bytes = writer.into_inner();
1897        let rt_reader = ReaderBuilder::new()
1898            .build(Cursor::new(out_bytes))
1899            .expect("build reader for round-tripped in-memory OCF");
1900        let rt_schema = rt_reader.schema();
1901        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1902        let roundtrip =
1903            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1904        assert_eq!(
1905            roundtrip, original,
1906            "Round-trip Avro data mismatch for nullable.impala.avro"
1907        );
1908        Ok(())
1909    }
1910
1911    #[test]
1912    #[cfg(feature = "snappy")]
1913    fn test_datapage_v2_roundtrip() -> Result<(), ArrowError> {
1914        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
1915        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
1916        let reader = ReaderBuilder::new()
1917            .build(BufReader::new(rdr_file))
1918            .expect("build reader for datapage_v2.snappy.avro");
1919        let in_schema = reader.schema();
1920        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1921        let original =
1922            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1923        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
1924        writer.write(&original)?;
1925        writer.finish()?;
1926        let bytes = writer.into_inner();
1927        let rt_reader = ReaderBuilder::new()
1928            .build(Cursor::new(bytes))
1929            .expect("build round-trip reader");
1930        let rt_schema = rt_reader.schema();
1931        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1932        let round_trip =
1933            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1934        assert_eq!(
1935            round_trip, original,
1936            "Round-trip batch mismatch for datapage_v2.snappy.avro"
1937        );
1938        Ok(())
1939    }
1940
1941    #[test]
1942    #[cfg(feature = "snappy")]
1943    fn test_single_nan_roundtrip() -> Result<(), ArrowError> {
1944        let path = arrow_test_data("avro/single_nan.avro");
1945        let in_file = File::open(&path).expect("open avro/single_nan.avro");
1946        let reader = ReaderBuilder::new()
1947            .build(BufReader::new(in_file))
1948            .expect("build reader for single_nan.avro");
1949        let in_schema = reader.schema();
1950        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
1951        let original =
1952            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
1953        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
1954        writer.write(&original)?;
1955        writer.finish()?;
1956        let bytes = writer.into_inner();
1957        let rt_reader = ReaderBuilder::new()
1958            .build(Cursor::new(bytes))
1959            .expect("build round_trip reader");
1960        let rt_schema = rt_reader.schema();
1961        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1962        let round_trip =
1963            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1964        assert_eq!(
1965            round_trip, original,
1966            "Round-trip batch mismatch for avro/single_nan.avro"
1967        );
1968        Ok(())
1969    }
1970    #[test]
1971    // TODO: avoid requiring snappy for this file
1972    #[cfg(feature = "snappy")]
1973    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), ArrowError> {
1974        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
1975        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
1976        let reader = ReaderBuilder::new()
1977            .build(BufReader::new(rdr_file))
1978            .expect("build reader for dict-page-offset-zero.avro");
1979        let in_schema = reader.schema();
1980        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1981        let original =
1982            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1983        let buffer: Vec<u8> = Vec::new();
1984        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
1985        writer.write(&original)?;
1986        writer.finish()?;
1987        let bytes = writer.into_inner();
1988        let rt_reader = ReaderBuilder::new()
1989            .build(Cursor::new(bytes))
1990            .expect("build reader for round-trip");
1991        let rt_schema = rt_reader.schema();
1992        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1993        let roundtrip =
1994            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1995        assert_eq!(
1996            roundtrip, original,
1997            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
1998        );
1999        Ok(())
2000    }
2001
2002    #[test]
2003    #[cfg(feature = "snappy")]
2004    fn test_repeated_no_annotation_roundtrip() -> Result<(), ArrowError> {
2005        let path = arrow_test_data("avro/repeated_no_annotation.avro");
2006        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
2007        let reader = ReaderBuilder::new()
2008            .build(BufReader::new(in_file))
2009            .expect("build reader for repeated_no_annotation.avro");
2010        let in_schema = reader.schema();
2011        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2012        let original =
2013            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2014        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2015        writer.write(&original)?;
2016        writer.finish()?;
2017        let bytes = writer.into_inner();
2018        let rt_reader = ReaderBuilder::new()
2019            .build(Cursor::new(bytes))
2020            .expect("build reader for round-trip buffer");
2021        let rt_schema = rt_reader.schema();
2022        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2023        let round_trip =
2024            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
2025        assert_eq!(
2026            round_trip, original,
2027            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
2028        );
2029        Ok(())
2030    }
2031
2032    #[test]
2033    fn test_nested_record_type_reuse_roundtrip() -> Result<(), ArrowError> {
2034        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2035            .join("test/data/nested_record_reuse.avro")
2036            .to_string_lossy()
2037            .into_owned();
2038        let in_file = File::open(&path).expect("open avro/nested_record_reuse.avro");
2039        let reader = ReaderBuilder::new()
2040            .build(BufReader::new(in_file))
2041            .expect("build reader for nested_record_reuse.avro");
2042        let in_schema = reader.schema();
2043        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2044        let input =
2045            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2046        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
2047        writer.write(&input)?;
2048        writer.finish()?;
2049        let bytes = writer.into_inner();
2050        let rt_reader = ReaderBuilder::new()
2051            .build(Cursor::new(bytes))
2052            .expect("build round_trip reader");
2053        let rt_schema = rt_reader.schema();
2054        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2055        let round_trip =
2056            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2057        assert_eq!(
2058            round_trip, input,
2059            "Round-trip batch mismatch for nested_record_reuse.avro"
2060        );
2061        Ok(())
2062    }
2063
2064    #[test]
2065    fn test_enum_type_reuse_roundtrip() -> Result<(), ArrowError> {
2066        let path =
2067            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
2068        let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
2069        let reader = ReaderBuilder::new()
2070            .build(std::io::BufReader::new(rdr_file))
2071            .expect("build reader for enum_reuse.avro");
2072        let in_schema = reader.schema();
2073        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2074        let original =
2075            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2076        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2077        writer.write(&original)?;
2078        writer.finish()?;
2079        let bytes = writer.into_inner();
2080        let rt_reader = ReaderBuilder::new()
2081            .build(std::io::Cursor::new(bytes))
2082            .expect("build round_trip reader");
2083        let rt_schema = rt_reader.schema();
2084        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2085        let round_trip =
2086            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2087        assert_eq!(
2088            round_trip, original,
2089            "Avro enum type reuse round-trip mismatch"
2090        );
2091        Ok(())
2092    }
2093
2094    #[test]
2095    fn comprehensive_e2e_test_roundtrip() -> Result<(), ArrowError> {
2096        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2097            .join("test/data/comprehensive_e2e.avro");
2098        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
2099        let reader = ReaderBuilder::new()
2100            .build(BufReader::new(rdr_file))
2101            .expect("build reader for comprehensive_e2e.avro");
2102        let in_schema = reader.schema();
2103        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2104        let original =
2105            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2106        let sink: Vec<u8> = Vec::new();
2107        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
2108        writer.write(&original)?;
2109        writer.finish()?;
2110        let bytes = writer.into_inner();
2111        let rt_reader = ReaderBuilder::new()
2112            .build(Cursor::new(bytes))
2113            .expect("build round-trip reader");
2114        let rt_schema = rt_reader.schema();
2115        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2116        let roundtrip =
2117            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2118        assert_eq!(
2119            roundtrip, original,
2120            "Round-trip batch mismatch for comprehensive_e2e.avro"
2121        );
2122        Ok(())
2123    }
2124
2125    #[test]
2126    fn test_roundtrip_new_time_encoders_writer() -> Result<(), ArrowError> {
2127        let schema = Schema::new(vec![
2128            Field::new("d32", DataType::Date32, false),
2129            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
2130            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
2131            Field::new(
2132                "ts_ms",
2133                DataType::Timestamp(TimeUnit::Millisecond, None),
2134                false,
2135            ),
2136            Field::new(
2137                "ts_us",
2138                DataType::Timestamp(TimeUnit::Microsecond, None),
2139                false,
2140            ),
2141            Field::new(
2142                "ts_ns",
2143                DataType::Timestamp(TimeUnit::Nanosecond, None),
2144                false,
2145            ),
2146        ]);
2147        let d32 = Date32Array::from(vec![0, 1, -1]);
2148        let t32_ms: PrimitiveArray<Time32MillisecondType> =
2149            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
2150        let t64_us: PrimitiveArray<Time64MicrosecondType> =
2151            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
2152        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
2153            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
2154        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
2155        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
2156        let batch = RecordBatch::try_new(
2157            Arc::new(schema.clone()),
2158            vec![
2159                Arc::new(d32) as ArrayRef,
2160                Arc::new(t32_ms) as ArrayRef,
2161                Arc::new(t64_us) as ArrayRef,
2162                Arc::new(ts_ms) as ArrayRef,
2163                Arc::new(ts_us) as ArrayRef,
2164                Arc::new(ts_ns) as ArrayRef,
2165            ],
2166        )?;
2167        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2168        writer.write(&batch)?;
2169        writer.finish()?;
2170        let bytes = writer.into_inner();
2171        let rt_reader = ReaderBuilder::new()
2172            .build(std::io::Cursor::new(bytes))
2173            .expect("build reader for round-trip of new time encoders");
2174        let rt_schema = rt_reader.schema();
2175        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2176        let roundtrip =
2177            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2178        assert_eq!(roundtrip, batch);
2179        Ok(())
2180    }
2181}