Skip to main content

arrow_avro/writer/
mod.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro writer implementation for the `arrow-avro` crate.
19//!
20//! # Overview
21//!
22//! Use this module to serialize Arrow [`arrow_array::RecordBatch`] values into Avro. Three output
23//! modes are supported:
24//!
25//! * **[`crate::writer::AvroWriter`]** — writes an **Object Container File (OCF)**: a self‑describing
26//!   file with header (schema JSON and metadata), optional compression, data blocks, and
27//!   sync markers. See Avro 1.11.1 "Object Container Files."
28//!   <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
29//!
30//! * **[`crate::writer::AvroStreamWriter`]** — writes a **Single Object Encoding (SOE) Stream** without
31//!   any container framing. This is useful when the schema is known out‑of‑band (i.e.,
32//!   via a registry) and you want minimal overhead.
33//!
34//! * **[`crate::writer::Encoder`]** — a row-by-row encoder that buffers encoded records into a single
35//!   contiguous byte buffer and returns per-row [`bytes::Bytes`] slices.
36//!   Ideal for publishing individual messages to Kafka, Pulsar, or other message queues
37//!   where each message must be a self-contained Avro payload.
38//!
39//! ## Which writer should you use?
40//!
41//! | Use Case | Recommended Type |
42//! |----------|------------------|
43//! | Write an OCF file to disk | [`crate::writer::AvroWriter`] |
44//! | Stream records continuously to a file/socket | [`crate::writer::AvroStreamWriter`] |
45//! | Publish individual records to Kafka/Pulsar | [`crate::writer::Encoder`] |
46//! | Need per-row byte slices for custom framing | [`crate::writer::Encoder`] |
47//!
48//! ## Per-Record Prefix Formats
49//!
50//! For [`crate::writer::AvroStreamWriter`] and [`crate::writer::Encoder`], each record is automatically prefixed
51//! based on the fingerprint strategy:
52//!
53//! | Strategy | Prefix | Use Case |
54//! |----------|--------|----------|
55//! | `FingerprintStrategy::Rabin` (default) | `0xC3 0x01` + 8-byte LE Rabin fingerprint | Standard Avro SOE |
56//! | `FingerprintStrategy::Id(id)` | `0x00` + 4-byte BE schema ID | [Confluent Schema Registry] |
57//! | `FingerprintStrategy::Id64(id)` | `0x00` + 8-byte BE schema ID | [Apicurio Registry] |
58//!
59//! [Confluent Schema Registry]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
60//! [Apicurio Registry]: https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry
61//!
62//! ## Choosing the Avro Schema
63//!
64//! By default, the writer converts your Arrow schema to Avro (including a top‑level record
65//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the
66//! Arrow schema metadata under the [`SCHEMA_METADATA_KEY`](crate::schema::SCHEMA_METADATA_KEY)
67//! key before constructing the writer. The builder will use that schema instead of generating
68//! a new one.
69//!
70//! ## Compression
71//!
72//! For OCF ([`crate::writer::AvroWriter`]), you may enable a compression codec via
73//! [`crate::writer::WriterBuilder::with_compression`]. The chosen codec is written into the file header
74//! and used for subsequent blocks. SOE stream writing ([`crate::writer::AvroStreamWriter`], [`crate::writer::Encoder`])
75//! does not apply container‑level compression.
76//!
77//! # Examples
78//!
79//! ## Writing an OCF File
80//!
81//! ```
82//! use std::sync::Arc;
83//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
84//! use arrow_schema::{DataType, Field, Schema};
85//! use arrow_avro::writer::AvroWriter;
86//!
87//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
88//! let schema = Schema::new(vec![
89//!     Field::new("id", DataType::Int64, false),
90//!     Field::new("name", DataType::Utf8, false),
91//! ]);
92//!
93//! let batch = RecordBatch::try_new(
94//!     Arc::new(schema.clone()),
95//!     vec![
96//!         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
97//!         Arc::new(StringArray::from(vec!["alice", "bob"])) as ArrayRef,
98//!     ],
99//! )?;
100//!
101//! let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
102//! writer.write(&batch)?;
103//! writer.finish()?;
104//! let bytes = writer.into_inner();
105//! assert!(!bytes.is_empty());
106//! # Ok(())
107//! # }
108//! ```
109//!
110//! ## Using the Row-by-Row Encoder for Message Queues
111//!
112//! ```
113//! use std::sync::Arc;
114//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
115//! use arrow_schema::{DataType, Field, Schema};
116//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
117//! use arrow_avro::schema::FingerprintStrategy;
118//!
119//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
120//! let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
121//! let batch = RecordBatch::try_new(
122//!     Arc::new(schema.clone()),
123//!     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
124//! )?;
125//!
126//! // Build an Encoder with Confluent wire format (schema ID = 42)
127//! let mut encoder = WriterBuilder::new(schema)
128//!     .with_fingerprint_strategy(FingerprintStrategy::Id(42))
129//!     .build_encoder::<AvroSoeFormat>()?;
130//!
131//! encoder.encode(&batch)?;
132//!
133//! // Get the buffered rows (zero-copy views into a single backing buffer)
134//! let rows = encoder.flush();
135//! assert_eq!(rows.len(), 3);
136//!
137//! // Each row has Confluent wire format: magic byte + 4-byte schema ID + body
138//! for row in rows.iter() {
139//!     assert_eq!(row[0], 0x00); // Confluent magic byte
140//! }
141//! # Ok(())
142//! # }
143//! ```
144//!
145//! ---
146use crate::codec::AvroFieldBuilder;
147use crate::compression::CompressionCodec;
148use crate::errors::AvroError;
149use crate::schema::{
150    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
151};
152use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
153use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
154use arrow_array::RecordBatch;
155use arrow_schema::{Schema, SchemaRef};
156use bytes::{Bytes, BytesMut};
157use std::io::Write;
158use std::sync::Arc;
159
160/// Encodes `RecordBatch` into the Avro binary format.
161mod encoder;
162/// Logic for different Avro container file formats.
163pub mod format;
164
/// A contiguous set of Avro encoded rows.
///
/// `EncodedRows` stores:
/// - a single backing byte buffer (`bytes::Bytes`)
/// - a `Vec<usize>` of row boundary offsets (length = `rows + 1`)
///
/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
///
/// For compatibility with APIs that require owned `Vec<u8>`, use:
/// `let vecs: Vec<Vec<u8>> = rows.iter().map(|b| b.to_vec()).collect();`
#[derive(Debug, Clone)]
pub struct EncodedRows {
    // Single backing buffer holding all row payloads concatenated in order.
    data: Bytes,
    // Row boundaries into `data`: row `n` spans `offsets[n]..offsets[n + 1]`.
    offsets: Vec<usize>,
}
180
181impl EncodedRows {
182    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
183    ///
184    /// `offsets` must have length `rows + 1`, and be monotonically non-decreasing.
185    /// The last offset should equal `data.len()`.
186    pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
187        Self { data, offsets }
188    }
189
190    /// Returns the number of encoded rows stored in this container.
191    #[inline]
192    pub fn len(&self) -> usize {
193        self.offsets.len().saturating_sub(1)
194    }
195
196    /// Returns `true` if this container holds no encoded rows.
197    #[inline]
198    pub fn is_empty(&self) -> bool {
199        self.len() == 0
200    }
201
202    /// Returns a reference to the single contiguous backing buffer.
203    ///
204    /// This buffer contains the payloads of all rows concatenated together.
205    ///
206    /// # Note
207    ///
208    /// To access individual row payloads, prefer using [`Self::row`] or [`Self::iter`]
209    /// rather than slicing this buffer manually.
210    #[inline]
211    pub fn bytes(&self) -> &Bytes {
212        &self.data
213    }
214
215    /// Returns the row boundary offsets.
216    ///
217    /// The returned slice always has the length `self.len() + 1`. The `n`th row payload
218    /// corresponds to `bytes[offsets[n] ... offsets[n+1]]`.
219    #[inline]
220    pub fn offsets(&self) -> &[usize] {
221        &self.offsets
222    }
223
224    /// Return the `n`th row as a zero-copy `Bytes` slice.
225    ///
226    /// # Errors
227    ///
228    /// Returns an error if `n` is out of bounds or if the internal offsets are invalid
229    /// (e.g., offsets are not within the backing buffer).
230    ///
231    /// # Examples
232    ///
233    /// ```
234    /// use std::sync::Arc;
235    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
236    /// use arrow_schema::{DataType, Field, Schema};
237    /// use arrow_avro::writer::WriterBuilder;
238    /// use arrow_avro::writer::format::AvroSoeFormat;
239    ///
240    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
241    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
242    /// let batch = RecordBatch::try_new(
243    ///     Arc::new(schema.clone()),
244    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
245    /// )?;
246    ///
247    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
248    /// encoder.encode(&batch)?;
249    /// let rows = encoder.flush();
250    ///
251    /// assert_eq!(rows.iter().count(), 2);
252    /// # Ok(())
253    /// # }
254    /// ```
255    pub fn row(&self, n: usize) -> Result<Bytes, AvroError> {
256        if n >= self.len() {
257            return Err(AvroError::General(format!(
258                "Row index {n} out of bounds for len {}",
259                self.len()
260            )));
261        }
262        // SAFETY:
263        // self.len() is defined as self.offsets.len().saturating_sub(1).
264        // The check `n >= self.len()` above ensures that `n < self.offsets.len() - 1`.
265        // Therefore, both `n` and `n + 1` are strictly within the bounds of `self.offsets`.
266        let (start, end) = unsafe {
267            (
268                *self.offsets.get_unchecked(n),
269                *self.offsets.get_unchecked(n + 1),
270            )
271        };
272        if start > end || end > self.data.len() {
273            return Err(AvroError::General(format!(
274                "Invalid row offsets for row {n}: start={start}, end={end}, data_len={}",
275                self.data.len()
276            )));
277        }
278        Ok(self.data.slice(start..end))
279    }
280
281    /// Iterate over rows as zero-copy `Bytes` slices.
282    ///
283    /// This iterator is infallible and is intended for the common case where
284    /// `EncodedRows` is produced by [`Encoder::flush`], which guarantees valid offsets.
285    ///
286    /// # Examples
287    ///
288    /// ```
289    /// use std::sync::Arc;
290    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
291    /// use arrow_schema::{DataType, Field, Schema};
292    /// use arrow_avro::writer::WriterBuilder;
293    /// use arrow_avro::writer::format::AvroSoeFormat;
294    ///
295    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
296    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
297    /// let batch = RecordBatch::try_new(
298    ///     Arc::new(schema.clone()),
299    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
300    /// )?;
301    ///
302    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
303    /// encoder.encode(&batch)?;
304    /// let rows = encoder.flush();
305    ///
306    /// assert_eq!(rows.iter().count(), 2);
307    /// # Ok(())
308    /// # }
309    /// ```
310    #[inline]
311    pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
312        self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
313    }
314}
315
/// Builder to configure and create a `Writer`.
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    // Arrow schema of the batches that will be written.
    schema: Schema,
    // Optional compression codec; written into the OCF header when set.
    codec: Option<CompressionCodec>,
    // Hint for the expected byte size of each encoded row (used by `Encoder`).
    row_capacity: Option<usize>,
    // Hint (bytes) for pre-allocating staging buffers; defaults to 1024.
    capacity: usize,
    // Controls the per-record prefix for stream formats; defaults to Rabin SOE.
    fingerprint_strategy: Option<FingerprintStrategy>,
}
325
326impl WriterBuilder {
327    /// Create a new builder with default settings.
328    ///
329    /// The Avro schema used for writing is determined as follows:
330    /// 1) If the Arrow schema metadata contains `avro::schema` (see `SCHEMA_METADATA_KEY`),
331    ///    that JSON is used verbatim.
332    /// 2) Otherwise, the Arrow schema is converted to an Avro record schema.
333    pub fn new(schema: Schema) -> Self {
334        Self {
335            schema,
336            codec: None,
337            row_capacity: None,
338            capacity: 1024,
339            fingerprint_strategy: None,
340        }
341    }
342
343    /// Set the fingerprinting strategy for the stream writer.
344    /// This determines the per-record prefix format.
345    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
346        self.fingerprint_strategy = Some(strategy);
347        self
348    }
349
350    /// Change the compression codec.
351    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
352        self.codec = codec;
353        self
354    }
355
356    /// Sets the expected capacity (in bytes) for internal buffers.
357    ///
358    /// This is used as a hint to pre-allocate staging buffers for writing.
359    pub fn with_capacity(mut self, capacity: usize) -> Self {
360        self.capacity = capacity;
361        self
362    }
363
364    /// Sets the expected byte size for each encoded row.
365    ///
366    /// This setting affects [`Encoder`] created via [`build_encoder`](Self::build_encoder).
367    /// It is used as a hint to reduce reallocations when the typical encoded row size is known.
368    pub fn with_row_capacity(mut self, capacity: usize) -> Self {
369        self.row_capacity = Some(capacity);
370        self
371    }
372
373    fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, RecordEncoder), AvroError> {
374        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
375            Some(json) => AvroSchema::new(json.clone()),
376            None => AvroSchema::try_from(&self.schema)?,
377        };
378        let maybe_fingerprint = if F::NEEDS_PREFIX {
379            match &self.fingerprint_strategy {
380                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(*id)),
381                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(*id)),
382                Some(strategy) => {
383                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
384                }
385                None => Some(
386                    avro_schema
387                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
388                ),
389            }
390        } else {
391            None
392        };
393        let mut md = self.schema.metadata().clone();
394        md.insert(
395            SCHEMA_METADATA_KEY.to_string(),
396            avro_schema.clone().json_string,
397        );
398        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
399        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
400        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
401            .with_fingerprint(maybe_fingerprint)
402            .build()?;
403        Ok((schema, encoder))
404    }
405
406    /// Build a new [`Encoder`] for the given [`AvroFormat`].
407    ///
408    /// `Encoder` only supports stream formats (no OCF sync markers). Attempting to build an
409    /// encoder with an OCF format (e.g. [`AvroOcfFormat`]) will return an error.
410    pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, AvroError> {
411        if F::default().sync_marker().is_some() {
412            return Err(AvroError::InvalidArgument(
413                "Encoder only supports stream formats (no OCF header/sync marker)".to_string(),
414            ));
415        }
416        let (schema, encoder) = self.prepare_encoder::<F>()?;
417        Ok(Encoder {
418            schema,
419            encoder,
420            row_capacity: self.row_capacity,
421            buffer: BytesMut::with_capacity(self.capacity),
422            offsets: vec![0],
423        })
424    }
425
426    /// Build a new [`Writer`] with the specified [`AvroFormat`] and builder options.
427    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, AvroError>
428    where
429        W: Write,
430        F: AvroFormat,
431    {
432        let mut format = F::default();
433        if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
434            return Err(AvroError::InvalidArgument(
435                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
436                    .to_string(),
437            ));
438        }
439        let (schema, encoder) = self.prepare_encoder::<F>()?;
440        format.start_stream(&mut writer, &schema, self.codec)?;
441        Ok(Writer {
442            writer,
443            schema,
444            format,
445            compression: self.codec,
446            capacity: self.capacity,
447            encoder,
448        })
449    }
450}
451
/// A row-by-row encoder for Avro *stream/message* formats (SOE / registry wire formats / raw binary).
///
/// Unlike [`Writer`], which emits a single continuous byte stream to a [`std::io::Write`] sink,
/// `Encoder` tracks row boundaries during encoding and returns an [`EncodedRows`] containing:
/// - one backing buffer (`Bytes`)
/// - row boundary offsets
///
/// This enables zero-copy per-row payloads (for instance, one Kafka message per Arrow row) without
/// re-encoding or decoding the byte stream to recover record boundaries.
///
/// ### Example
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
/// use arrow_avro::schema::FingerprintStrategy;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let schema = Schema::new(vec![Field::new("value", DataType::Int32, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
/// )?;
///
/// // Configure the encoder (here: Confluent Wire Format with schema ID 100)
/// let mut encoder = WriterBuilder::new(schema)
///     .with_fingerprint_strategy(FingerprintStrategy::Id(100))
///     .build_encoder::<AvroSoeFormat>()?;
///
/// // Encode the batch
/// encoder.encode(&batch)?;
///
/// // Get the encoded rows
/// let rows = encoder.flush();
///
/// // Convert to owned Vec<u8> payloads (e.g., for a Kafka producer)
/// let payloads: Vec<Vec<u8>> = rows.iter().map(|row| row.to_vec()).collect();
///
/// assert_eq!(payloads.len(), 3);
/// assert_eq!(payloads[0][0], 0x00); // Magic byte
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Encoder {
    // Arrow schema all incoming batches must match (Avro JSON in metadata).
    schema: SchemaRef,
    // Row-level Avro encoder produced by `WriterBuilder::prepare_encoder`.
    encoder: RecordEncoder,
    // Optional per-row byte-size hint passed through to `encode_rows`.
    row_capacity: Option<usize>,
    // Staging buffer accumulating encoded rows until `flush`.
    buffer: BytesMut,
    // Row boundary offsets into `buffer`; always begins with 0.
    offsets: Vec<usize>,
}
505
506impl Encoder {
507    /// Serialize one [`RecordBatch`] into the internal buffer.
508    pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
509        if batch.schema().fields() != self.schema.fields() {
510            return Err(AvroError::SchemaError(
511                "Schema of RecordBatch differs from Writer schema".to_string(),
512            ));
513        }
514        self.encoder.encode_rows(
515            batch,
516            self.row_capacity.unwrap_or(0),
517            &mut self.buffer,
518            &mut self.offsets,
519        )?;
520        Ok(())
521    }
522
523    /// A convenience method to write a slice of [`RecordBatch`] values.
524    pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), AvroError> {
525        for b in batches {
526            self.encode(b)?;
527        }
528        Ok(())
529    }
530
531    /// Drain and return all currently buffered encoded rows.
532    ///
533    /// The returned [`EncodedRows`] provides per-row payloads as `Bytes` slices.
534    pub fn flush(&mut self) -> EncodedRows {
535        let data = self.buffer.split().freeze();
536        let mut offsets = Vec::with_capacity(self.offsets.len());
537        offsets.append(&mut self.offsets);
538        self.offsets.push(0);
539        EncodedRows::new(data, offsets)
540    }
541
542    /// Returns the Arrow schema used by this encoder.
543    ///
544    /// The returned schema includes metadata with the Avro schema JSON under
545    /// the `avro.schema` key.
546    pub fn schema(&self) -> SchemaRef {
547        self.schema.clone()
548    }
549
550    /// Returns the number of encoded rows currently buffered.
551    pub fn buffered_len(&self) -> usize {
552        self.offsets.len().saturating_sub(1)
553    }
554}
555
/// Generic Avro writer.
///
/// This type is generic over the output Write sink (`W`) and the Avro format (`F`).
/// You’ll usually use the concrete aliases:
///
/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
/// * **[`AvroStreamWriter`]** for **SOE** Avro streams
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    // Underlying output sink.
    writer: W,
    // Arrow schema all batches must match (Avro JSON stored in metadata).
    schema: SchemaRef,
    // Format state; exposes the OCF sync marker when applicable.
    format: F,
    // Optional codec applied to OCF block payloads before writing.
    compression: Option<CompressionCodec>,
    // Capacity hint (bytes) for the per-block staging buffer.
    capacity: usize,
    // Encodes record batches into Avro binary.
    encoder: RecordEncoder,
}
572
/// Alias for an Avro **Object Container File** writer.
///
/// The OCF output is self-describing: the header carries the schema JSON, so it
/// can be read back without any out-of-band schema exchange.
///
/// ### Quickstart (runnable)
///
/// ```
/// use std::io::Cursor;
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroWriter;
/// use arrow_avro::reader::ReaderBuilder;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Writer schema: { id: long, name: string }
/// let writer_schema = Schema::new(vec![
///     Field::new("id", DataType::Int64, false),
///     Field::new("name", DataType::Utf8, false),
/// ]);
///
/// // Build a RecordBatch with two rows
/// let batch = RecordBatch::try_new(
///     Arc::new(writer_schema.clone()),
///     vec![
///         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
///         Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
///     ],
/// )?;
///
/// // Write an Avro **Object Container File** (OCF) to memory
/// let mut w = AvroWriter::new(Vec::<u8>::new(), writer_schema.clone())?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
///
/// // Build a Reader and decode the batch back
/// let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
/// let out = r.next().unwrap()?;
/// assert_eq!(out.num_rows(), 2);
/// # Ok(()) }
/// ```
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;
614
/// Alias for an Avro **Single Object Encoding** stream writer.
///
/// ### Example
///
/// This writer automatically adds the appropriate per-record prefix (based on the
/// fingerprint strategy) before the Avro body of each record. The default is Single
/// Object Encoding (SOE) with a Rabin fingerprint.
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroStreamWriter;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // One‑column Arrow batch
/// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
/// )?;
///
/// // Write an Avro Single Object Encoding stream to a Vec<u8>
/// let sink: Vec<u8> = Vec::new();
/// let mut w = AvroStreamWriter::new(sink, schema)?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;
647
648impl<W: Write> Writer<W, AvroOcfFormat> {
649    /// Convenience constructor – same as [`WriterBuilder::build`] with `AvroOcfFormat`.
650    ///
651    /// ### Example
652    ///
653    /// ```
654    /// use std::sync::Arc;
655    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
656    /// use arrow_schema::{DataType, Field, Schema};
657    /// use arrow_avro::writer::AvroWriter;
658    ///
659    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
660    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
661    /// let batch = RecordBatch::try_new(
662    ///     Arc::new(schema.clone()),
663    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
664    /// )?;
665    ///
666    /// let buf: Vec<u8> = Vec::new();
667    /// let mut w = AvroWriter::new(buf, schema)?;
668    /// w.write(&batch)?;
669    /// w.finish()?;
670    /// let bytes = w.into_inner();
671    /// assert!(!bytes.is_empty());
672    /// # Ok(()) }
673    /// ```
674    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
675        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
676    }
677
678    /// Return a reference to the 16‑byte sync marker generated for this file.
679    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
680        self.format.sync_marker()
681    }
682}
683
684impl<W: Write> Writer<W, AvroSoeFormat> {
685    /// Convenience constructor to create a new [`AvroStreamWriter`].
686    ///
687    /// The resulting stream contains **Single Object Encodings** (no OCF header/sync).
688    ///
689    /// ### Example
690    ///
691    /// ```
692    /// use std::sync::Arc;
693    /// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
694    /// use arrow_schema::{DataType, Field, Schema};
695    /// use arrow_avro::writer::AvroStreamWriter;
696    ///
697    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
698    /// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
699    /// let batch = RecordBatch::try_new(
700    ///     Arc::new(schema.clone()),
701    ///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
702    /// )?;
703    ///
704    /// let sink: Vec<u8> = Vec::new();
705    /// let mut w = AvroStreamWriter::new(sink, schema)?;
706    /// w.write(&batch)?;
707    /// w.finish()?;
708    /// let bytes = w.into_inner();
709    /// assert!(!bytes.is_empty());
710    /// # Ok(()) }
711    /// ```
712    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
713        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
714    }
715}
716
717impl<W: Write, F: AvroFormat> Writer<W, F> {
718    /// Serialize one [`RecordBatch`] to the output.
719    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
720        if batch.schema().fields() != self.schema.fields() {
721            return Err(AvroError::SchemaError(
722                "Schema of RecordBatch differs from Writer schema".to_string(),
723            ));
724        }
725        match self.format.sync_marker() {
726            Some(&sync) => self.write_ocf_block(batch, &sync),
727            None => self.write_stream(batch),
728        }
729    }
730
731    /// A convenience method to write a slice of [`RecordBatch`].
732    ///
733    /// This is equivalent to calling `write` for each batch in the slice.
734    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), AvroError> {
735        for b in batches {
736            self.write(b)?;
737        }
738        Ok(())
739    }
740
741    /// Flush remaining buffered data and (for OCF) ensure the header is present.
742    pub fn finish(&mut self) -> Result<(), AvroError> {
743        self.writer
744            .flush()
745            .map_err(|e| AvroError::IoError(format!("Error flushing writer: {e}"), e))
746    }
747
748    /// Consume the writer, returning the underlying output object.
749    pub fn into_inner(self) -> W {
750        self.writer
751    }
752
753    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), AvroError> {
754        let mut buf = Vec::<u8>::with_capacity(self.capacity);
755        self.encoder.encode(&mut buf, batch)?;
756        let encoded = match self.compression {
757            Some(codec) => codec.compress(&buf)?,
758            None => buf,
759        };
760        write_long(&mut self.writer, batch.num_rows() as i64)?;
761        write_long(&mut self.writer, encoded.len() as i64)?;
762        self.writer
763            .write_all(&encoded)
764            .map_err(|e| AvroError::IoError(format!("Error writing Avro block: {e}"), e))?;
765        self.writer
766            .write_all(sync)
767            .map_err(|e| AvroError::IoError(format!("Error writing Avro sync: {e}"), e))?;
768        Ok(())
769    }
770
771    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
772        self.encoder.encode(&mut self.writer, batch)?;
773        Ok(())
774    }
775}
776
777#[cfg(test)]
778mod tests {
779    use super::*;
780    use crate::compression::CompressionCodec;
781    use crate::reader::ReaderBuilder;
782    use crate::schema::AVRO_NAME_METADATA_KEY;
783    use crate::schema::{AvroSchema, SchemaStore};
784    use crate::test_util::arrow_test_data;
785    use arrow::datatypes::TimeUnit;
786    use arrow::util::pretty::pretty_format_batches;
787    #[cfg(not(feature = "avro_custom_types"))]
788    use arrow_array::Float32Array;
789    #[cfg(feature = "avro_custom_types")]
790    use arrow_array::RunArray;
791    use arrow_array::builder::{Int32Builder, ListBuilder};
792    use arrow_array::cast::AsArray;
793    #[cfg(feature = "avro_custom_types")]
794    use arrow_array::types::{Int16Type, Int64Type};
795    use arrow_array::types::{
796        Int32Type, Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
797        TimestampMillisecondType, TimestampNanosecondType,
798    };
799    use arrow_array::{
800        Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Date64Array, Float16Array,
801        Int8Array, Int16Array, Int32Array, Int64Array, IntervalDayTimeArray,
802        IntervalMonthDayNanoArray, IntervalYearMonthArray, PrimitiveArray, RecordBatch,
803        StringArray, StructArray, Time32MillisecondArray, Time32SecondArray,
804        Time64MicrosecondArray, Time64NanosecondArray, TimestampMillisecondArray,
805        TimestampSecondArray, UInt8Array, UInt16Array, UInt32Array, UInt64Array, UnionArray,
806    };
807    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
808    #[cfg(not(feature = "avro_custom_types"))]
809    use arrow_schema::{DataType, Field, Schema};
810    #[cfg(feature = "avro_custom_types")]
811    use arrow_schema::{DataType, Field, Schema};
812    use arrow_schema::{IntervalUnit, UnionMode};
813    use bytes::BytesMut;
814    use half::f16;
815    use serde_json::{Value, json};
816    use std::collections::HashMap;
817    use std::collections::HashSet;
818    use std::fs::File;
819    use std::io::{BufReader, Cursor};
820    use std::path::PathBuf;
821    use std::sync::Arc;
822    use tempfile::NamedTempFile;
823
    /// Returns the `alltypes_plain` test-data files to round-trip.
    ///
    /// Each entry is gated on the compression feature required to decode it,
    /// so the list shrinks to whatever codecs are enabled at build time; with
    /// no codec features the iterator is empty.
    fn files() -> impl Iterator<Item = &'static str> {
        [
            // TODO: avoid requiring snappy for this file
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }
840
841    fn make_schema() -> Schema {
842        Schema::new(vec![
843            Field::new("id", DataType::Int32, false),
844            Field::new("name", DataType::Binary, false),
845        ])
846    }
847
848    fn make_batch() -> RecordBatch {
849        let ids = Int32Array::from(vec![1, 2, 3]);
850        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
851        RecordBatch::try_new(
852            Arc::new(make_schema()),
853            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
854        )
855        .expect("failed to build test RecordBatch")
856    }
857
858    #[test]
859    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), AvroError> {
860        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
861        let batch = RecordBatch::try_new(
862            Arc::new(schema.clone()),
863            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
864        )?;
865        let buf: Vec<u8> = Vec::new();
866        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
867        writer.write(&batch)?;
868        let encoded = writer.into_inner();
869        let mut store = SchemaStore::new(); // Rabin by default
870        let avro_schema = AvroSchema::try_from(&schema)?;
871        let _fp = store.register(avro_schema)?;
872        let mut decoder = ReaderBuilder::new()
873            .with_writer_schema_store(store)
874            .build_decoder()?;
875        let _consumed = decoder.decode(&encoded)?;
876        let decoded = decoder
877            .flush()?
878            .expect("expected at least one batch from decoder");
879        assert_eq!(decoded.num_columns(), 1);
880        assert_eq!(decoded.num_rows(), 2);
881        let col = decoded.column(0).as_primitive::<Int32Type>();
882        assert_eq!(col, &Int32Array::from(vec![10, 20]));
883        Ok(())
884    }
885
    #[test]
    // Regression test: encoding one-row slices of a batch whose nullable
    // struct column holds a non-nullable child must succeed. The all-null
    // `before` struct masks its non-nullable `id` child via the struct-level
    // validity buffer rather than via child nulls.
    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false), // non-nullable
            Field::new("name", DataType::Utf8, true), // nullable
        ]);
        let inner_struct_type = DataType::Struct(inner_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", inner_struct_type.clone(), true), // nullable struct
            Field::new("after", inner_struct_type.clone(), true),  // nullable struct
            Field::new("op", DataType::Utf8, false),               // non-nullable
        ]);
        // `before` rows are null at the struct level (validity all-false),
        // even though the non-nullable `id` child array itself contains nulls.
        let before_ids = Int32Array::from(vec![None, None]);
        let before_names = StringArray::from(vec![None::<&str>, None]);
        let before_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(before_ids) as ArrayRef,
                Arc::new(before_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![false, false])),
        );
        let after_ids = Int32Array::from(vec![1, 2]); // non-nullable, no nulls
        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
        let after_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(after_ids) as ArrayRef,
                Arc::new(after_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create test batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        // Encode each row from a one-row slice, mirroring per-message
        // publication; slicing must not surface nulls in `after`.
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            let after_col = single_row.column(1);
            assert_eq!(
                after_col.null_count(),
                0,
                "after column should have no nulls in sliced row"
            );
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty(), "encoded output should not be empty");
    }
952
    #[test]
    // Like the sliced-struct test above, but the struct children include
    // Decimal128 and microsecond Timestamp columns, exercising those encoders
    // through per-row slices of a batch with an all-null `before` struct.
    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
        use arrow_array::{
            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
            TimestampMicrosecondArray,
        };
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("category", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), true),
            Field::new("stock_quantity", DataType::Int32, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
        ]);
        let row_struct_type = DataType::Struct(row_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", row_struct_type.clone(), true),
            Field::new("after", row_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        // `before` is entirely null at the struct level.
        let before_struct = StructArray::new_null(row_fields.clone(), 2);
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
        let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
        let timestamps = TimestampMicrosecondArray::from(vec![
            Some(1700000000000000i64),
            Some(1700000001000000i64),
        ]);
        let after_struct = StructArray::new(
            row_fields.clone(),
            vec![
                Arc::new(ids) as ArrayRef,
                Arc::new(names) as ArrayRef,
                Arc::new(categories) as ArrayRef,
                Arc::new(prices) as ArrayRef,
                Arc::new(quantities) as ArrayRef,
                Arc::new(timestamps) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create products batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        // Encode row by row
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty());
    }
1029
1030    #[test]
1031    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
1032        use arrow_array::{
1033            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
1034        };
1035        use arrow_schema::{DataType, Field, Fields, Schema};
1036        use std::sync::Arc;
1037        let row_fields = Fields::from(vec![
1038            Field::new("id", DataType::Int32, false),
1039            Field::new("name", DataType::Utf8, true),
1040        ]);
1041        let row_struct_dt = DataType::Struct(row_fields.clone());
1042        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
1043        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
1044        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
1045        let after: ArrayRef = Arc::new(StructArray::new(
1046            row_fields.clone(),
1047            vec![id_col, name_col],
1048            None,
1049        ));
1050        let schema = Arc::new(Schema::new(vec![
1051            Field::new("before", row_struct_dt.clone(), true),
1052            Field::new("after", row_struct_dt, true),
1053            Field::new("op", DataType::Utf8, false),
1054            Field::new("ts_ms", DataType::Int64, false),
1055        ]));
1056        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
1057        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
1058        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
1059        let mut buf = Vec::new();
1060        let mut writer = WriterBuilder::new(schema.as_ref().clone())
1061            .build::<_, AvroSoeFormat>(&mut buf)
1062            .unwrap();
1063        let single = batch.slice(0, 1);
1064        let res = writer.write(&single);
1065        assert!(
1066            res.is_ok(),
1067            "expected to encode successfully, got: {:?}",
1068            res.err()
1069        );
1070    }
1071
1072    #[test]
1073    fn test_union_nonzero_type_ids() -> Result<(), AvroError> {
1074        use arrow_array::UnionArray;
1075        use arrow_buffer::Buffer;
1076        use arrow_schema::UnionFields;
1077        let union_fields = UnionFields::try_new(
1078            vec![2, 5],
1079            vec![
1080                Field::new("v_str", DataType::Utf8, true),
1081                Field::new("v_int", DataType::Int32, true),
1082            ],
1083        )
1084        .unwrap();
1085        let strings = StringArray::from(vec!["hello", "world"]);
1086        let ints = Int32Array::from(vec![10, 20, 30]);
1087        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
1088        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
1089        let union_array = UnionArray::try_new(
1090            union_fields.clone(),
1091            type_ids.into(),
1092            Some(offsets.into()),
1093            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
1094        )?;
1095        let schema = Schema::new(vec![Field::new(
1096            "union_col",
1097            DataType::Union(union_fields, UnionMode::Dense),
1098            false,
1099        )]);
1100        let batch = RecordBatch::try_new(
1101            Arc::new(schema.clone()),
1102            vec![Arc::new(union_array) as ArrayRef],
1103        )?;
1104        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
1105        assert!(
1106            writer.write(&batch).is_ok(),
1107            "Expected no error from writing"
1108        );
1109        writer.finish()?;
1110        assert!(
1111            writer.finish().is_ok(),
1112            "Expected no error from finishing writer"
1113        );
1114        Ok(())
1115    }
1116
1117    #[test]
1118    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), AvroError> {
1119        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1120        let batch = RecordBatch::try_new(
1121            Arc::new(schema.clone()),
1122            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
1123        )?;
1124        let schema_id: u32 = 42;
1125        let mut writer = WriterBuilder::new(schema.clone())
1126            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
1127            .build::<_, AvroSoeFormat>(Vec::new())?;
1128        writer.write(&batch)?;
1129        let encoded = writer.into_inner();
1130        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
1131        let avro_schema = AvroSchema::try_from(&schema)?;
1132        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
1133        let mut decoder = ReaderBuilder::new()
1134            .with_writer_schema_store(store)
1135            .build_decoder()?;
1136        let _ = decoder.decode(&encoded)?;
1137        let decoded = decoder
1138            .flush()?
1139            .expect("expected at least one batch from decoder");
1140        assert_eq!(decoded.num_columns(), 1);
1141        assert_eq!(decoded.num_rows(), 3);
1142        let col = decoded.column(0).as_primitive::<Int32Type>();
1143        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
1144        Ok(())
1145    }
1146
1147    #[test]
1148    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), AvroError> {
1149        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
1150        let batch = RecordBatch::try_new(
1151            Arc::new(schema.clone()),
1152            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
1153        )?;
1154        let schema_id: u64 = 42;
1155        let mut writer = WriterBuilder::new(schema.clone())
1156            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
1157            .build::<_, AvroSoeFormat>(Vec::new())?;
1158        writer.write(&batch)?;
1159        let encoded = writer.into_inner();
1160        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
1161        let avro_schema = AvroSchema::try_from(&schema)?;
1162        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
1163        let mut decoder = ReaderBuilder::new()
1164            .with_writer_schema_store(store)
1165            .build_decoder()?;
1166        let _ = decoder.decode(&encoded)?;
1167        let decoded = decoder
1168            .flush()?
1169            .expect("expected at least one batch from decoder");
1170        assert_eq!(decoded.num_columns(), 1);
1171        assert_eq!(decoded.num_rows(), 3);
1172        let col = decoded.column(0).as_primitive::<Int32Type>();
1173        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
1174        Ok(())
1175    }
1176
1177    #[test]
1178    fn test_ocf_writer_generates_header_and_sync() -> Result<(), AvroError> {
1179        let batch = make_batch();
1180        let buffer: Vec<u8> = Vec::new();
1181        let mut writer = AvroWriter::new(buffer, make_schema())?;
1182        writer.write(&batch)?;
1183        writer.finish()?;
1184        let out = writer.into_inner();
1185        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
1186        let trailer = &out[out.len() - 16..];
1187        assert_eq!(trailer.len(), 16, "expected 16‑byte sync marker");
1188        Ok(())
1189    }
1190
1191    #[test]
1192    fn test_schema_mismatch_yields_error() {
1193        let batch = make_batch();
1194        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
1195        let buffer = Vec::<u8>::new();
1196        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
1197        let err = writer.write(&batch).unwrap_err();
1198        assert!(matches!(err, AvroError::SchemaError(_)));
1199    }
1200
1201    #[test]
1202    fn test_write_batches_accumulates_multiple() -> Result<(), AvroError> {
1203        let batch1 = make_batch();
1204        let batch2 = make_batch();
1205        let buffer = Vec::<u8>::new();
1206        let mut writer = AvroWriter::new(buffer, make_schema())?;
1207        writer.write_batches(&[&batch1, &batch2])?;
1208        writer.finish()?;
1209        let out = writer.into_inner();
1210        assert!(out.len() > 4, "combined batches produced tiny file");
1211        Ok(())
1212    }
1213
1214    #[test]
1215    fn test_finish_without_write_adds_header() -> Result<(), AvroError> {
1216        let buffer = Vec::<u8>::new();
1217        let mut writer = AvroWriter::new(buffer, make_schema())?;
1218        writer.finish()?;
1219        let out = writer.into_inner();
1220        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
1221        Ok(())
1222    }
1223
1224    #[test]
1225    fn test_write_long_encodes_zigzag_varint() -> Result<(), AvroError> {
1226        let mut buf = Vec::new();
1227        write_long(&mut buf, 0)?;
1228        write_long(&mut buf, -1)?;
1229        write_long(&mut buf, 1)?;
1230        write_long(&mut buf, -2)?;
1231        write_long(&mut buf, 2147483647)?;
1232        assert!(
1233            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
1234            "zig‑zag varint encodings incorrect: {buf:?}"
1235        );
1236        Ok(())
1237    }
1238
    #[test]
    // Round-trips every available `alltypes_plain` variant (per enabled codec
    // feature) through the OCF writer and asserts exact batch equality.
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), AvroError> {
        for rel in files() {
            // Read the source file into a single concatenated batch.
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            // Re-compress with the codec implied by the file name so the
            // round trip also exercises that codec's write path.
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            // Drop the writer so the output file is closed before re-reading.
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {}",
                rel
            );
        }
        Ok(())
    }
1287
    #[test]
    // Round-trips nested_records.avro (nested record/struct schema) through
    // the OCF writer and asserts exact batch equality.
    fn test_roundtrip_nested_records_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        // The writer is scoped so the output file is closed before re-reading.
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }
1320
    #[test]
    #[cfg(feature = "snappy")]
    // Round-trips nested_lists.snappy.avro (nested list schema) through the
    // OCF writer with Snappy compression and asserts exact batch equality.
    fn test_roundtrip_nested_lists_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        // The writer is scoped so the output file is closed before re-reading.
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }
1356
    #[test]
    // Round-trips simple_fixed.avro (Avro `fixed` type) through the OCF
    // writer and asserts exact batch equality.
    fn test_round_trip_simple_fixed_ocf() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/simple_fixed.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build avro reader");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_file = File::create(tmp.path()).expect("create temp avro");
        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        // Drop the writer so the output file is closed before re-reading.
        drop(writer);
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, original);
        Ok(())
    }
1385
1386    // Strict equality (schema + values) only when canonical extension types are enabled
    #[test]
    #[cfg(feature = "canonical_extension_types")]
    // With canonical extension types enabled, duration and uuid columns are
    // expected to round-trip with full schema + value equality.
    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), AvroError> {
        use arrow_schema::{DataType, IntervalUnit};
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();
        // Sanity checks: the input must actually contain the interval and
        // uuid columns this test is meant to exercise.
        let has_mdn = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Interval(IntervalUnit::MonthDayNano)
            )
        });
        assert!(
            has_mdn,
            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
        );
        let has_uuid_fixed = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
        assert!(
            has_uuid_fixed,
            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        // Write to an in‑memory OCF and read back
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip, input);
        Ok(())
    }
1433
1434    // Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), AvroError> {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        // Read input Avro (duration + uuid)
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        // Sanity checks: has MonthDayNano and a FixedSizeBinary(16)
        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Write to a temp OCF and read back
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        // Decode the in-memory OCF we just wrote.
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // 1) Values must round-trip for both columns
        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        // 2) Schema expectation without extensions:
        //    uuid is written as named fixed(16), so reader attaches avro.name
        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        // 3) Duration remains Interval(MonthDayNano)
        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }
1511
1512    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
1513    // writes it back out with the writer (hitting Map encoding paths), then reads it
1514    // again and asserts exact Arrow equivalence.
1515    #[test]
1516    // TODO: avoid requiring snappy for this file
1517    #[cfg(feature = "snappy")]
1518    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), AvroError> {
1519        // Load source Avro with Map fields
1520        let path = arrow_test_data("avro/nonnullable.impala.avro");
1521        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
1522        let reader = ReaderBuilder::new()
1523            .build(BufReader::new(rdr_file))
1524            .expect("build reader for nonnullable.impala.avro");
1525        // Collect all input batches and concatenate to a single RecordBatch
1526        let in_schema = reader.schema();
1527        // Sanity: ensure the file actually contains at least one Map field
1528        let has_map = in_schema
1529            .fields()
1530            .iter()
1531            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
1532        assert!(
1533            has_map,
1534            "expected at least one Map field in avro/nonnullable.impala.avro"
1535        );
1536
1537        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1538        let original =
1539            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1540        // Write out using the OCF writer into an in-memory Vec<u8>
1541        let buffer = Vec::<u8>::new();
1542        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
1543        writer.write(&original)?;
1544        writer.finish()?;
1545        let out_bytes = writer.into_inner();
1546        // Read the produced bytes back with the Reader
1547        let rt_reader = ReaderBuilder::new()
1548            .build(Cursor::new(out_bytes))
1549            .expect("build reader for round-tripped in-memory OCF");
1550        let rt_schema = rt_reader.schema();
1551        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1552        let roundtrip =
1553            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1554        // Exact value fidelity (schema + data)
1555        assert_eq!(
1556            roundtrip, original,
1557            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
1558        );
1559        Ok(())
1560    }
1561
    #[test]
    // TODO: avoid requiring snappy for these files
    #[cfg(feature = "snappy")]
    fn test_roundtrip_decimals_via_writer() -> Result<(), AvroError> {
        // Roundtrips decimal fixtures covering Decimal32/64/128/256 in both
        // bytes-backed and fixed-backed Avro physical encodings.
        // (file, resolve via ARROW_TEST_DATA?)
        let files: [(&str, bool); 8] = [
            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
        ];
        for (rel, in_test_data_dir) in files {
            // Resolve path the same way as reader::test_decimal
            let path: String = if in_test_data_dir {
                arrow_test_data(rel)
            } else {
                // Crate-local fixtures live under the manifest directory.
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join(rel)
                    .to_string_lossy()
                    .into_owned()
            };
            // Read original file into a single RecordBatch for comparison
            let f_in = File::open(&path).expect("open input avro");
            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
            let in_schema = rdr.schema();
            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
            // Write it out with the OCF writer (no special compression)
            let tmp = NamedTempFile::new().expect("create temp file");
            // NOTE: `into_temp_path` keeps the file on disk until `out_path`
            // drops at the end of this loop iteration.
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
            // Read back the file we just wrote and compare equality (schema + data)
            let f_rt = File::open(&out_path).expect("open roundtrip avro");
            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
            let rt_schema = rt_rdr.schema();
            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
        }
        Ok(())
    }
1612
1613    #[test]
1614    fn test_named_types_complex_roundtrip() -> Result<(), AvroError> {
1615        // 1. Read the new, more complex named references file.
1616        let path =
1617            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
1618        let rdr_file = File::open(&path).expect("open avro/named_types_complex.avro");
1619
1620        let reader = ReaderBuilder::new()
1621            .build(BufReader::new(rdr_file))
1622            .expect("build reader for named_types_complex.avro");
1623
1624        // 2. Concatenate all batches to one RecordBatch.
1625        let in_schema = reader.schema();
1626        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1627        let original =
1628            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1629
1630        // 3. Sanity Checks: Validate that all named types were reused correctly.
1631        {
1632            let arrow_schema = original.schema();
1633
1634            // --- A. Validate 'User' record reuse ---
1635            let author_field = arrow_schema.field_with_name("author")?;
1636            let author_type = author_field.data_type();
1637            let editors_field = arrow_schema.field_with_name("editors")?;
1638            let editors_item_type = match editors_field.data_type() {
1639                DataType::List(item_field) => item_field.data_type(),
1640                other => panic!("Editors field should be a List, but was {:?}", other),
1641            };
1642            assert_eq!(
1643                author_type, editors_item_type,
1644                "The DataType for the 'author' struct and the 'editors' list items must be identical"
1645            );
1646
1647            // --- B. Validate 'PostStatus' enum reuse ---
1648            let status_field = arrow_schema.field_with_name("status")?;
1649            let status_type = status_field.data_type();
1650            assert!(
1651                matches!(status_type, DataType::Dictionary(_, _)),
1652                "Status field should be a Dictionary (Enum)"
1653            );
1654
1655            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
1656            let prev_status_type = prev_status_field.data_type();
1657            assert_eq!(
1658                status_type, prev_status_type,
1659                "The DataType for 'status' and 'previous_status' enums must be identical"
1660            );
1661
1662            // --- C. Validate 'MD5' fixed reuse ---
1663            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
1664            let content_hash_type = content_hash_field.data_type();
1665            assert!(
1666                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
1667                "Content hash should be FixedSizeBinary(16)"
1668            );
1669
1670            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
1671            let thumb_hash_type = thumb_hash_field.data_type();
1672            assert_eq!(
1673                content_hash_type, thumb_hash_type,
1674                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
1675            );
1676        }
1677
1678        // 4. Write the data to an in-memory buffer.
1679        let buffer: Vec<u8> = Vec::new();
1680        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
1681        writer.write(&original)?;
1682        writer.finish()?;
1683        let bytes = writer.into_inner();
1684
1685        // 5. Read the data back and compare for exact equality.
1686        let rt_reader = ReaderBuilder::new()
1687            .build(Cursor::new(bytes))
1688            .expect("build reader for round-trip");
1689        let rt_schema = rt_reader.schema();
1690        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1691        let roundtrip =
1692            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1693
1694        assert_eq!(
1695            roundtrip, original,
1696            "Avro complex named types round-trip mismatch"
1697        );
1698
1699        Ok(())
1700    }
1701
1702    // Union Roundtrip Test Helpers
1703
1704    // Asserts that the `actual` schema is a semantically equivalent superset of the `expected` one.
1705    // This allows the `actual` schema to contain additional metadata keys
1706    // (`arrowUnionMode`, `arrowUnionTypeIds`, `avro.name`) that are added during an Arrow-to-Avro-to-Arrow
1707    // roundtrip, while ensuring no other information was lost or changed.
1708    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
1709        // Compare top-level schema metadata using the same superset logic.
1710        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");
1711
1712        // Compare fields.
1713        assert_eq!(
1714            expected.fields().len(),
1715            actual.fields().len(),
1716            "Schema must have the same number of fields"
1717        );
1718
1719        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
1720            assert_field_is_semantically_equivalent(expected_field, actual_field);
1721        }
1722    }
1723
1724    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
1725        let context = format!("Field '{}'", expected.name());
1726
1727        assert_eq!(
1728            expected.name(),
1729            actual.name(),
1730            "{context}: names must match"
1731        );
1732        assert_eq!(
1733            expected.is_nullable(),
1734            actual.is_nullable(),
1735            "{context}: nullability must match"
1736        );
1737
1738        // Recursively check the data types.
1739        assert_datatype_is_semantically_equivalent(
1740            expected.data_type(),
1741            actual.data_type(),
1742            &context,
1743        );
1744
1745        // Check that metadata is a valid superset.
1746        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
1747    }
1748
1749    fn assert_datatype_is_semantically_equivalent(
1750        expected: &DataType,
1751        actual: &DataType,
1752        context: &str,
1753    ) {
1754        match (expected, actual) {
1755            (DataType::List(expected_field), DataType::List(actual_field))
1756            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
1757            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
1758                assert_field_is_semantically_equivalent(expected_field, actual_field);
1759            }
1760            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
1761                assert_eq!(
1762                    expected_fields.len(),
1763                    actual_fields.len(),
1764                    "{context}: struct must have same number of fields"
1765                );
1766                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
1767                    assert_field_is_semantically_equivalent(ef, af);
1768                }
1769            }
1770            (
1771                DataType::Union(expected_fields, expected_mode),
1772                DataType::Union(actual_fields, actual_mode),
1773            ) => {
1774                assert_eq!(
1775                    expected_mode, actual_mode,
1776                    "{context}: union mode must match"
1777                );
1778                assert_eq!(
1779                    expected_fields.len(),
1780                    actual_fields.len(),
1781                    "{context}: union must have same number of variants"
1782                );
1783                for ((exp_id, exp_field), (act_id, act_field)) in
1784                    expected_fields.iter().zip(actual_fields.iter())
1785                {
1786                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
1787                    assert_field_is_semantically_equivalent(exp_field, act_field);
1788                }
1789            }
1790            _ => {
1791                assert_eq!(expected, actual, "{context}: data types must be identical");
1792            }
1793        }
1794    }
1795
1796    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
1797        assert_eq!(
1798            expected.num_columns(),
1799            actual.num_columns(),
1800            "RecordBatches must have the same number of columns"
1801        );
1802        assert_eq!(
1803            expected.num_rows(),
1804            actual.num_rows(),
1805            "RecordBatches must have the same number of rows"
1806        );
1807
1808        for i in 0..expected.num_columns() {
1809            let context = format!("Column {i}");
1810            let expected_col = expected.column(i);
1811            let actual_col = actual.column(i);
1812            assert_array_data_is_identical(expected_col, actual_col, &context);
1813        }
1814    }
1815
1816    /// Recursively asserts that the data content of two Arrays is identical.
1817    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
1818        assert_eq!(
1819            expected.nulls(),
1820            actual.nulls(),
1821            "{context}: null buffers must match"
1822        );
1823        assert_eq!(
1824            expected.len(),
1825            actual.len(),
1826            "{context}: array lengths must match"
1827        );
1828
1829        match (expected.data_type(), actual.data_type()) {
1830            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
1831                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
1832                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();
1833
1834                // Compare the type_ids buffer (always the first buffer).
1835                assert_eq!(
1836                    &expected.to_data().buffers()[0],
1837                    &actual.to_data().buffers()[0],
1838                    "{context}: union type_ids buffer mismatch"
1839                );
1840
1841                // For dense unions, compare the value_offsets buffer (the second buffer).
1842                if expected.to_data().buffers().len() > 1 {
1843                    assert_eq!(
1844                        &expected.to_data().buffers()[1],
1845                        &actual.to_data().buffers()[1],
1846                        "{context}: union value_offsets buffer mismatch"
1847                    );
1848                }
1849
1850                // Recursively compare children based on the fields in the DataType.
1851                for (type_id, _) in expected_fields.iter() {
1852                    let child_context = format!("{context} -> child variant {type_id}");
1853                    assert_array_data_is_identical(
1854                        expected_union.child(type_id),
1855                        actual_union.child(type_id),
1856                        &child_context,
1857                    );
1858                }
1859            }
1860            (DataType::Struct(_), DataType::Struct(_)) => {
1861                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
1862                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
1863                for i in 0..expected_struct.num_columns() {
1864                    let child_context = format!("{context} -> struct child {i}");
1865                    assert_array_data_is_identical(
1866                        expected_struct.column(i),
1867                        actual_struct.column(i),
1868                        &child_context,
1869                    );
1870                }
1871            }
1872            // Fallback for primitive types and other types where buffer comparison is sufficient.
1873            _ => {
1874                assert_eq!(
1875                    expected.to_data().buffers(),
1876                    actual.to_data().buffers(),
1877                    "{context}: data buffers must match"
1878                );
1879            }
1880        }
1881    }
1882
1883    /// Checks that `actual_meta` contains all of `expected_meta`, and any additional
1884    /// keys in `actual_meta` are from a permitted set.
1885    fn assert_metadata_is_superset(
1886        expected_meta: &HashMap<String, String>,
1887        actual_meta: &HashMap<String, String>,
1888        context: &str,
1889    ) {
1890        let allowed_additions: HashSet<&str> =
1891            vec!["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
1892                .into_iter()
1893                .collect();
1894        for (key, expected_value) in expected_meta {
1895            match actual_meta.get(key) {
1896                Some(actual_value) => assert_eq!(
1897                    expected_value, actual_value,
1898                    "{context}: preserved metadata for key '{key}' must have the same value"
1899                ),
1900                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
1901            }
1902        }
1903        for key in actual_meta.keys() {
1904            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
1905                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
1906            }
1907        }
1908    }
1909
1910    #[test]
1911    fn test_union_roundtrip() -> Result<(), AvroError> {
1912        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
1913            .join("test/data/union_fields.avro")
1914            .to_string_lossy()
1915            .into_owned();
1916        let rdr_file = File::open(&file_path).expect("open avro/union_fields.avro");
1917        let reader = ReaderBuilder::new()
1918            .build(BufReader::new(rdr_file))
1919            .expect("build reader for union_fields.avro");
1920        let schema = reader.schema();
1921        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1922        let original =
1923            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
1924        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
1925        writer.write(&original)?;
1926        writer.finish()?;
1927        let bytes = writer.into_inner();
1928        let rt_reader = ReaderBuilder::new()
1929            .build(Cursor::new(bytes))
1930            .expect("build round_trip reader");
1931        let rt_schema = rt_reader.schema();
1932        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1933        let round_trip =
1934            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1935
1936        // The nature of the crate is such that metadata gets appended during the roundtrip,
1937        // so we can't compare the schemas directly. Instead, we semantically compare the schemas and data.
1938        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());
1939
1940        assert_batch_data_is_identical(&original, &round_trip);
1941        Ok(())
1942    }
1943
1944    #[test]
1945    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), AvroError> {
1946        // Read the known-good enum file (same as reader::test_simple)
1947        let path = arrow_test_data("avro/simple_enum.avro");
1948        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
1949        let reader = ReaderBuilder::new()
1950            .build(BufReader::new(rdr_file))
1951            .expect("build reader for simple_enum.avro");
1952        // Concatenate all batches to one RecordBatch for a clean equality check
1953        let in_schema = reader.schema();
1954        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1955        let original =
1956            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1957        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
1958        let has_enum_dict = in_schema.fields().iter().any(|f| {
1959            matches!(
1960                f.data_type(),
1961                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
1962            )
1963        });
1964        assert!(
1965            has_enum_dict,
1966            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
1967        );
1968        // Write with OCF writer into memory using the reader-provided Arrow schema.
1969        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
1970        let buffer: Vec<u8> = Vec::new();
1971        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
1972        writer.write(&original)?;
1973        writer.finish()?;
1974        let bytes = writer.into_inner();
1975        // Read back and compare for exact equality (schema + data)
1976        let rt_reader = ReaderBuilder::new()
1977            .build(Cursor::new(bytes))
1978            .expect("reader for round-trip");
1979        let rt_schema = rt_reader.schema();
1980        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1981        let roundtrip =
1982            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
1983        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
1984        Ok(())
1985    }
1986
1987    #[test]
1988    fn test_builder_propagates_capacity_to_writer() -> Result<(), AvroError> {
1989        let cap = 64 * 1024;
1990        let buffer = Vec::<u8>::new();
1991        let mut writer = WriterBuilder::new(make_schema())
1992            .with_capacity(cap)
1993            .build::<_, AvroOcfFormat>(buffer)?;
1994        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
1995        let batch = make_batch();
1996        writer.write(&batch)?;
1997        writer.finish()?;
1998        let out = writer.into_inner();
1999        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
2000        Ok(())
2001    }
2002
2003    #[test]
2004    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), AvroError> {
2005        use arrow_array::{ArrayRef, Int32Array};
2006        use arrow_schema::{DataType, Field, Schema};
2007        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
2008        let batch = RecordBatch::try_new(
2009            Arc::new(schema.clone()),
2010            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
2011        )?;
2012        let cap = 8192;
2013        let mut writer = WriterBuilder::new(schema)
2014            .with_capacity(cap)
2015            .build::<_, AvroSoeFormat>(Vec::new())?;
2016        assert_eq!(writer.capacity, cap);
2017        writer.write(&batch)?;
2018        let _bytes = writer.into_inner();
2019        Ok(())
2020    }
2021
    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), AvroError> {
        // Fixture containing duration logical types that decode to Arrow
        // `Duration` columns in all four time units.
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        // The fixture must yield every Duration unit, otherwise the roundtrip
        // below would not exercise all of the writer's duration encodings.
        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        // Collect the set of Duration units actually present in the schema.
        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Write the data to a temporary OCF file; the inner scope ensures the
        // writer (and its file handle) is finished and dropped before the file
        // is reopened below.
        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        // Read the file back and require exact schema + data equality.
        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }
2085
2086    #[cfg(feature = "avro_custom_types")]
2087    #[test]
2088    fn test_run_end_encoded_roundtrip_writer() -> Result<(), AvroError> {
2089        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
2090        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
2091        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
2092        let field = Field::new("x", ree.data_type().clone(), true);
2093        let schema = Schema::new(vec![field]);
2094        let batch = RecordBatch::try_new(
2095            Arc::new(schema.clone()),
2096            vec![Arc::new(ree.clone()) as ArrayRef],
2097        )?;
2098        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2099        writer.write(&batch)?;
2100        writer.finish()?;
2101        let bytes = writer.into_inner();
2102        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2103        let out_schema = reader.schema();
2104        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2105        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2106        assert_eq!(out.num_columns(), 1);
2107        assert_eq!(out.num_rows(), 8);
2108        match out.schema().field(0).data_type() {
2109            DataType::RunEndEncoded(run_ends_field, values_field) => {
2110                assert_eq!(run_ends_field.name(), "run_ends");
2111                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
2112                assert_eq!(values_field.name(), "values");
2113                assert_eq!(values_field.data_type(), &DataType::Int32);
2114                assert!(values_field.is_nullable());
2115                let got_ree = out
2116                    .column(0)
2117                    .as_any()
2118                    .downcast_ref::<RunArray<Int32Type>>()
2119                    .expect("RunArray<Int32Type>");
2120                assert_eq!(got_ree, &ree);
2121            }
2122            other => panic!(
2123                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
2124                other
2125            ),
2126        }
2127        Ok(())
2128    }
2129
2130    #[cfg(feature = "avro_custom_types")]
2131    #[test]
2132    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), AvroError>
2133    {
2134        let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
2135        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
2136        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
2137        let field = Field::new("s", ree.data_type().clone(), true);
2138        let schema = Schema::new(vec![field]);
2139        let batch = RecordBatch::try_new(
2140            Arc::new(schema.clone()),
2141            vec![Arc::new(ree.clone()) as ArrayRef],
2142        )?;
2143        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2144        writer.write(&batch)?;
2145        writer.finish()?;
2146        let bytes = writer.into_inner();
2147        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2148        let out_schema = reader.schema();
2149        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2150        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2151        assert_eq!(out.num_columns(), 1);
2152        assert_eq!(out.num_rows(), 7);
2153        match out.schema().field(0).data_type() {
2154            DataType::RunEndEncoded(run_ends_field, values_field) => {
2155                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
2156                assert_eq!(values_field.data_type(), &DataType::Utf8);
2157                assert!(
2158                    values_field.is_nullable(),
2159                    "REE 'values' child should be nullable"
2160                );
2161                let got = out
2162                    .column(0)
2163                    .as_any()
2164                    .downcast_ref::<RunArray<Int16Type>>()
2165                    .expect("RunArray<Int16Type>");
2166                assert_eq!(got, &ree);
2167            }
2168            other => panic!("Unexpected DataType: {:?}", other),
2169        }
2170        Ok(())
2171    }
2172
2173    #[cfg(feature = "avro_custom_types")]
2174    #[test]
2175    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() -> Result<(), AvroError>
2176    {
2177        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
2178        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
2179        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
2180        let field = Field::new("y", ree.data_type().clone(), true);
2181        let schema = Schema::new(vec![field]);
2182        let batch = RecordBatch::try_new(
2183            Arc::new(schema.clone()),
2184            vec![Arc::new(ree.clone()) as ArrayRef],
2185        )?;
2186        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2187        writer.write(&batch)?;
2188        writer.finish()?;
2189        let bytes = writer.into_inner();
2190        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2191        let out_schema = reader.schema();
2192        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2193        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2194        assert_eq!(out.num_columns(), 1);
2195        assert_eq!(out.num_rows(), 8);
2196        match out.schema().field(0).data_type() {
2197            DataType::RunEndEncoded(run_ends_field, values_field) => {
2198                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
2199                assert_eq!(values_field.data_type(), &DataType::Int32);
2200                assert!(values_field.is_nullable());
2201                let got = out
2202                    .column(0)
2203                    .as_any()
2204                    .downcast_ref::<RunArray<Int64Type>>()
2205                    .expect("RunArray<Int64Type>");
2206                assert_eq!(got, &ree);
2207            }
2208            other => panic!("Unexpected DataType for REE column: {:?}", other),
2209        }
2210        Ok(())
2211    }
2212
    #[cfg(feature = "avro_custom_types")]
    #[test]
    // Round-trips a *logical slice* of a run-end-encoded array. Instead of
    // writing a sliced `RunArray` directly, the test materializes the logical
    // window [offset, offset + length), re-compresses it into a fresh REE
    // array, writes that, and finally compares logical (expanded) values.
    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), AvroError> {
        // Base REE data over 8 logical rows: 1 x3, 2 x2, null x2, 3 x1.
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        // Window to "slice": logical rows 1..7 of the base array.
        let offset = 1usize;
        let length = 6usize;
        let base_values = base.values().as_primitive::<Int32Type>();
        // Expand the window into plain Option<i32> values using per-row
        // physical-index lookups.
        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
        for i in offset..offset + length {
            let phys = base.get_physical_index(i);
            let v = if base_values.is_null(phys) {
                None
            } else {
                Some(base_values.value(phys))
            };
            logical_window.push(v);
        }

        // Re-compress a logical value sequence into (run_ends, run_values)
        // suitable for `RunArray::try_new`. Adjacent equal values (equal nulls
        // included) collapse into a single run.
        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    // Close the current run; its end is the previous end + run length.
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            // Flush the final run.
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        // Write the re-compressed window to an in-memory OCF and read it back.
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                // Expand an REE array back to its logical Option<i32> values so
                // the comparison is independent of run boundaries.
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a.values().as_primitive::<Int32Type>();
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }
2312
2313    #[cfg(not(feature = "avro_custom_types"))]
2314    #[test]
2315    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), AvroError> {
2316        use arrow_schema::{DataType, Field, Schema};
2317        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
2318        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
2319        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
2320            &run_ends,
2321            &run_values,
2322        )?;
2323        let field = Field::new("x", ree.data_type().clone(), true);
2324        let schema = Schema::new(vec![field]);
2325        let batch =
2326            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
2327        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2328        writer.write(&batch)?;
2329        writer.finish()?;
2330        let bytes = writer.into_inner();
2331        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2332        let out_schema = reader.schema();
2333        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2334        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2335        assert_eq!(out.num_columns(), 1);
2336        assert_eq!(out.num_rows(), 8);
2337        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
2338        let got = out.column(0).as_primitive::<Int32Type>();
2339        let expected = Int32Array::from(vec![
2340            Some(1),
2341            Some(1),
2342            Some(1),
2343            Some(2),
2344            Some(2),
2345            None,
2346            None,
2347            Some(3),
2348        ]);
2349        assert_eq!(got, &expected);
2350        Ok(())
2351    }
2352
2353    #[cfg(not(feature = "avro_custom_types"))]
2354    #[test]
2355    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
2356    -> Result<(), AvroError> {
2357        use arrow_schema::{DataType, Field, Schema};
2358        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
2359        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
2360        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
2361            &run_ends,
2362            &run_values,
2363        )?;
2364        let field = Field::new("s", ree.data_type().clone(), true);
2365        let schema = Schema::new(vec![field]);
2366        let batch =
2367            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
2368        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2369        writer.write(&batch)?;
2370        writer.finish()?;
2371        let bytes = writer.into_inner();
2372        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2373        let out_schema = reader.schema();
2374        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2375        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2376        assert_eq!(out.num_columns(), 1);
2377        assert_eq!(out.num_rows(), 7);
2378        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
2379        let got = out
2380            .column(0)
2381            .as_any()
2382            .downcast_ref::<arrow_array::StringArray>()
2383            .expect("StringArray");
2384        let expected = arrow_array::StringArray::from(vec![
2385            Some("a"),
2386            Some("a"),
2387            None,
2388            None,
2389            None,
2390            Some("c"),
2391            Some("c"),
2392        ]);
2393        assert_eq!(got, &expected);
2394        Ok(())
2395    }
2396
2397    #[cfg(not(feature = "avro_custom_types"))]
2398    #[test]
2399    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
2400    -> Result<(), AvroError> {
2401        use arrow_schema::{DataType, Field, Schema};
2402        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
2403        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
2404        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
2405            &run_ends,
2406            &run_values,
2407        )?;
2408        let field = Field::new("y", ree.data_type().clone(), true);
2409        let schema = Schema::new(vec![field]);
2410        let batch =
2411            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
2412        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2413        writer.write(&batch)?;
2414        writer.finish()?;
2415        let bytes = writer.into_inner();
2416        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2417        let out_schema = reader.schema();
2418        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2419        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2420        assert_eq!(out.num_columns(), 1);
2421        assert_eq!(out.num_rows(), 8);
2422        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
2423        let got = out.column(0).as_primitive::<Int32Type>();
2424        let expected = Int32Array::from(vec![
2425            Some(999),
2426            Some(999),
2427            Some(999),
2428            Some(999),
2429            Some(-5),
2430            Some(-5),
2431            Some(-5),
2432            Some(-5),
2433        ]);
2434        assert_eq!(got, &expected);
2435        Ok(())
2436    }
2437
2438    #[cfg(not(feature = "avro_custom_types"))]
2439    #[test]
2440    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), AvroError> {
2441        use arrow_schema::{DataType, Field, Schema};
2442        let run_ends = Int32Array::from(vec![2, 4, 6]);
2443        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
2444        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
2445            &run_ends,
2446            &run_values,
2447        )?;
2448        let field = Field::new("x", ree.data_type().clone(), true);
2449        let schema = Schema::new(vec![field]);
2450        let batch =
2451            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
2452        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2453        writer.write(&batch)?;
2454        writer.finish()?;
2455        let bytes = writer.into_inner();
2456        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
2457        let out_schema = reader.schema();
2458        let batches = reader.collect::<Result<Vec<_>, _>>()?;
2459        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
2460        assert_eq!(out.num_columns(), 1);
2461        assert_eq!(out.num_rows(), 6);
2462        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
2463        let got = out.column(0).as_primitive::<Int32Type>();
2464        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
2465        assert_eq!(got, &expected);
2466        Ok(())
2467    }
2468
2469    #[test]
2470    // TODO: avoid requiring snappy for this file
2471    #[cfg(feature = "snappy")]
2472    fn test_nullable_impala_roundtrip() -> Result<(), AvroError> {
2473        let path = arrow_test_data("avro/nullable.impala.avro");
2474        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
2475        let reader = ReaderBuilder::new()
2476            .build(BufReader::new(rdr_file))
2477            .expect("build reader for nullable.impala.avro");
2478        let in_schema = reader.schema();
2479        assert!(
2480            in_schema.fields().iter().any(|f| f.is_nullable()),
2481            "expected at least one nullable field in avro/nullable.impala.avro"
2482        );
2483        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2484        let original =
2485            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2486        let buffer: Vec<u8> = Vec::new();
2487        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
2488        writer.write(&original)?;
2489        writer.finish()?;
2490        let out_bytes = writer.into_inner();
2491        let rt_reader = ReaderBuilder::new()
2492            .build(Cursor::new(out_bytes))
2493            .expect("build reader for round-tripped in-memory OCF");
2494        let rt_schema = rt_reader.schema();
2495        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2496        let roundtrip =
2497            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2498        assert_eq!(
2499            roundtrip, original,
2500            "Round-trip Avro data mismatch for nullable.impala.avro"
2501        );
2502        Ok(())
2503    }
2504
2505    #[test]
2506    #[cfg(feature = "snappy")]
2507    fn test_datapage_v2_roundtrip() -> Result<(), AvroError> {
2508        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
2509        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
2510        let reader = ReaderBuilder::new()
2511            .build(BufReader::new(rdr_file))
2512            .expect("build reader for datapage_v2.snappy.avro");
2513        let in_schema = reader.schema();
2514        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2515        let original =
2516            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2517        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
2518        writer.write(&original)?;
2519        writer.finish()?;
2520        let bytes = writer.into_inner();
2521        let rt_reader = ReaderBuilder::new()
2522            .build(Cursor::new(bytes))
2523            .expect("build round-trip reader");
2524        let rt_schema = rt_reader.schema();
2525        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2526        let round_trip =
2527            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2528        assert_eq!(
2529            round_trip, original,
2530            "Round-trip batch mismatch for datapage_v2.snappy.avro"
2531        );
2532        Ok(())
2533    }
2534
2535    #[test]
2536    #[cfg(feature = "snappy")]
2537    fn test_single_nan_roundtrip() -> Result<(), AvroError> {
2538        let path = arrow_test_data("avro/single_nan.avro");
2539        let in_file = File::open(&path).expect("open avro/single_nan.avro");
2540        let reader = ReaderBuilder::new()
2541            .build(BufReader::new(in_file))
2542            .expect("build reader for single_nan.avro");
2543        let in_schema = reader.schema();
2544        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2545        let original =
2546            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2547        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2548        writer.write(&original)?;
2549        writer.finish()?;
2550        let bytes = writer.into_inner();
2551        let rt_reader = ReaderBuilder::new()
2552            .build(Cursor::new(bytes))
2553            .expect("build round_trip reader");
2554        let rt_schema = rt_reader.schema();
2555        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2556        let round_trip =
2557            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2558        assert_eq!(
2559            round_trip, original,
2560            "Round-trip batch mismatch for avro/single_nan.avro"
2561        );
2562        Ok(())
2563    }
2564    #[test]
2565    // TODO: avoid requiring snappy for this file
2566    #[cfg(feature = "snappy")]
2567    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), AvroError> {
2568        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
2569        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
2570        let reader = ReaderBuilder::new()
2571            .build(BufReader::new(rdr_file))
2572            .expect("build reader for dict-page-offset-zero.avro");
2573        let in_schema = reader.schema();
2574        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2575        let original =
2576            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2577        let buffer: Vec<u8> = Vec::new();
2578        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
2579        writer.write(&original)?;
2580        writer.finish()?;
2581        let bytes = writer.into_inner();
2582        let rt_reader = ReaderBuilder::new()
2583            .build(Cursor::new(bytes))
2584            .expect("build reader for round-trip");
2585        let rt_schema = rt_reader.schema();
2586        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2587        let roundtrip =
2588            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2589        assert_eq!(
2590            roundtrip, original,
2591            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
2592        );
2593        Ok(())
2594    }
2595
2596    #[test]
2597    #[cfg(feature = "snappy")]
2598    fn test_repeated_no_annotation_roundtrip() -> Result<(), AvroError> {
2599        let path = arrow_test_data("avro/repeated_no_annotation.avro");
2600        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
2601        let reader = ReaderBuilder::new()
2602            .build(BufReader::new(in_file))
2603            .expect("build reader for repeated_no_annotation.avro");
2604        let in_schema = reader.schema();
2605        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2606        let original =
2607            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2608        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2609        writer.write(&original)?;
2610        writer.finish()?;
2611        let bytes = writer.into_inner();
2612        let rt_reader = ReaderBuilder::new()
2613            .build(Cursor::new(bytes))
2614            .expect("build reader for round-trip buffer");
2615        let rt_schema = rt_reader.schema();
2616        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2617        let round_trip =
2618            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
2619        assert_eq!(
2620            round_trip, original,
2621            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
2622        );
2623        Ok(())
2624    }
2625
2626    #[test]
2627    fn test_nested_record_type_reuse_roundtrip() -> Result<(), AvroError> {
2628        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2629            .join("test/data/nested_record_reuse.avro")
2630            .to_string_lossy()
2631            .into_owned();
2632        let in_file = File::open(&path).expect("open avro/nested_record_reuse.avro");
2633        let reader = ReaderBuilder::new()
2634            .build(BufReader::new(in_file))
2635            .expect("build reader for nested_record_reuse.avro");
2636        let in_schema = reader.schema();
2637        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2638        let input =
2639            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2640        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
2641        writer.write(&input)?;
2642        writer.finish()?;
2643        let bytes = writer.into_inner();
2644        let rt_reader = ReaderBuilder::new()
2645            .build(Cursor::new(bytes))
2646            .expect("build round_trip reader");
2647        let rt_schema = rt_reader.schema();
2648        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2649        let round_trip =
2650            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2651        assert_eq!(
2652            round_trip, input,
2653            "Round-trip batch mismatch for nested_record_reuse.avro"
2654        );
2655        Ok(())
2656    }
2657
2658    #[test]
2659    fn test_enum_type_reuse_roundtrip() -> Result<(), AvroError> {
2660        let path =
2661            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
2662        let rdr_file = std::fs::File::open(&path).expect("open test/data/enum_reuse.avro");
2663        let reader = ReaderBuilder::new()
2664            .build(std::io::BufReader::new(rdr_file))
2665            .expect("build reader for enum_reuse.avro");
2666        let in_schema = reader.schema();
2667        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2668        let original =
2669            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2670        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2671        writer.write(&original)?;
2672        writer.finish()?;
2673        let bytes = writer.into_inner();
2674        let rt_reader = ReaderBuilder::new()
2675            .build(std::io::Cursor::new(bytes))
2676            .expect("build round_trip reader");
2677        let rt_schema = rt_reader.schema();
2678        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2679        let round_trip =
2680            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2681        assert_eq!(
2682            round_trip, original,
2683            "Avro enum type reuse round-trip mismatch"
2684        );
2685        Ok(())
2686    }
2687
2688    #[test]
2689    fn comprehensive_e2e_test_roundtrip() -> Result<(), AvroError> {
2690        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2691            .join("test/data/comprehensive_e2e.avro");
2692        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
2693        let reader = ReaderBuilder::new()
2694            .build(BufReader::new(rdr_file))
2695            .expect("build reader for comprehensive_e2e.avro");
2696        let in_schema = reader.schema();
2697        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2698        let original =
2699            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2700        let sink: Vec<u8> = Vec::new();
2701        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
2702        writer.write(&original)?;
2703        writer.finish()?;
2704        let bytes = writer.into_inner();
2705        let rt_reader = ReaderBuilder::new()
2706            .build(Cursor::new(bytes))
2707            .expect("build round-trip reader");
2708        let rt_schema = rt_reader.schema();
2709        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2710        let roundtrip =
2711            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2712        assert_eq!(
2713            roundtrip, original,
2714            "Round-trip batch mismatch for comprehensive_e2e.avro"
2715        );
2716        Ok(())
2717    }
2718
2719    #[test]
2720    fn test_roundtrip_new_time_encoders_writer() -> Result<(), AvroError> {
2721        let schema = Schema::new(vec![
2722            Field::new("d32", DataType::Date32, false),
2723            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
2724            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
2725            Field::new(
2726                "ts_ms",
2727                DataType::Timestamp(TimeUnit::Millisecond, None),
2728                false,
2729            ),
2730            Field::new(
2731                "ts_us",
2732                DataType::Timestamp(TimeUnit::Microsecond, None),
2733                false,
2734            ),
2735            Field::new(
2736                "ts_ns",
2737                DataType::Timestamp(TimeUnit::Nanosecond, None),
2738                false,
2739            ),
2740        ]);
2741        let d32 = Date32Array::from(vec![0, 1, -1]);
2742        let t32_ms: PrimitiveArray<Time32MillisecondType> =
2743            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
2744        let t64_us: PrimitiveArray<Time64MicrosecondType> =
2745            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
2746        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
2747            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
2748        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
2749        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
2750        let batch = RecordBatch::try_new(
2751            Arc::new(schema.clone()),
2752            vec![
2753                Arc::new(d32) as ArrayRef,
2754                Arc::new(t32_ms) as ArrayRef,
2755                Arc::new(t64_us) as ArrayRef,
2756                Arc::new(ts_ms) as ArrayRef,
2757                Arc::new(ts_us) as ArrayRef,
2758                Arc::new(ts_ns) as ArrayRef,
2759            ],
2760        )?;
2761        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2762        writer.write(&batch)?;
2763        writer.finish()?;
2764        let bytes = writer.into_inner();
2765        let rt_reader = ReaderBuilder::new()
2766            .build(std::io::Cursor::new(bytes))
2767            .expect("build reader for round-trip of new time encoders");
2768        let rt_schema = rt_reader.schema();
2769        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2770        let roundtrip =
2771            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2772        assert_eq!(roundtrip, batch);
2773        Ok(())
2774    }
2775
2776    fn make_encoder_schema() -> Schema {
2777        Schema::new(vec![
2778            Field::new("a", DataType::Int32, false),
2779            Field::new("b", DataType::Int32, false),
2780        ])
2781    }
2782
2783    fn make_encoder_batch(schema: &Schema) -> RecordBatch {
2784        let a = Int32Array::from(vec![1, 2, 3]);
2785        let b = Int32Array::from(vec![10, 20, 30]);
2786        RecordBatch::try_new(
2787            Arc::new(schema.clone()),
2788            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2789        )
2790        .expect("failed to build test RecordBatch")
2791    }
2792
    /// Builds an Arrow `Schema`/`RecordBatch` pair together with the hand-written
    /// Avro schema JSON that describes it.
    ///
    /// The Avro JSON is attached to the Arrow schema metadata under
    /// `SCHEMA_METADATA_KEY` so the writer uses this exact schema rather than
    /// deriving one. Columns: `id` (long), `name` (string), `active` (boolean),
    /// `tags` (array<int>), `opt` (nullable string); three rows of data.
    fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), AvroError> {
        let avro_json = r#"
        {
          "type": "record",
          "name": "User",
          "fields": [
            { "name": "id",     "type": "long" },
            { "name": "name",   "type": "string" },
            { "name": "active", "type": "boolean" },
            { "name": "tags",   "type": { "type": "array", "items": "int" } },
            { "name": "opt",    "type": ["null", "string"], "default": null }
          ]
        }"#;
        let avro_schema = AvroSchema::new(avro_json.to_string());
        // Store the Avro JSON in the schema metadata for the writer to pick up.
        let mut md = HashMap::new();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.json_string.clone(),
        );
        // List items are non-nullable Int32, matching `"items": "int"` above.
        let item_field = Arc::new(Field::new(
            Field::LIST_FIELD_DEFAULT_NAME,
            DataType::Int32,
            false,
        ));
        let schema = Schema::new_with_metadata(
            vec![
                Field::new("id", DataType::Int64, false),
                Field::new("name", DataType::Utf8, false),
                Field::new("active", DataType::Boolean, false),
                Field::new("tags", DataType::List(item_field.clone()), false),
                Field::new("opt", DataType::Utf8, true),
            ],
            md,
        );
        let id = Int64Array::from(vec![1, 2, 3]);
        let name = StringArray::from(vec!["alice", "bob", "carol"]);
        let active = BooleanArray::from(vec![true, false, true]);
        // `tags` rows: [1, 2], [], [3].
        let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
        tags_builder.values().append_value(1);
        tags_builder.values().append_value(2);
        tags_builder.append(true);
        tags_builder.append(true);
        tags_builder.values().append_value(3);
        tags_builder.append(true);
        let tags = tags_builder.finish();
        let opt = StringArray::from(vec![Some("x"), None, Some("z")]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(id) as ArrayRef,
                Arc::new(name) as ArrayRef,
                Arc::new(active) as ArrayRef,
                Arc::new(tags) as ArrayRef,
                Arc::new(opt) as ArrayRef,
            ],
        )?;
        Ok((schema, batch, avro_schema))
    }
2851
2852    #[test]
2853    fn test_row_writer_matches_stream_writer_soe() -> Result<(), AvroError> {
2854        let schema = make_encoder_schema();
2855        let batch = make_encoder_batch(&schema);
2856        let mut stream = AvroStreamWriter::new(Vec::<u8>::new(), schema.clone())?;
2857        stream.write(&batch)?;
2858        stream.finish()?;
2859        let stream_bytes = stream.into_inner();
2860        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2861        row_writer.encode(&batch)?;
2862        let rows = row_writer.flush();
2863        let row_bytes: Vec<u8> = rows.bytes().to_vec();
2864        assert_eq!(stream_bytes, row_bytes);
2865        Ok(())
2866    }
2867
2868    #[test]
2869    fn test_row_writer_flush_clears_buffer() -> Result<(), AvroError> {
2870        let schema = make_encoder_schema();
2871        let batch = make_encoder_batch(&schema);
2872        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2873        row_writer.encode(&batch)?;
2874        assert_eq!(row_writer.buffered_len(), batch.num_rows());
2875        let out1 = row_writer.flush();
2876        assert_eq!(out1.len(), batch.num_rows());
2877        assert_eq!(row_writer.buffered_len(), 0);
2878        let out2 = row_writer.flush();
2879        assert_eq!(out2.len(), 0);
2880        Ok(())
2881    }
2882
2883    #[test]
2884    fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), AvroError> {
2885        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
2886        let mut store = SchemaStore::new();
2887        store.register(avro_schema.clone())?;
2888        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2889        row_writer.encode(&batch)?;
2890        let rows = row_writer.flush();
2891        let mut decoder = ReaderBuilder::new()
2892            .with_writer_schema_store(store)
2893            .with_batch_size(1024)
2894            .build_decoder()?;
2895        for row in rows.iter() {
2896            let consumed = decoder.decode(row.as_ref())?;
2897            assert_eq!(
2898                consumed,
2899                row.len(),
2900                "decoder should consume the full row frame"
2901            );
2902        }
2903        let out = decoder.flush()?.expect("decoded batch");
2904        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
2905        let actual = pretty_format_batches(&[out])?.to_string();
2906        assert_eq!(expected, actual);
2907        Ok(())
2908    }
2909
    #[test]
    fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), AvroError> {
        // Simulates a transport that delivers SOE frames in variable-sized but
        // frame-aligned chunks, and verifies the decoder consumes each chunk
        // completely and reassembles the original batch.
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut store = SchemaStore::new();
        store.register(avro_schema.clone())?;
        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        row_writer.encode(&batch)?;
        let rows = row_writer.flush();
        // Build a contiguous stream and frame boundaries (prefix sums) from EncodedRows.
        let mut stream: Vec<u8> = Vec::new();
        let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
        boundaries.push(0usize);
        for row in rows.iter() {
            stream.extend_from_slice(row.as_ref());
            boundaries.push(stream.len());
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        let mut buffered = BytesMut::new();
        // Chunk sizes are counted in whole rows so every delivery ends exactly
        // on a frame boundary; varying them exercises different buffer fills.
        let chunk_rows = [1usize, 2, 3, 1, 4, 2];
        let mut row_idx = 0usize;
        let mut i = 0usize;
        let n_rows = rows.len();
        while row_idx < n_rows {
            let take = chunk_rows[i % chunk_rows.len()];
            i += 1;
            let end_row = (row_idx + take).min(n_rows);
            let byte_start = boundaries[row_idx];
            let byte_end = boundaries[end_row];
            buffered.extend_from_slice(&stream[byte_start..byte_end]);
            // Drain the buffer: `decode` returns 0 once no complete frame remains.
            loop {
                let consumed = decoder.decode(&buffered)?;
                if consumed == 0 {
                    break;
                }
                let _ = buffered.split_to(consumed);
            }
            // Chunks are frame-aligned, so nothing should be left over.
            assert!(
                buffered.is_empty(),
                "expected decoder to consume the entire frame-aligned chunk"
            );
            row_idx = end_row;
        }
        let out = decoder.flush()?.expect("decoded batch");
        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected, actual);
        Ok(())
    }
2961
2962    #[test]
2963    fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), AvroError> {
2964        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
2965        let schema_id: u32 = 42;
2966        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
2967        store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
2968        let mut row_writer = WriterBuilder::new(schema)
2969            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
2970            .build_encoder::<AvroSoeFormat>()?;
2971        row_writer.encode(&batch)?;
2972        let rows = row_writer.flush();
2973        let mut decoder = ReaderBuilder::new()
2974            .with_writer_schema_store(store)
2975            .with_batch_size(1024)
2976            .build_decoder()?;
2977        for row in rows.iter() {
2978            let consumed = decoder.decode(row.as_ref())?;
2979            assert_eq!(consumed, row.len());
2980        }
2981        let out = decoder.flush()?.expect("decoded batch");
2982        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
2983        let actual = pretty_format_batches(&[out])?.to_string();
2984        assert_eq!(expected, actual);
2985        Ok(())
2986    }
    #[test]
    fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        use arrow_array::{ArrayRef, Int32Array, RecordBatch};
        use arrow_schema::{DataType, Field, Schema};
        use std::sync::Arc;
        // Exercises the public surface of `Encoder` and `EncodedRows` (flush,
        // len, is_empty, bytes, offsets, iter, row, new). Fully-qualified call
        // syntax is used deliberately to pin the exact method paths.
        let schema = Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", DataType::Int32, false),
        ]);
        let schema_ref = Arc::new(schema.clone());
        let batch1 = RecordBatch::try_new(
            schema_ref.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
                Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
            ],
        )?;
        let batch2 = RecordBatch::try_new(
            schema_ref,
            vec![
                Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
                Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef,
            ],
        )?;
        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
        // Flushing before any encode must yield an empty EncodedRows with the
        // sentinel offset table `[0]`.
        let empty = Encoder::flush(&mut encoder);
        assert_eq!(EncodedRows::len(&empty), 0);
        assert!(EncodedRows::is_empty(&empty));
        assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
        assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
        assert_eq!(EncodedRows::iter(&empty).count(), 0);
        let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
        assert!(empty_vecs.is_empty());
        // 3 rows + 2 rows across two batches => 5 buffered rows.
        let batches = vec![batch1, batch2];
        Encoder::encode_batches(&mut encoder, &batches)?;
        assert_eq!(encoder.buffered_len(), 5);
        let rows = Encoder::flush(&mut encoder);
        assert_eq!(
            encoder.buffered_len(),
            0,
            "Encoder::flush should reset the internal offsets"
        );
        assert_eq!(EncodedRows::len(&rows), 5);
        assert!(!EncodedRows::is_empty(&rows));
        // Each row body is two Avro zig-zag varints (1 byte each for these
        // small values), so offsets advance by 2 per row.
        let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
        assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
        // Zig-zag encoding maps non-negative n to 2*n: (1,10)->(2,20), etc.
        let expected_rows: Vec<Vec<u8>> = vec![
            vec![2, 20],
            vec![4, 40],
            vec![6, 60],
            vec![8, 80],
            vec![10, 100],
        ];
        let expected_stream: Vec<u8> = expected_rows.concat();
        assert_eq!(
            EncodedRows::bytes(&rows).as_ref(),
            expected_stream.as_slice()
        );
        // Per-row access via `row(i)` must agree with the expected bodies.
        for (i, expected) in expected_rows.iter().enumerate() {
            assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
        }
        let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
        assert_eq!(iter_rows, expected_rows);
        // Reconstructing EncodedRows from its parts must be lossless.
        let recreated = EncodedRows::new(
            EncodedRows::bytes(&rows).clone(),
            EncodedRows::offsets(&rows).to_vec(),
        );
        assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows));
        assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows));
        assert_eq!(
            EncodedRows::offsets(&recreated),
            EncodedRows::offsets(&rows)
        );
        let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
        assert_eq!(rec_vecs, iter_rows);
        // After a flush the encoder is empty again.
        let empty_again = Encoder::flush(&mut encoder);
        assert!(EncodedRows::is_empty(&empty_again));
        Ok(())
    }
3068
3069    #[test]
3070    fn test_writer_builder_build_rejects_avro_binary_format() {
3071        use crate::writer::format::AvroBinaryFormat;
3072        use arrow_schema::{DataType, Field, Schema};
3073        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3074        let err = WriterBuilder::new(schema)
3075            .build::<_, AvroBinaryFormat>(Vec::<u8>::new())
3076            .unwrap_err();
3077        match err {
3078            AvroError::InvalidArgument(msg) => assert_eq!(
3079                msg,
3080                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
3081            ),
3082            other => panic!("expected InvalidArgumentError, got {:?}", other),
3083        }
3084    }
    #[test]
    fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        // Verifies that an `AvroBinaryFormat` row equals the corresponding
        // `AvroSoeFormat` row minus its prefix, and that manually prepending
        // the SOE prefix makes the bare bodies decodable.
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        // Two batches (3 rows + a 2-row slice) => 5 expected rows.
        let batches: Vec<RecordBatch> = vec![batch.clone(), batch.slice(1, 2)];
        let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?;
        let mut binary_encoder =
            WriterBuilder::new(schema.clone()).build_encoder::<AvroBinaryFormat>()?;
        binary_encoder.encode_batches(&batches)?;
        let binary_rows = binary_encoder.flush();
        assert_eq!(
            binary_rows.len(),
            expected.num_rows(),
            "binary encoder row count mismatch"
        );
        let mut soe_encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
        soe_encoder.encode_batches(&batches)?;
        let soe_rows = soe_encoder.flush();
        assert_eq!(
            soe_rows.len(),
            binary_rows.len(),
            "SOE vs binary row count mismatch"
        );
        let mut store = SchemaStore::new(); // Rabin by default
        let fp = store.register(avro_schema)?;
        // SOE encodes the Rabin fingerprint little-endian after the magic.
        let fp_le_bytes = match fp {
            Fingerprint::Rabin(v) => v.to_le_bytes(),
            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
        };
        // SOE prefix = 2-byte magic [0xC3, 0x01] + 8-byte fingerprint.
        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
        const SOE_PREFIX_LEN: usize = 2 + 8;
        // Row-by-row: SOE row must be prefix + identical binary body.
        for i in 0..binary_rows.len() {
            let body = binary_rows.row(i)?;
            let soe = soe_rows.row(i)?;
            assert!(
                soe.len() >= SOE_PREFIX_LEN,
                "expected SOE row to include prefix"
            );
            assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC);
            assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes);
            assert_eq!(
                &soe.as_ref()[SOE_PREFIX_LEN..],
                body.as_ref(),
                "SOE body bytes differ from AvroBinaryFormat body bytes (row {i})"
            );
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        // Frame each bare body by hand and confirm the decoder accepts it.
        for body in binary_rows.iter() {
            let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
            framed.extend_from_slice(&SOE_MAGIC);
            framed.extend_from_slice(&fp_le_bytes);
            framed.extend_from_slice(body.as_ref());
            let consumed = decoder.decode(&framed)?;
            assert_eq!(
                consumed,
                framed.len(),
                "decoder should consume the full SOE-framed message"
            );
        }
        let out = decoder.flush()?.expect("expected a decoded RecordBatch");
        let expected_str = pretty_format_batches(&[expected])?.to_string();
        let actual_str = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected_str, actual_str);
        Ok(())
    }
3154
    #[test]
    fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks()
    -> Result<(), AvroError> {
        use crate::writer::format::AvroBinaryFormat;
        // Wraps each bare binary row in a 4-byte length-prefixed transport
        // frame carrying an SOE message, delivers the stream in arbitrary
        // chunk sizes, and verifies the decoder reassembles the batch.
        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
        encoder.encode(&batch)?;
        let rows = encoder.flush();
        let mut store = SchemaStore::new();
        let fp = store.register(avro_schema)?;
        let fp_le_bytes = match fp {
            Fingerprint::Rabin(v) => v.to_le_bytes(),
            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
        };
        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
        const SOE_PREFIX_LEN: usize = 2 + 8;
        // Stream layout per message: u32 LE length, then SOE magic +
        // fingerprint + bare Avro body.
        let mut stream: Vec<u8> = Vec::new();
        for body in rows.iter() {
            let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
                .try_into()
                .expect("message length must fit in u32");
            stream.extend_from_slice(&msg_len.to_le_bytes());
            stream.extend_from_slice(&SOE_MAGIC);
            stream.extend_from_slice(&fp_le_bytes);
            stream.extend_from_slice(body.as_ref());
        }
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .with_batch_size(1024)
            .build_decoder()?;
        // Byte-level chunk sizes deliberately misalign with frame boundaries.
        let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34];
        let mut pos = 0usize;
        let mut i = 0usize;
        let mut buffered = BytesMut::new();
        let mut decoded_frames = 0usize;
        while pos < stream.len() {
            let take = chunk_sizes[i % chunk_sizes.len()];
            i += 1;
            let end = (pos + take).min(stream.len());
            buffered.extend_from_slice(&stream[pos..end]);
            pos = end;
            // Transport framer: peel off complete length-prefixed frames and
            // hand each payload to the decoder whole.
            loop {
                if buffered.len() < 4 {
                    break;
                }
                let msg_len =
                    u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]])
                        as usize;
                if buffered.len() < 4 + msg_len {
                    break;
                }
                let frame = buffered.split_to(4 + msg_len);
                let payload = &frame[4..];
                let consumed = decoder.decode(payload)?;
                assert_eq!(
                    consumed,
                    payload.len(),
                    "decoder should consume the full SOE-framed message"
                );

                decoded_frames += 1;
            }
        }
        assert!(
            buffered.is_empty(),
            "expected transport framer to consume all bytes; leftover = {}",
            buffered.len()
        );
        assert_eq!(
            decoded_frames,
            rows.len(),
            "expected to decode exactly one frame per encoded row"
        );
        let out = decoder.flush()?.expect("expected decoded RecordBatch");
        let expected_str = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
        let actual_str = pretty_format_batches(&[out])?.to_string();
        assert_eq!(expected_str, actual_str);
        Ok(())
    }
3234
3235    /// Helper to roundtrip a RecordBatch through OCF writer/reader
3236    fn roundtrip_ocf(batch: &RecordBatch) -> Result<RecordBatch, AvroError> {
3237        let schema = batch.schema();
3238        let mut buffer = Vec::<u8>::new();
3239        let mut writer = AvroWriter::new(&mut buffer, schema.as_ref().clone())?;
3240        writer.write(batch)?;
3241        writer.finish()?;
3242        drop(writer);
3243        let reader = ReaderBuilder::new()
3244            .build(Cursor::new(buffer))
3245            .expect("build reader for roundtrip OCF");
3246        // Get the Avro schema JSON from the OCF header
3247        let avro_schema_json = reader
3248            .avro_header()
3249            .get(SCHEMA_METADATA_KEY)
3250            .map(|raw| std::str::from_utf8(raw).expect("valid UTF-8").to_string());
3251        // Get the Arrow schema and add the Avro schema metadata
3252        let arrow_schema = reader.schema();
3253        let rt_schema = if let Some(json) = avro_schema_json {
3254            let mut metadata = arrow_schema.metadata().clone();
3255            metadata.insert(SCHEMA_METADATA_KEY.to_string(), json);
3256            Arc::new(Schema::new_with_metadata(
3257                arrow_schema.fields().clone(),
3258                metadata,
3259            ))
3260        } else {
3261            arrow_schema
3262        };
3263        let rt_batches: Vec<RecordBatch> = reader.collect::<Result<Vec<_>, _>>()?;
3264        Ok(arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip"))
3265    }
3266
3267    /// Assert that an array roundtrips through Avro OCF and comes back identical.
3268    #[cfg(feature = "avro_custom_types")]
3269    fn assert_round_trip(array: ArrayRef) {
3270        assert_round_trip_widened(array.clone(), array);
3271    }
3272
3273    /// Assert that an input array roundtrips through Avro OCF and produces the expected output.
3274    fn assert_round_trip_widened(input: ArrayRef, expected: ArrayRef) {
3275        let schema = Schema::new(vec![Field::new("val", input.data_type().clone(), true)]);
3276        let batch =
3277            RecordBatch::try_new(Arc::new(schema), vec![input]).expect("failed to create batch");
3278        let roundtrip = roundtrip_ocf(&batch).expect("roundtrip failed");
3279        assert_eq!(
3280            roundtrip.column(0).data_type(),
3281            expected.data_type(),
3282            "output data type mismatch"
3283        );
3284        assert_eq!(
3285            roundtrip.column(0).to_data(),
3286            expected.to_data(),
3287            "output data mismatch"
3288        );
3289    }
3290
3291    #[cfg(feature = "avro_custom_types")]
3292    #[test]
3293    fn test_roundtrip_int8_custom_types() {
3294        assert_round_trip(Arc::new(Int8Array::from(vec![
3295            Some(i8::MIN),
3296            Some(-1),
3297            Some(0),
3298            None,
3299            Some(1),
3300            Some(i8::MAX),
3301        ])));
3302    }
3303
3304    #[cfg(not(feature = "avro_custom_types"))]
3305    #[test]
3306    fn test_roundtrip_int8_no_custom_widens_to_int32() {
3307        assert_round_trip_widened(
3308            Arc::new(Int8Array::from(vec![
3309                Some(i8::MIN),
3310                Some(-1),
3311                Some(0),
3312                None,
3313                Some(1),
3314                Some(i8::MAX),
3315            ])),
3316            Arc::new(Int32Array::from(vec![
3317                Some(i8::MIN as i32),
3318                Some(-1),
3319                Some(0),
3320                None,
3321                Some(1),
3322                Some(i8::MAX as i32),
3323            ])),
3324        );
3325    }
3326
3327    #[cfg(feature = "avro_custom_types")]
3328    #[test]
3329    fn test_roundtrip_int16_custom_types() {
3330        assert_round_trip(Arc::new(Int16Array::from(vec![
3331            Some(i16::MIN),
3332            Some(-1),
3333            Some(0),
3334            None,
3335            Some(1),
3336            Some(i16::MAX),
3337        ])));
3338    }
3339
3340    #[cfg(not(feature = "avro_custom_types"))]
3341    #[test]
3342    fn test_roundtrip_int16_no_custom_widens_to_int32() {
3343        assert_round_trip_widened(
3344            Arc::new(Int16Array::from(vec![
3345                Some(i16::MIN),
3346                Some(-1),
3347                Some(0),
3348                None,
3349                Some(1),
3350                Some(i16::MAX),
3351            ])),
3352            Arc::new(Int32Array::from(vec![
3353                Some(i16::MIN as i32),
3354                Some(-1),
3355                Some(0),
3356                None,
3357                Some(1),
3358                Some(i16::MAX as i32),
3359            ])),
3360        );
3361    }
3362
3363    #[cfg(feature = "avro_custom_types")]
3364    #[test]
3365    fn test_roundtrip_uint8_custom_types() {
3366        assert_round_trip(Arc::new(UInt8Array::from(vec![
3367            Some(0u8),
3368            Some(1),
3369            None,
3370            Some(127),
3371            Some(u8::MAX),
3372        ])));
3373    }
3374
3375    #[cfg(not(feature = "avro_custom_types"))]
3376    #[test]
3377    fn test_roundtrip_uint8_no_custom_widens_to_int32() {
3378        assert_round_trip_widened(
3379            Arc::new(UInt8Array::from(vec![
3380                Some(0u8),
3381                Some(1),
3382                None,
3383                Some(127),
3384                Some(u8::MAX),
3385            ])),
3386            Arc::new(Int32Array::from(vec![
3387                Some(0i32),
3388                Some(1),
3389                None,
3390                Some(127),
3391                Some(u8::MAX as i32),
3392            ])),
3393        );
3394    }
3395
3396    #[cfg(feature = "avro_custom_types")]
3397    #[test]
3398    fn test_roundtrip_uint16_custom_types() {
3399        assert_round_trip(Arc::new(UInt16Array::from(vec![
3400            Some(0u16),
3401            Some(1),
3402            None,
3403            Some(32767),
3404            Some(u16::MAX),
3405        ])));
3406    }
3407
3408    #[cfg(not(feature = "avro_custom_types"))]
3409    #[test]
3410    fn test_roundtrip_uint16_no_custom_widens_to_int32() {
3411        assert_round_trip_widened(
3412            Arc::new(UInt16Array::from(vec![
3413                Some(0u16),
3414                Some(1),
3415                None,
3416                Some(32767),
3417                Some(u16::MAX),
3418            ])),
3419            Arc::new(Int32Array::from(vec![
3420                Some(0i32),
3421                Some(1),
3422                None,
3423                Some(32767),
3424                Some(u16::MAX as i32),
3425            ])),
3426        );
3427    }
3428
3429    #[cfg(feature = "avro_custom_types")]
3430    #[test]
3431    fn test_roundtrip_uint32_custom_types() {
3432        assert_round_trip(Arc::new(UInt32Array::from(vec![
3433            Some(0u32),
3434            Some(1),
3435            None,
3436            Some(i32::MAX as u32),
3437            Some(u32::MAX),
3438        ])));
3439    }
3440
3441    #[cfg(not(feature = "avro_custom_types"))]
3442    #[test]
3443    fn test_roundtrip_uint32_no_custom_widens_to_int64() {
3444        assert_round_trip_widened(
3445            Arc::new(UInt32Array::from(vec![
3446                Some(0u32),
3447                Some(1),
3448                None,
3449                Some(i32::MAX as u32),
3450                Some(u32::MAX),
3451            ])),
3452            Arc::new(Int64Array::from(vec![
3453                Some(0i64),
3454                Some(1),
3455                None,
3456                Some(i32::MAX as i64),
3457                Some(u32::MAX as i64),
3458            ])),
3459        );
3460    }
3461
3462    #[cfg(feature = "avro_custom_types")]
3463    #[test]
3464    fn test_roundtrip_uint64_custom_types() {
3465        assert_round_trip(Arc::new(UInt64Array::from(vec![
3466            Some(0u64),
3467            Some(1),
3468            None,
3469            Some(i64::MAX as u64),
3470            Some(u64::MAX),
3471        ])));
3472    }
3473
3474    #[cfg(not(feature = "avro_custom_types"))]
3475    #[test]
3476    fn test_roundtrip_uint64_no_custom_widens_to_int64() {
3477        assert_round_trip_widened(
3478            Arc::new(UInt64Array::from(vec![
3479                Some(0u64),
3480                Some(1),
3481                None,
3482                Some(i64::MAX as u64),
3483            ])),
3484            Arc::new(Int64Array::from(vec![
3485                Some(0i64),
3486                Some(1),
3487                None,
3488                Some(i64::MAX),
3489            ])),
3490        );
3491    }
3492
3493    #[cfg(not(feature = "avro_custom_types"))]
3494    #[test]
3495    fn test_roundtrip_uint64_overflow_errors_without_custom() {
3496        use arrow_array::UInt64Array;
3497        let schema = Schema::new(vec![Field::new("val", DataType::UInt64, false)]);
3498        let values: Vec<u64> = vec![u64::MAX];
3499        let array = UInt64Array::from(values);
3500        let batch = RecordBatch::try_new(Arc::new(schema), vec![Arc::new(array) as ArrayRef])
3501            .expect("create batch");
3502        let result = roundtrip_ocf(&batch);
3503        assert!(
3504            result.is_err(),
3505            "Expected error when encoding UInt64 > i64::MAX without avro_custom_types"
3506        );
3507    }
3508
3509    #[cfg(feature = "avro_custom_types")]
3510    #[test]
3511    fn test_roundtrip_float16_custom_types() {
3512        assert_round_trip(Arc::new(Float16Array::from(vec![
3513            Some(f16::ZERO),
3514            Some(f16::ONE),
3515            None,
3516            Some(f16::NEG_ONE),
3517            Some(f16::MAX),
3518            Some(f16::MIN),
3519        ])));
3520    }
3521
3522    #[cfg(not(feature = "avro_custom_types"))]
3523    #[test]
3524    fn test_roundtrip_float16_no_custom_widens_to_float32() {
3525        assert_round_trip_widened(
3526            Arc::new(Float16Array::from(vec![
3527                Some(f16::ZERO),
3528                Some(f16::ONE),
3529                None,
3530                Some(f16::NEG_ONE),
3531            ])),
3532            Arc::new(Float32Array::from(vec![
3533                Some(0.0f32),
3534                Some(1.0),
3535                None,
3536                Some(-1.0),
3537            ])),
3538        );
3539    }
3540
3541    #[cfg(feature = "avro_custom_types")]
3542    #[test]
3543    fn test_roundtrip_date64_custom_types() {
3544        assert_round_trip(Arc::new(Date64Array::from(vec![
3545            Some(0i64),
3546            Some(86_400_000),
3547            None,
3548            Some(1_609_459_200_000),
3549        ])));
3550    }
3551
3552    #[cfg(not(feature = "avro_custom_types"))]
3553    #[test]
3554    fn test_roundtrip_date64_no_custom_as_timestamp_millis() {
3555        assert_round_trip_widened(
3556            Arc::new(Date64Array::from(vec![
3557                Some(0i64),
3558                Some(86_400_000),
3559                None,
3560                Some(1_609_459_200_000),
3561            ])),
3562            Arc::new(TimestampMillisecondArray::from(vec![
3563                Some(0i64),
3564                Some(86_400_000),
3565                None,
3566                Some(1_609_459_200_000),
3567            ])),
3568        );
3569    }
3570
3571    #[cfg(feature = "avro_custom_types")]
3572    #[test]
3573    fn test_roundtrip_time64_nanosecond_custom_types() {
3574        assert_round_trip(Arc::new(Time64NanosecondArray::from(vec![
3575            Some(0i64),
3576            Some(1_000_000_000),
3577            None,
3578            Some(86_399_999_999_999),
3579        ])));
3580    }
3581
3582    #[cfg(not(feature = "avro_custom_types"))]
3583    #[test]
3584    fn test_roundtrip_time64_nanos_no_custom_truncates_to_micros() {
3585        // Use values evenly divisible by 1000 to avoid truncation issues
3586        assert_round_trip_widened(
3587            Arc::new(Time64NanosecondArray::from(vec![
3588                Some(0i64),
3589                Some(1_000_000_000),
3590                None,
3591                Some(86_399_999_000_000),
3592            ])),
3593            Arc::new(Time64MicrosecondArray::from(vec![
3594                Some(0i64),
3595                Some(1_000_000),
3596                None,
3597                Some(86_399_999_000),
3598            ])),
3599        );
3600    }
3601
3602    #[cfg(feature = "avro_custom_types")]
3603    #[test]
3604    fn test_roundtrip_time32_second_custom_types() {
3605        assert_round_trip(Arc::new(Time32SecondArray::from(vec![
3606            Some(0i32),
3607            Some(3600),
3608            None,
3609            Some(86399),
3610        ])));
3611    }
3612
3613    #[cfg(not(feature = "avro_custom_types"))]
3614    #[test]
3615    fn test_roundtrip_time32_second_no_custom_scales_to_millis() {
3616        assert_round_trip_widened(
3617            Arc::new(Time32SecondArray::from(vec![
3618                Some(0i32),
3619                Some(3600),
3620                None,
3621                Some(86399),
3622            ])),
3623            Arc::new(Time32MillisecondArray::from(vec![
3624                Some(0i32),
3625                Some(3_600_000),
3626                None,
3627                Some(86_399_000),
3628            ])),
3629        );
3630    }
3631
3632    #[cfg(feature = "avro_custom_types")]
3633    #[test]
3634    fn test_roundtrip_timestamp_second_custom_types() {
3635        assert_round_trip(Arc::new(
3636            TimestampSecondArray::from(vec![Some(0i64), Some(1609459200), None, Some(1735689600)])
3637                .with_timezone("+00:00"),
3638        ));
3639    }
3640
3641    #[cfg(not(feature = "avro_custom_types"))]
3642    #[test]
3643    fn test_roundtrip_timestamp_second_no_custom_scales_to_millis() {
3644        assert_round_trip_widened(
3645            Arc::new(
3646                TimestampSecondArray::from(vec![
3647                    Some(0i64),
3648                    Some(1609459200),
3649                    None,
3650                    Some(1735689600),
3651                ])
3652                .with_timezone("+00:00"),
3653            ),
3654            Arc::new(
3655                TimestampMillisecondArray::from(vec![
3656                    Some(0i64),
3657                    Some(1_609_459_200_000),
3658                    None,
3659                    Some(1_735_689_600_000),
3660                ])
3661                .with_timezone("+00:00"),
3662            ),
3663        );
3664    }
3665
3666    #[cfg(feature = "avro_custom_types")]
3667    #[test]
3668    fn test_roundtrip_interval_year_month_custom_types() {
3669        assert_round_trip(Arc::new(IntervalYearMonthArray::from(vec![
3670            Some(0i32),
3671            Some(12),
3672            None,
3673            Some(-6),
3674            Some(25),
3675        ])));
3676    }
3677
3678    #[cfg(not(feature = "avro_custom_types"))]
3679    #[test]
3680    fn test_roundtrip_interval_year_month_no_custom() {
3681        // Only non-negative values for standard Avro duration
3682        assert_round_trip_widened(
3683            Arc::new(IntervalYearMonthArray::from(vec![
3684                Some(0i32),
3685                Some(12),
3686                None,
3687                Some(25),
3688            ])),
3689            Arc::new(IntervalMonthDayNanoArray::from(vec![
3690                Some(IntervalMonthDayNano::new(0, 0, 0)),
3691                Some(IntervalMonthDayNano::new(12, 0, 0)),
3692                None,
3693                Some(IntervalMonthDayNano::new(25, 0, 0)),
3694            ])),
3695        );
3696    }
3697
3698    #[cfg(feature = "avro_custom_types")]
3699    #[test]
3700    fn test_roundtrip_interval_day_time_custom_types() {
3701        assert_round_trip(Arc::new(IntervalDayTimeArray::from(vec![
3702            Some(IntervalDayTime::new(0, 0)),
3703            Some(IntervalDayTime::new(1, 1000)),
3704            None,
3705            Some(IntervalDayTime::new(30, 3600000)),
3706        ])));
3707    }
3708
3709    #[cfg(not(feature = "avro_custom_types"))]
3710    #[test]
3711    fn test_roundtrip_interval_day_time_no_custom() {
3712        assert_round_trip_widened(
3713            Arc::new(IntervalDayTimeArray::from(vec![
3714                Some(IntervalDayTime::new(0, 0)),
3715                Some(IntervalDayTime::new(1, 1000)),
3716                None,
3717                Some(IntervalDayTime::new(30, 3600000)),
3718            ])),
3719            Arc::new(IntervalMonthDayNanoArray::from(vec![
3720                Some(IntervalMonthDayNano::new(0, 0, 0)),
3721                Some(IntervalMonthDayNano::new(0, 1, 1_000_000_000)),
3722                None,
3723                Some(IntervalMonthDayNano::new(0, 30, 3_600_000_000_000)),
3724            ])),
3725        );
3726    }
3727
3728    #[cfg(feature = "avro_custom_types")]
3729    #[test]
3730    fn test_roundtrip_interval_month_day_nano_custom_types() {
3731        assert_round_trip(Arc::new(IntervalMonthDayNanoArray::from(vec![
3732            Some(IntervalMonthDayNano::new(0, 0, 0)),
3733            Some(IntervalMonthDayNano::new(1, 2, 3)),
3734            None,
3735            Some(IntervalMonthDayNano::new(-4, -5, -6)),
3736        ])));
3737    }
3738
3739    #[cfg(not(feature = "avro_custom_types"))]
3740    #[test]
3741    fn test_roundtrip_interval_month_day_nano_no_custom() {
3742        // Only representable values for Avro duration: non-negative and whole milliseconds
3743        assert_round_trip_widened(
3744            Arc::new(IntervalMonthDayNanoArray::from(vec![
3745                Some(IntervalMonthDayNano::new(0, 0, 0)),
3746                Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
3747                None,
3748                Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
3749            ])),
3750            Arc::new(IntervalMonthDayNanoArray::from(vec![
3751                Some(IntervalMonthDayNano::new(0, 0, 0)),
3752                Some(IntervalMonthDayNano::new(1, 2, 3_000_000)),
3753                None,
3754                Some(IntervalMonthDayNano::new(4, 5, 6_000_000)),
3755            ])),
3756        );
3757    }
3758
3759    fn schemas_equal_ignoring_metadata(left: &Schema, right: &Schema) -> bool {
3760        if left.fields().len() != right.fields().len() {
3761            return false;
3762        }
3763        for (l, r) in left.fields().iter().zip(right.fields().iter()) {
3764            if l.name() != r.name()
3765                || l.data_type() != r.data_type()
3766                || l.is_nullable() != r.is_nullable()
3767            {
3768                return false;
3769            }
3770        }
3771        true
3772    }
3773
3774    fn avro_field_type<'a>(avro_schema: &'a Value, name: &str) -> &'a Value {
3775        let fields = avro_schema
3776            .get("fields")
3777            .and_then(|v| v.as_array())
3778            .expect("avro schema has 'fields' array");
3779        fields
3780            .iter()
3781            .find(|f| f.get("name").and_then(|n| n.as_str()) == Some(name))
3782            .unwrap_or_else(|| panic!("avro schema missing field '{name}'"))
3783            .get("type")
3784            .expect("field has 'type'")
3785    }
3786
    #[test]
    // End-to-end check that the writer's Avro output and the reader's recovered
    // Arrow schema/values line up for every type affected by the custom-logical-type
    // feature, under both feature configurations.
    fn e2e_types_and_schema_alignment() -> Result<(), AvroError> {
        // Values are chosen to:
        // - exercise full UInt64 range when `avro_custom_types` is enabled
        // - exercise negative / sub-millisecond intervals when `avro_custom_types` is enabled
        // - remain representable under standard Avro logical types when `avro_custom_types` is disabled
        let i8_values: Vec<Option<i8>> = vec![Some(i8::MIN), Some(-1), Some(i8::MAX)];
        let i16_values: Vec<Option<i16>> = vec![Some(i16::MIN), Some(-1), Some(i16::MAX)];
        let u8_values: Vec<Option<u8>> = vec![Some(0), Some(1), Some(u8::MAX)];
        let u16_values: Vec<Option<u16>> = vec![Some(0), Some(1), Some(u16::MAX)];
        let u32_values: Vec<Option<u32>> = vec![Some(0), Some(1), Some(u32::MAX)];
        let u64_values: Vec<Option<u64>> = if cfg!(feature = "avro_custom_types") {
            vec![Some(0), Some(i64::MAX as u64), Some((i64::MAX as u64) + 1)]
        } else {
            // Must remain <= i64::MAX when `avro_custom_types` is disabled
            vec![Some(0), Some((i64::MAX as u64) - 1), Some(i64::MAX as u64)]
        };
        let f16_values: Vec<Option<f16>> = vec![
            Some(f16::from_f32(1.5)),
            Some(f16::from_f32(-2.0)),
            Some(f16::from_f32(0.0)),
        ];
        let date64_values: Vec<Option<i64>> = vec![Some(-86_400_000), Some(0), Some(86_400_000)];
        let time32s_values: Vec<Option<i32>> = vec![Some(0), Some(1), Some(86_399)];
        let time64ns_values: Vec<Option<i64>> = vec![
            Some(0),
            Some(1_234_567_890), // truncation case for no-custom (nanos -> micros)
            Some(86_399_000_000_123_i64), // near end-of-day, also truncation
        ];
        let ts_s_local_values: Vec<Option<i64>> = vec![Some(-1), Some(0), Some(1)];
        let ts_s_utc_values: Vec<Option<i64>> = vec![Some(1), Some(2), Some(3)];
        let iv_ym_values: Vec<Option<i32>> = if cfg!(feature = "avro_custom_types") {
            vec![Some(0), Some(-6), Some(25)]
        } else {
            // Avro duration cannot represent negative months without custom types
            vec![Some(0), Some(12), Some(25)]
        };
        let iv_dt_values: Vec<Option<IntervalDayTime>> = if cfg!(feature = "avro_custom_types") {
            vec![
                Some(IntervalDayTime::new(0, 0)),
                Some(IntervalDayTime::new(1, 1000)),
                Some(IntervalDayTime::new(-1, -1000)),
            ]
        } else {
            // Avro duration cannot represent negative day-time without custom types
            vec![
                Some(IntervalDayTime::new(0, 0)),
                Some(IntervalDayTime::new(1, 1000)),
                Some(IntervalDayTime::new(30, 3_600_000)),
            ]
        };
        let iv_mdn_values: Vec<Option<IntervalMonthDayNano>> =
            if cfg!(feature = "avro_custom_types") {
                vec![
                    Some(IntervalMonthDayNano::new(0, 0, 0)),
                    Some(IntervalMonthDayNano::new(1, 2, 3)), // sub-millisecond
                    Some(IntervalMonthDayNano::new(-1, -2, -3)), // negative
                ]
            } else {
                // Avro duration requires non-negative and whole milliseconds
                vec![
                    Some(IntervalMonthDayNano::new(0, 0, 0)),
                    Some(IntervalMonthDayNano::new(1, 2, 3_000_000)), // 3ms
                    Some(IntervalMonthDayNano::new(10, 20, 30_000_000_000)), // 30s
                ]
            };
        // Build a batch containing all impacted types from issue #9290
        let schema = Schema::new(vec![
            Field::new("i8", DataType::Int8, false),
            Field::new("i16", DataType::Int16, false),
            Field::new("u8", DataType::UInt8, false),
            Field::new("u16", DataType::UInt16, false),
            Field::new("u32", DataType::UInt32, false),
            Field::new("u64", DataType::UInt64, false),
            Field::new("f16", DataType::Float16, false),
            Field::new("date64", DataType::Date64, false),
            Field::new("time32s", DataType::Time32(TimeUnit::Second), false),
            Field::new("time64ns", DataType::Time64(TimeUnit::Nanosecond), false),
            Field::new(
                "ts_s_local",
                DataType::Timestamp(TimeUnit::Second, None),
                false,
            ),
            Field::new(
                "ts_s_utc",
                DataType::Timestamp(TimeUnit::Second, Some("+00:00".into())),
                false,
            ),
            Field::new("iv_ym", DataType::Interval(IntervalUnit::YearMonth), false),
            Field::new("iv_dt", DataType::Interval(IntervalUnit::DayTime), false),
            Field::new(
                "iv_mdn",
                DataType::Interval(IntervalUnit::MonthDayNano),
                false,
            ),
        ]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(Int8Array::from(i8_values.clone())) as ArrayRef,
                Arc::new(Int16Array::from(i16_values.clone())) as ArrayRef,
                Arc::new(UInt8Array::from(u8_values.clone())) as ArrayRef,
                Arc::new(UInt16Array::from(u16_values.clone())) as ArrayRef,
                Arc::new(UInt32Array::from(u32_values.clone())) as ArrayRef,
                Arc::new(UInt64Array::from(u64_values.clone())) as ArrayRef,
                Arc::new(Float16Array::from(f16_values.clone())) as ArrayRef,
                Arc::new(Date64Array::from(date64_values.clone())) as ArrayRef,
                Arc::new(Time32SecondArray::from(time32s_values.clone())) as ArrayRef,
                Arc::new(Time64NanosecondArray::from(time64ns_values.clone())) as ArrayRef,
                Arc::new(TimestampSecondArray::from(ts_s_local_values.clone())) as ArrayRef,
                Arc::new(
                    TimestampSecondArray::from(ts_s_utc_values.clone()).with_timezone("+00:00"),
                ) as ArrayRef,
                Arc::new(IntervalYearMonthArray::from(iv_ym_values.clone())) as ArrayRef,
                Arc::new(IntervalDayTimeArray::from(iv_dt_values.clone())) as ArrayRef,
                Arc::new(IntervalMonthDayNanoArray::from(iv_mdn_values.clone())) as ArrayRef,
            ],
        )?;
        // Round-trip through an Avro OCF; the reader attaches the writer's Avro
        // schema JSON to the recovered batch metadata.
        let rt = roundtrip_ocf(&batch)?;
        let rt_schema = rt.schema();
        let avro_schema_json = rt_schema
            .metadata()
            .get(SCHEMA_METADATA_KEY)
            .expect("avro.schema missing in round-tripped batch metadata");
        let avro_schema: Value =
            serde_json::from_str(avro_schema_json).expect("valid avro schema json");
        let rt_arrow_schema = rt.schema();
        // 1) Check the round-tripped Arrow schema (field names/types/nullability).
        if cfg!(feature = "avro_custom_types") {
            assert!(
                schemas_equal_ignoring_metadata(rt_arrow_schema.as_ref(), &schema),
                "Schema fields mismatch.\nExpected: {:?}\nGot: {:?}",
                schema,
                rt_arrow_schema
            );
            for field_name in ["u64", "f16", "iv_ym", "iv_dt", "iv_mdn"] {
                let field = rt_arrow_schema
                    .field_with_name(field_name)
                    .expect("field exists");
                assert!(
                    field.metadata().get(AVRO_NAME_METADATA_KEY).is_some(),
                    "Field '{}' should have avro.name metadata",
                    field_name
                );
            }
        } else {
            // Without avro_custom_types, Avro's type system is narrower than Arrow's.
            // Each field below shows the expected type AFTER round-tripping through Avro,
            // which differs from the original `schema` above:
            let exp_schema = Schema::new(vec![
                Field::new("i8", DataType::Int32, false),
                Field::new("i16", DataType::Int32, false),
                Field::new("u8", DataType::Int32, false),
                Field::new("u16", DataType::Int32, false),
                Field::new("u32", DataType::Int64, false),
                Field::new("u64", DataType::Int64, false),
                Field::new("f16", DataType::Float32, false),
                Field::new(
                    "date64",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new("time32s", DataType::Time32(TimeUnit::Millisecond), false),
                Field::new("time64ns", DataType::Time64(TimeUnit::Microsecond), false),
                Field::new(
                    "ts_s_local",
                    DataType::Timestamp(TimeUnit::Millisecond, None),
                    false,
                ),
                Field::new(
                    "ts_s_utc",
                    DataType::Timestamp(TimeUnit::Millisecond, Some("+00:00".into())),
                    false,
                ),
                Field::new(
                    "iv_ym",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    false,
                ),
                Field::new(
                    "iv_dt",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    false,
                ),
                Field::new(
                    "iv_mdn",
                    DataType::Interval(IntervalUnit::MonthDayNano),
                    false,
                ),
            ]);
            assert!(
                schemas_equal_ignoring_metadata(rt_arrow_schema.as_ref(), &exp_schema),
                "Schema fields mismatch.\nExpected: {:?}\nGot: {:?}",
                exp_schema,
                rt_arrow_schema
            );
            for field_name in ["iv_ym", "iv_dt", "iv_mdn"] {
                let field = rt_arrow_schema
                    .field_with_name(field_name)
                    .expect("field exists");
                assert!(
                    field.metadata().get(AVRO_NAME_METADATA_KEY).is_some(),
                    "Field '{}' should have avro.name metadata",
                    field_name
                );
            }
        }
        // 2) Check the Avro schema JSON the writer emitted for each field.
        if cfg!(feature = "avro_custom_types") {
            assert_eq!(
                avro_field_type(&avro_schema, "i8"),
                &json!({"type":"int","logicalType":"arrow.int8"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "i16"),
                &json!({"type":"int","logicalType":"arrow.int16"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u8"),
                &json!({"type":"int","logicalType":"arrow.uint8"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u16"),
                &json!({"type":"int","logicalType":"arrow.uint16"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u32"),
                &json!({"type":"long","logicalType":"arrow.uint32"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "u64"),
                &json!({"type":"fixed","name":"u64","size":8,"logicalType":"arrow.uint64"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "f16"),
                &json!({"type":"fixed","name":"f16","size":2,"logicalType":"arrow.float16"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "date64"),
                &json!({"type":"long","logicalType":"arrow.date64"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time32s"),
                &json!({"type":"int","logicalType":"arrow.time32-second"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time64ns"),
                &json!({"type":"long","logicalType":"arrow.time64-nanosecond"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_local"),
                &json!({"type":"long","logicalType":"arrow.local-timestamp-second"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_utc"),
                &json!({"type":"long","logicalType":"arrow.timestamp-second"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_ym"),
                &json!({"type":"fixed","name":"iv_ym","size":4,"logicalType":"arrow.interval-year-month"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_dt"),
                &json!({"type":"fixed","name":"iv_dt","size":8,"logicalType":"arrow.interval-day-time"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_mdn"),
                &json!({"type":"fixed","name":"iv_mdn","size":16,"logicalType":"arrow.interval-month-day-nano"})
            );
        } else {
            // Without custom types:
            // - small ints widen to int
            // - UInt32/UInt64 widen to long
            // - Float16 widens to float
            // - Date64 coerces to local-timestamp-millis
            // - Time32(Second) coerces to time-millis and scales seconds->millis
            // - Time64(Nanosecond) coerces to time-micros and truncates nanos->micros
            // - Timestamp(Second) coerces to timestamp-millis / local-timestamp-millis and scales seconds->millis
            // - Intervals YearMonth/DayTime encode as Avro duration (fixed 12) with arrowIntervalUnit annotation
            assert_eq!(avro_field_type(&avro_schema, "i8"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "i16"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "u8"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "u16"), &json!("int"));
            assert_eq!(avro_field_type(&avro_schema, "u32"), &json!("long"));
            assert_eq!(avro_field_type(&avro_schema, "u64"), &json!("long"));
            assert_eq!(avro_field_type(&avro_schema, "f16"), &json!("float"));
            assert_eq!(
                avro_field_type(&avro_schema, "date64"),
                &json!({"type":"long","logicalType":"local-timestamp-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time32s"),
                &json!({"type":"int","logicalType":"time-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "time64ns"),
                &json!({"type":"long","logicalType":"time-micros"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_local"),
                &json!({"type":"long","logicalType":"local-timestamp-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "ts_s_utc"),
                &json!({"type":"long","logicalType":"timestamp-millis"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_ym"),
                &json!({"type":"fixed","name":"iv_ym","size":12,"logicalType":"duration"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_dt"),
                &json!({"type":"fixed","name":"iv_dt","size":12,"logicalType":"duration"})
            );
            assert_eq!(
                avro_field_type(&avro_schema, "iv_mdn"),
                &json!({"type":"fixed","name":"iv_mdn","size":12,"logicalType":"duration"})
            );
        }
        // 3) Check the round-tripped column values.
        if cfg!(feature = "avro_custom_types") {
            assert_eq!(
                rt.column(0).as_ref(),
                &Int8Array::from(i8_values) as &dyn Array
            );
            assert_eq!(
                rt.column(1).as_ref(),
                &Int16Array::from(i16_values) as &dyn Array
            );
            assert_eq!(
                rt.column(2).as_ref(),
                &UInt8Array::from(u8_values) as &dyn Array
            );
            assert_eq!(
                rt.column(3).as_ref(),
                &UInt16Array::from(u16_values) as &dyn Array
            );
            assert_eq!(
                rt.column(4).as_ref(),
                &UInt32Array::from(u32_values) as &dyn Array
            );
            assert_eq!(
                rt.column(5).as_ref(),
                &UInt64Array::from(u64_values) as &dyn Array
            );
            assert_eq!(
                rt.column(6).as_ref(),
                &Float16Array::from(f16_values) as &dyn Array
            );
            assert_eq!(
                rt.column(7).as_ref(),
                &Date64Array::from(date64_values) as &dyn Array
            );
            assert_eq!(
                rt.column(8).as_ref(),
                &Time32SecondArray::from(time32s_values) as &dyn Array
            );
            assert_eq!(
                rt.column(9).as_ref(),
                &Time64NanosecondArray::from(time64ns_values) as &dyn Array
            );
            assert_eq!(
                rt.column(10).as_ref(),
                &TimestampSecondArray::from(ts_s_local_values) as &dyn Array
            );
            assert_eq!(
                rt.column(11).as_ref(),
                &TimestampSecondArray::from(ts_s_utc_values).with_timezone("+00:00") as &dyn Array
            );
            assert_eq!(
                rt.column(12).as_ref(),
                &IntervalYearMonthArray::from(iv_ym_values) as &dyn Array
            );
            assert_eq!(
                rt.column(13).as_ref(),
                &IntervalDayTimeArray::from(iv_dt_values) as &dyn Array
            );
            assert_eq!(
                rt.column(14).as_ref(),
                &IntervalMonthDayNanoArray::from(iv_mdn_values) as &dyn Array
            );
        } else {
            // Expected values after Avro's narrower types widen/scale the originals.
            let exp_i8: Vec<Option<i32>> = i8_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_i16: Vec<Option<i32>> =
                i16_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_u8: Vec<Option<i32>> = u8_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_u16: Vec<Option<i32>> =
                u16_values.iter().map(|v| v.map(|x| x as i32)).collect();
            let exp_u32: Vec<Option<i64>> =
                u32_values.iter().map(|v| v.map(|x| x as i64)).collect();
            let exp_u64: Vec<Option<i64>> =
                u64_values.iter().map(|v| v.map(|x| x as i64)).collect();
            let exp_f16: Vec<Option<f32>> =
                f16_values.iter().map(|v| v.map(|x| x.to_f32())).collect();
            let exp_time32_ms: Vec<Option<i32>> = time32s_values
                .iter()
                .map(|v| v.map(|x| x.saturating_mul(1000)))
                .collect();
            let exp_time64_us: Vec<Option<i64>> = time64ns_values
                .iter()
                .map(|v| v.map(|x| x / 1000))
                .collect();
            let exp_ts_local_ms: Vec<Option<i64>> = ts_s_local_values
                .iter()
                .map(|v| v.map(|x| x * 1000))
                .collect();
            let exp_ts_utc_ms: Vec<Option<i64>> = ts_s_utc_values
                .iter()
                .map(|v| v.map(|x| x * 1000))
                .collect();
            // Interval conversions to MonthDayNano via Avro duration
            let exp_iv_ym: Vec<Option<IntervalMonthDayNano>> = iv_ym_values
                .iter()
                .map(|v| v.map(|months| IntervalMonthDayNano::new(months, 0, 0)))
                .collect();
            let exp_iv_dt: Vec<Option<IntervalMonthDayNano>> = iv_dt_values
                .iter()
                .map(|v| {
                    v.map(|dt| {
                        IntervalMonthDayNano::new(0, dt.days, (dt.milliseconds as i64) * 1_000_000)
                    })
                })
                .collect();
            assert_eq!(
                rt.column(0).as_ref(),
                &Int32Array::from(exp_i8) as &dyn Array
            );
            assert_eq!(
                rt.column(1).as_ref(),
                &Int32Array::from(exp_i16) as &dyn Array
            );
            assert_eq!(
                rt.column(2).as_ref(),
                &Int32Array::from(exp_u8) as &dyn Array
            );
            assert_eq!(
                rt.column(3).as_ref(),
                &Int32Array::from(exp_u16) as &dyn Array
            );
            assert_eq!(
                rt.column(4).as_ref(),
                &arrow_array::Int64Array::from(exp_u32) as &dyn Array
            );
            assert_eq!(
                rt.column(5).as_ref(),
                &arrow_array::Int64Array::from(exp_u64) as &dyn Array
            );
            assert_eq!(
                rt.column(6).as_ref(),
                &arrow_array::Float32Array::from(exp_f16) as &dyn Array
            );
            assert_eq!(
                rt.column(7).as_ref(),
                &TimestampMillisecondArray::from(date64_values) as &dyn Array
            );
            assert_eq!(
                rt.column(8).as_ref(),
                &Time32MillisecondArray::from(exp_time32_ms) as &dyn Array
            );
            assert_eq!(
                rt.column(9).as_ref(),
                &Time64MicrosecondArray::from(exp_time64_us) as &dyn Array
            );
            assert_eq!(
                rt.column(10).as_ref(),
                &TimestampMillisecondArray::from(exp_ts_local_ms) as &dyn Array
            );
            assert_eq!(
                rt.column(11).as_ref(),
                &TimestampMillisecondArray::from(exp_ts_utc_ms).with_timezone("+00:00")
                    as &dyn Array
            );
            assert_eq!(
                rt.column(12).as_ref(),
                &IntervalMonthDayNanoArray::from(exp_iv_ym) as &dyn Array
            );
            assert_eq!(
                rt.column(13).as_ref(),
                &IntervalMonthDayNanoArray::from(exp_iv_dt) as &dyn Array
            );
            assert_eq!(
                rt.column(14).as_ref(),
                &IntervalMonthDayNanoArray::from(iv_mdn_values) as &dyn Array
            );
        }
        Ok(())
    }
4271
4272    #[cfg(not(feature = "avro_custom_types"))]
4273    #[test]
4274    fn non_custom_uint64_overflow_errors() -> Result<(), AvroError> {
4275        let schema = Schema::new(vec![Field::new("u64", DataType::UInt64, false)]);
4276        let values: Vec<Option<u64>> = vec![Some((i64::MAX as u64) + 1)];
4277        let batch = RecordBatch::try_new(
4278            Arc::new(schema.clone()),
4279            vec![Arc::new(UInt64Array::from(values)) as ArrayRef],
4280        )?;
4281        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4282        let err = w
4283            .write(&batch)
4284            .expect_err("expected UInt64 overflow error when avro_custom_types is disabled");
4285        match err {
4286            AvroError::InvalidArgument(msg) => {
4287                assert_eq!(
4288                    msg,
4289                    "UInt64 value 9223372036854775808 exceeds i64::MAX; enable avro_custom_types feature for full UInt64 support"
4290                );
4291            }
4292            other => panic!("expected InvalidArgument, got {other:?}"),
4293        }
4294        Ok(())
4295    }
4296
4297    #[cfg(not(feature = "avro_custom_types"))]
4298    #[test]
4299    fn non_custom_interval_year_month_negative_errors() -> Result<(), AvroError> {
4300        let schema = Schema::new(vec![Field::new(
4301            "iv_ym",
4302            DataType::Interval(IntervalUnit::YearMonth),
4303            false,
4304        )]);
4305        let values: Vec<Option<i32>> = vec![Some(-1)];
4306        let batch = RecordBatch::try_new(
4307            Arc::new(schema.clone()),
4308            vec![Arc::new(IntervalYearMonthArray::from(values)) as ArrayRef],
4309        )?;
4310
4311        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4312        let err = w
4313            .write(&batch)
4314            .expect_err("expected negative Interval(YearMonth) error");
4315        match err {
4316            AvroError::InvalidArgument(msg) => {
4317                assert_eq!(
4318                    msg,
4319                    "Avro 'duration' cannot encode negative months; enable `avro_custom_types` to round-trip signed Arrow Interval(YearMonth)"
4320                );
4321            }
4322            other => panic!("expected InvalidArgument, got {other:?}"),
4323        }
4324        Ok(())
4325    }
4326
4327    #[cfg(not(feature = "avro_custom_types"))]
4328    #[test]
4329    fn non_custom_interval_day_time_negative_errors() -> Result<(), AvroError> {
4330        let schema = Schema::new(vec![Field::new(
4331            "iv_dt",
4332            DataType::Interval(IntervalUnit::DayTime),
4333            false,
4334        )]);
4335        let values: Vec<Option<IntervalDayTime>> = vec![Some(IntervalDayTime::new(-1, 0))];
4336        let batch = RecordBatch::try_new(
4337            Arc::new(schema.clone()),
4338            vec![Arc::new(IntervalDayTimeArray::from(values)) as ArrayRef],
4339        )?;
4340        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4341        let err = w
4342            .write(&batch)
4343            .expect_err("expected negative Interval(DayTime) error");
4344        match err {
4345            AvroError::InvalidArgument(msg) => {
4346                assert_eq!(
4347                    msg,
4348                    "Avro 'duration' cannot encode negative days or milliseconds; enable `avro_custom_types` to round-trip signed Arrow Interval(DayTime)"
4349                );
4350            }
4351            other => panic!("expected InvalidArgument, got {other:?}"),
4352        }
4353        Ok(())
4354    }
4355
4356    #[cfg(not(feature = "avro_custom_types"))]
4357    #[test]
4358    fn non_custom_interval_month_day_nano_negative_errors() -> Result<(), AvroError> {
4359        let schema = Schema::new(vec![Field::new(
4360            "iv_mdn",
4361            DataType::Interval(IntervalUnit::MonthDayNano),
4362            false,
4363        )]);
4364        let values: Vec<Option<IntervalMonthDayNano>> =
4365            vec![Some(IntervalMonthDayNano::new(-1, 0, 0))];
4366        let batch = RecordBatch::try_new(
4367            Arc::new(schema.clone()),
4368            vec![Arc::new(IntervalMonthDayNanoArray::from(values)) as ArrayRef],
4369        )?;
4370        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4371        let err = w
4372            .write(&batch)
4373            .expect_err("expected negative Interval(MonthDayNano) error");
4374        match err {
4375            AvroError::InvalidArgument(msg) => {
4376                assert_eq!(
4377                    msg,
4378                    "Avro 'duration' cannot encode negative months/days/nanoseconds; enable `avro_custom_types` to round-trip signed Arrow intervals"
4379                );
4380            }
4381            other => panic!("expected InvalidArgument, got {other:?}"),
4382        }
4383        Ok(())
4384    }
4385
4386    #[cfg(not(feature = "avro_custom_types"))]
4387    #[test]
4388    fn non_custom_interval_month_day_nano_sub_millis_errors() -> Result<(), AvroError> {
4389        let schema = Schema::new(vec![Field::new(
4390            "iv_mdn",
4391            DataType::Interval(IntervalUnit::MonthDayNano),
4392            false,
4393        )]);
4394        let values: Vec<Option<IntervalMonthDayNano>> =
4395            vec![Some(IntervalMonthDayNano::new(0, 0, 1))];
4396        let batch = RecordBatch::try_new(
4397            Arc::new(schema.clone()),
4398            vec![Arc::new(IntervalMonthDayNanoArray::from(values)) as ArrayRef],
4399        )?;
4400        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4401        let err = w
4402            .write(&batch)
4403            .expect_err("expected sub-millisecond Interval(MonthDayNano) error");
4404        match err {
4405            AvroError::InvalidArgument(msg) => {
4406                assert_eq!(
4407                    msg,
4408                    "Avro 'duration' requires whole milliseconds; nanoseconds must be divisible by 1_000_000 (enable `avro_custom_types` to preserve nanosecond intervals)"
4409                );
4410            }
4411            other => panic!("expected InvalidArgument, got {other:?}"),
4412        }
4413        Ok(())
4414    }
4415
4416    #[cfg(not(feature = "avro_custom_types"))]
4417    #[test]
4418    fn non_custom_time32_second_scaling_overflow_errors() -> Result<(), AvroError> {
4419        let schema = Schema::new(vec![Field::new(
4420            "time32s",
4421            DataType::Time32(TimeUnit::Second),
4422            false,
4423        )]);
4424        let values: Vec<Option<i32>> = vec![Some((i32::MAX / 1000) + 1)];
4425        let batch = RecordBatch::try_new(
4426            Arc::new(schema.clone()),
4427            vec![Arc::new(Time32SecondArray::from(values)) as ArrayRef],
4428        )?;
4429        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4430        let err = w
4431            .write(&batch)
4432            .expect_err("expected time32 seconds->millis overflow error");
4433        match err {
4434            AvroError::InvalidArgument(msg) => {
4435                assert_eq!(msg, "time32(secs) * 1000 overflowed");
4436            }
4437            other => panic!("expected InvalidArgument, got {other:?}"),
4438        }
4439        Ok(())
4440    }
4441
4442    #[cfg(not(feature = "avro_custom_types"))]
4443    #[test]
4444    fn non_custom_timestamp_second_scaling_overflow_errors() -> Result<(), AvroError> {
4445        let schema = Schema::new(vec![Field::new(
4446            "ts_s_local",
4447            DataType::Timestamp(TimeUnit::Second, None),
4448            false,
4449        )]);
4450        // i64::MAX / 1000 + 1 will overflow when multiplied by 1000
4451        let values: Vec<Option<i64>> = vec![Some((i64::MAX / 1000) + 1)];
4452        let batch = RecordBatch::try_new(
4453            Arc::new(schema.clone()),
4454            vec![Arc::new(TimestampSecondArray::from(values)) as ArrayRef],
4455        )?;
4456        let mut w = AvroWriter::new(Vec::<u8>::new(), schema)?;
4457        let err = w
4458            .write(&batch)
4459            .expect_err("expected timestamp seconds->millis overflow error");
4460        match err {
4461            AvroError::InvalidArgument(msg) => {
4462                assert_eq!(msg, "timestamp(secs) * 1000 overflowed");
4463            }
4464            other => panic!("expected InvalidArgument, got {other:?}"),
4465        }
4466        Ok(())
4467    }
4468}