arrow_avro/writer/mod.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Avro writer implementation for the `arrow-avro` crate.
//!
//! # Overview
//!
//! Use this module to serialize Arrow [`arrow_array::RecordBatch`] values into Avro. Three output
//! modes are supported:
//!
//! * **[`crate::writer::AvroWriter`]** — writes an **Object Container File (OCF)**: a self‑describing
//!   file with a header (schema JSON and metadata), optional compression, data blocks, and
//!   sync markers. See Avro 1.11.1 "Object Container Files."
//!   <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
//!
//! * **[`crate::writer::AvroStreamWriter`]** — writes a **Single Object Encoding (SOE) stream** without
//!   any container framing. This is useful when the schema is known out‑of‑band (e.g.,
//!   via a schema registry) and you want minimal overhead.
//!
//! * **[`crate::writer::Encoder`]** — a row-by-row encoder that buffers encoded records into a single
//!   contiguous byte buffer and returns per-row [`bytes::Bytes`] slices.
//!   Ideal for publishing individual messages to Kafka, Pulsar, or other message queues
//!   where each message must be a self-contained Avro payload.
//!
//! ## Which writer should you use?
//!
//! | Use Case | Recommended Type |
//! |----------|------------------|
//! | Write an OCF file to disk | [`crate::writer::AvroWriter`] |
//! | Stream records continuously to a file/socket | [`crate::writer::AvroStreamWriter`] |
//! | Publish individual records to Kafka/Pulsar | [`crate::writer::Encoder`] |
//! | Need per-row byte slices for custom framing | [`crate::writer::Encoder`] |
//!
//! ## Per-Record Prefix Formats
//!
//! For [`crate::writer::AvroStreamWriter`] and [`crate::writer::Encoder`], each record is automatically prefixed
//! based on the fingerprint strategy (a short check of the default prefix follows the table):
//!
//! | Strategy | Prefix | Use Case |
//! |----------|--------|----------|
//! | `FingerprintStrategy::Rabin` (default) | `0xC3 0x01` + 8-byte LE Rabin fingerprint | Standard Avro SOE |
//! | `FingerprintStrategy::Id(id)` | `0x00` + 4-byte BE schema ID | [Confluent Schema Registry] |
//! | `FingerprintStrategy::Id64(id)` | `0x00` + 8-byte BE schema ID | [Apicurio Registry] |
//!
//! [Confluent Schema Registry]: https://docs.confluent.io/platform/current/schema-registry/fundamentals/serdes-develop/index.html#wire-format
//! [Apicurio Registry]: https://www.apicur.io/registry/docs/apicurio-registry/1.3.3.Final/getting-started/assembly-using-kafka-client-serdes.html#registry-serdes-types-avro-registry
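//!
//! For instance, with the default Rabin strategy the first two bytes of each record written by
//! [`crate::writer::AvroStreamWriter`] are the SOE marker `0xC3 0x01` (a minimal check,
//! mirroring the table above):
//!
//! ```
//! use std::sync::Arc;
//! use arrow_array::{ArrayRef, Int64Array, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::writer::AvroStreamWriter;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
//! let batch = RecordBatch::try_new(
//!     Arc::new(schema.clone()),
//!     vec![Arc::new(Int64Array::from(vec![1])) as ArrayRef],
//! )?;
//! let mut w = AvroStreamWriter::new(Vec::<u8>::new(), schema)?;
//! w.write(&batch)?;
//! w.finish()?;
//! let bytes = w.into_inner();
//! // Default strategy is Rabin, so the stream starts with the SOE marker.
//! assert_eq!(bytes[0], 0xC3);
//! assert_eq!(bytes[1], 0x01);
//! # Ok(())
//! # }
//! ```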
//!
//! ## Choosing the Avro Schema
//!
//! By default, the writer converts your Arrow schema to Avro (including a top‑level record
//! name). If you already have an Avro schema JSON you want to use verbatim, put it into the
//! Arrow schema metadata under the [`SCHEMA_METADATA_KEY`](crate::schema::SCHEMA_METADATA_KEY)
//! key before constructing the writer. The builder will use that schema instead of generating
//! a new one.
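//!
//! A minimal sketch of supplying a schema verbatim (the JSON here is illustrative and must
//! match the Arrow fields):
//!
//! ```
//! use std::collections::HashMap;
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::schema::SCHEMA_METADATA_KEY;
//!
//! let avro_json = r#"{"type":"record","name":"topLevelRecord","fields":[{"name":"id","type":"long"}]}"#;
//! let mut metadata = HashMap::new();
//! metadata.insert(SCHEMA_METADATA_KEY.to_string(), avro_json.to_string());
//! // Pass this schema to `AvroWriter::new` / `WriterBuilder::new` as usual.
//! let schema = Schema::new_with_metadata(
//!     vec![Field::new("id", DataType::Int64, false)],
//!     metadata,
//! );
//! # let _ = schema;
//! ```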
//!
//! ## Compression
//!
//! For OCF ([`crate::writer::AvroWriter`]), you may enable a compression codec via
//! [`crate::writer::WriterBuilder::with_compression`]. The chosen codec is written into the file header
//! and used for subsequent blocks. SOE stream writing ([`crate::writer::AvroStreamWriter`], [`crate::writer::Encoder`])
//! does not apply container‑level compression.
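//!
//! A sketch of enabling a codec (Deflate is the codec every Avro implementation must support;
//! depending on how the crate is built, individual codecs may require enabling the
//! corresponding crate feature):
//!
//! ```
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::compression::CompressionCodec;
//! use arrow_avro::writer::{WriterBuilder, format::AvroOcfFormat};
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![Field::new("id", DataType::Int64, false)]);
//! // The codec is recorded in the OCF header and applied to each data block.
//! let writer = WriterBuilder::new(schema)
//!     .with_compression(Some(CompressionCodec::Deflate))
//!     .build::<_, AvroOcfFormat>(Vec::<u8>::new())?;
//! # let _ = writer;
//! # Ok(())
//! # }
//! ```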
//!
//! # Examples
//!
//! ## Writing an OCF File
//!
//! ```
//! use std::sync::Arc;
//! use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::writer::AvroWriter;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![
//!     Field::new("id", DataType::Int64, false),
//!     Field::new("name", DataType::Utf8, false),
//! ]);
//!
//! let batch = RecordBatch::try_new(
//!     Arc::new(schema.clone()),
//!     vec![
//!         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
//!         Arc::new(StringArray::from(vec!["alice", "bob"])) as ArrayRef,
//!     ],
//! )?;
//!
//! let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
//! writer.write(&batch)?;
//! writer.finish()?;
//! let bytes = writer.into_inner();
//! assert!(!bytes.is_empty());
//! # Ok(())
//! # }
//! ```
//!
//! ## Using the Row-by-Row Encoder for Message Queues
//!
//! ```
//! use std::sync::Arc;
//! use arrow_array::{ArrayRef, Int32Array, RecordBatch};
//! use arrow_schema::{DataType, Field, Schema};
//! use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
//! use arrow_avro::schema::FingerprintStrategy;
//!
//! # fn main() -> Result<(), Box<dyn std::error::Error>> {
//! let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
//! let batch = RecordBatch::try_new(
//!     Arc::new(schema.clone()),
//!     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
//! )?;
//!
//! // Build an Encoder with Confluent wire format (schema ID = 42)
//! let mut encoder = WriterBuilder::new(schema)
//!     .with_fingerprint_strategy(FingerprintStrategy::Id(42))
//!     .build_encoder::<AvroSoeFormat>()?;
//!
//! encoder.encode(&batch)?;
//!
//! // Get the buffered rows (zero-copy views into a single backing buffer)
//! let rows = encoder.flush();
//! assert_eq!(rows.len(), 3);
//!
//! // Each row has Confluent wire format: magic byte + 4-byte schema ID + body
//! for row in rows.iter() {
//!     assert_eq!(row[0], 0x00); // Confluent magic byte
//! }
//! # Ok(())
//! # }
//! ```
//!
//! ---
use crate::codec::AvroFieldBuilder;
use crate::compression::CompressionCodec;
use crate::errors::AvroError;
use crate::schema::{
    AvroSchema, Fingerprint, FingerprintAlgorithm, FingerprintStrategy, SCHEMA_METADATA_KEY,
};
use crate::writer::encoder::{RecordEncoder, RecordEncoderBuilder, write_long};
use crate::writer::format::{AvroFormat, AvroOcfFormat, AvroSoeFormat};
use arrow_array::RecordBatch;
use arrow_schema::{Schema, SchemaRef};
use bytes::{Bytes, BytesMut};
use std::io::Write;
use std::sync::Arc;

/// Encodes `RecordBatch` into the Avro binary format.
mod encoder;
/// Logic for different Avro container file formats.
pub mod format;

/// A contiguous set of Avro-encoded rows.
///
/// `EncodedRows` stores:
/// - a single backing byte buffer (`bytes::Bytes`)
/// - a `Vec<usize>` of row boundary offsets (length = `rows + 1`)
///
/// This lets callers get per-row payloads as zero-copy `Bytes` slices.
///
/// For compatibility with APIs that require owned `Vec<u8>`, use:
/// `let vecs: Vec<Vec<u8>> = rows.iter().map(|b| b.to_vec()).collect();`
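///
/// # Examples
///
/// A hand-constructed sketch of the layout (normally an `EncodedRows` is produced by
/// [`Encoder::flush`] rather than built by hand):
///
/// ```
/// use bytes::Bytes;
/// use arrow_avro::writer::EncodedRows;
///
/// // Two rows, `b"ab"` and `b"cde"`, concatenated in one backing buffer;
/// // offsets mark the row boundaries (length = rows + 1).
/// let rows = EncodedRows::new(Bytes::from_static(b"abcde"), vec![0, 2, 5]);
/// assert_eq!(rows.len(), 2);
/// assert_eq!(rows.row(0).unwrap(), Bytes::from_static(b"ab"));
/// assert_eq!(rows.row(1).unwrap(), Bytes::from_static(b"cde"));
/// ```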
#[derive(Debug, Clone)]
pub struct EncodedRows {
    data: Bytes,
    offsets: Vec<usize>,
}

impl EncodedRows {
    /// Create a new `EncodedRows` from a backing buffer and row boundary offsets.
    ///
    /// `offsets` must have length `rows + 1` and be monotonically non-decreasing.
    /// The last offset should equal `data.len()`.
    pub fn new(data: Bytes, offsets: Vec<usize>) -> Self {
        Self { data, offsets }
    }

    /// Returns the number of encoded rows stored in this container.
    #[inline]
    pub fn len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }

    /// Returns `true` if this container holds no encoded rows.
    #[inline]
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Returns a reference to the single contiguous backing buffer.
    ///
    /// This buffer contains the payloads of all rows concatenated together.
    ///
    /// # Note
    ///
    /// To access individual row payloads, prefer using [`Self::row`] or [`Self::iter`]
    /// rather than slicing this buffer manually.
    #[inline]
    pub fn bytes(&self) -> &Bytes {
        &self.data
    }

    /// Returns the row boundary offsets.
    ///
    /// The returned slice always has length `self.len() + 1`. The `n`th row payload
    /// corresponds to `bytes[offsets[n]..offsets[n + 1]]`.
    #[inline]
    pub fn offsets(&self) -> &[usize] {
        &self.offsets
    }

    /// Return the `n`th row as a zero-copy `Bytes` slice.
    ///
    /// # Errors
    ///
    /// Returns an error if `n` is out of bounds or if the internal offsets are invalid
    /// (e.g., offsets are not within the backing buffer).
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::WriterBuilder;
    /// use arrow_avro::writer::format::AvroSoeFormat;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
    /// let batch = RecordBatch::try_new(
    ///     Arc::new(schema.clone()),
    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    /// )?;
    ///
    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
    /// encoder.encode(&batch)?;
    /// let rows = encoder.flush();
    ///
    /// assert_eq!(rows.len(), 2);
    /// let first = rows.row(0)?;
    /// assert!(!first.is_empty());
    /// # Ok(())
    /// # }
    /// ```
    pub fn row(&self, n: usize) -> Result<Bytes, AvroError> {
        if n >= self.len() {
            return Err(AvroError::General(format!(
                "Row index {n} out of bounds for len {}",
                self.len()
            )));
        }
        // SAFETY:
        // self.len() is defined as self.offsets.len().saturating_sub(1).
        // The check `n >= self.len()` above ensures that `n < self.offsets.len() - 1`.
        // Therefore, both `n` and `n + 1` are strictly within the bounds of `self.offsets`.
        let (start, end) = unsafe {
            (
                *self.offsets.get_unchecked(n),
                *self.offsets.get_unchecked(n + 1),
            )
        };
        if start > end || end > self.data.len() {
            return Err(AvroError::General(format!(
                "Invalid row offsets for row {n}: start={start}, end={end}, data_len={}",
                self.data.len()
            )));
        }
        Ok(self.data.slice(start..end))
    }

    /// Iterate over rows as zero-copy `Bytes` slices.
    ///
    /// This iterator is infallible and is intended for the common case where
    /// `EncodedRows` is produced by [`Encoder::flush`], which guarantees valid offsets.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::WriterBuilder;
    /// use arrow_avro::writer::format::AvroSoeFormat;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
    /// let batch = RecordBatch::try_new(
    ///     Arc::new(schema.clone()),
    ///     vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
    /// )?;
    ///
    /// let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
    /// encoder.encode(&batch)?;
    /// let rows = encoder.flush();
    ///
    /// assert_eq!(rows.iter().count(), 2);
    /// # Ok(())
    /// # }
    /// ```
    #[inline]
    pub fn iter(&self) -> impl ExactSizeIterator<Item = Bytes> + '_ {
        self.offsets.windows(2).map(|w| self.data.slice(w[0]..w[1]))
    }
}

/// Builder to configure and create a [`Writer`] or [`Encoder`].
#[derive(Debug, Clone)]
pub struct WriterBuilder {
    schema: Schema,
    codec: Option<CompressionCodec>,
    row_capacity: Option<usize>,
    capacity: usize,
    fingerprint_strategy: Option<FingerprintStrategy>,
}

impl WriterBuilder {
    /// Create a new builder with default settings.
    ///
    /// The Avro schema used for writing is determined as follows:
    /// 1) If the Arrow schema metadata contains `avro.schema` (see `SCHEMA_METADATA_KEY`),
    ///    that JSON is used verbatim.
    /// 2) Otherwise, the Arrow schema is converted to an Avro record schema.
    pub fn new(schema: Schema) -> Self {
        Self {
            schema,
            codec: None,
            row_capacity: None,
            capacity: 1024,
            fingerprint_strategy: None,
        }
    }

    /// Set the fingerprinting strategy for the stream writer or encoder.
    /// This determines the per-record prefix format.
    pub fn with_fingerprint_strategy(mut self, strategy: FingerprintStrategy) -> Self {
        self.fingerprint_strategy = Some(strategy);
        self
    }

    /// Change the compression codec.
    pub fn with_compression(mut self, codec: Option<CompressionCodec>) -> Self {
        self.codec = codec;
        self
    }

    /// Sets the expected capacity (in bytes) for internal buffers.
    ///
    /// This is used as a hint to pre-allocate staging buffers for writing.
    pub fn with_capacity(mut self, capacity: usize) -> Self {
        self.capacity = capacity;
        self
    }

    /// Sets the expected byte size for each encoded row.
    ///
    /// This setting affects an [`Encoder`] created via [`build_encoder`](Self::build_encoder).
    /// It is used as a hint to reduce reallocations when the typical encoded row size is known.
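    ///
    /// # Examples
    ///
    /// A sketch combining the capacity hints (the values are illustrative, not tuned):
    ///
    /// ```
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
    /// let encoder = WriterBuilder::new(schema)
    ///     .with_row_capacity(16)     // expect roughly 16 bytes per encoded row
    ///     .with_capacity(64 * 1024)  // 64 KiB staging buffer
    ///     .build_encoder::<AvroSoeFormat>()?;
    /// # let _ = encoder;
    /// # Ok(())
    /// # }
    /// ```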
    pub fn with_row_capacity(mut self, capacity: usize) -> Self {
        self.row_capacity = Some(capacity);
        self
    }

    fn prepare_encoder<F: AvroFormat>(&self) -> Result<(Arc<Schema>, RecordEncoder), AvroError> {
        // Prefer an Avro schema supplied via metadata; otherwise derive one
        // from the Arrow schema.
        let avro_schema = match self.schema.metadata.get(SCHEMA_METADATA_KEY) {
            Some(json) => AvroSchema::new(json.clone()),
            None => AvroSchema::try_from(&self.schema)?,
        };
        // Stream formats carry a per-record prefix; resolve the fingerprint
        // (or registry ID) that will be written before each record.
        let maybe_fingerprint = if F::NEEDS_PREFIX {
            match &self.fingerprint_strategy {
                Some(FingerprintStrategy::Id(id)) => Some(Fingerprint::Id(*id)),
                Some(FingerprintStrategy::Id64(id)) => Some(Fingerprint::Id64(*id)),
                Some(strategy) => {
                    Some(avro_schema.fingerprint(FingerprintAlgorithm::from(*strategy))?)
                }
                None => Some(
                    avro_schema
                        .fingerprint(FingerprintAlgorithm::from(FingerprintStrategy::Rabin))?,
                ),
            }
        } else {
            None
        };
        // Record the Avro schema JSON in the output schema's metadata.
        let mut md = self.schema.metadata().clone();
        md.insert(
            SCHEMA_METADATA_KEY.to_string(),
            avro_schema.clone().json_string,
        );
        let schema = Arc::new(Schema::new_with_metadata(self.schema.fields().clone(), md));
        let avro_root = AvroFieldBuilder::new(&avro_schema.schema()?).build()?;
        let encoder = RecordEncoderBuilder::new(&avro_root, schema.as_ref())
            .with_fingerprint(maybe_fingerprint)
            .build()?;
        Ok((schema, encoder))
    }

    /// Build a new [`Encoder`] for the given [`AvroFormat`].
    ///
    /// `Encoder` only supports stream formats (no OCF sync markers). Attempting to build an
    /// encoder with an OCF format (e.g. [`AvroOcfFormat`]) will return an error.
    pub fn build_encoder<F: AvroFormat>(self) -> Result<Encoder, AvroError> {
        if F::default().sync_marker().is_some() {
            return Err(AvroError::InvalidArgument(
                "Encoder only supports stream formats (no OCF header/sync marker)".to_string(),
            ));
        }
        let (schema, encoder) = self.prepare_encoder::<F>()?;
        Ok(Encoder {
            schema,
            encoder,
            row_capacity: self.row_capacity,
            buffer: BytesMut::with_capacity(self.capacity),
            offsets: vec![0],
        })
    }
    /// Build a new [`Writer`] with the specified [`AvroFormat`] and builder options.
    pub fn build<W, F>(self, mut writer: W) -> Result<Writer<W, F>, AvroError>
    where
        W: Write,
        F: AvroFormat,
    {
        let mut format = F::default();
        if format.sync_marker().is_none() && !F::NEEDS_PREFIX {
            return Err(AvroError::InvalidArgument(
                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
                    .to_string(),
            ));
        }
        let (schema, encoder) = self.prepare_encoder::<F>()?;
        format.start_stream(&mut writer, &schema, self.codec)?;
        Ok(Writer {
            writer,
            schema,
            format,
            compression: self.codec,
            capacity: self.capacity,
            encoder,
        })
    }
}

/// A row-by-row encoder for Avro *stream/message* formats (SOE / registry wire formats / raw binary).
///
/// Unlike [`Writer`], which emits a single continuous byte stream to a [`std::io::Write`] sink,
/// `Encoder` tracks row boundaries during encoding and returns an [`EncodedRows`] containing:
/// - one backing buffer (`Bytes`)
/// - row boundary offsets
///
/// This enables zero-copy per-row payloads (for instance, one Kafka message per Arrow row) without
/// re-encoding or decoding the byte stream to recover record boundaries.
///
/// ### Example
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::{WriterBuilder, format::AvroSoeFormat};
/// use arrow_avro::schema::FingerprintStrategy;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let schema = Schema::new(vec![Field::new("value", DataType::Int32, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
/// )?;
///
/// // Configure the encoder (here: Confluent Wire Format with schema ID 100)
/// let mut encoder = WriterBuilder::new(schema)
///     .with_fingerprint_strategy(FingerprintStrategy::Id(100))
///     .build_encoder::<AvroSoeFormat>()?;
///
/// // Encode the batch
/// encoder.encode(&batch)?;
///
/// // Get the encoded rows
/// let rows = encoder.flush();
///
/// // Convert to owned Vec<u8> payloads (e.g., for a Kafka producer)
/// let payloads: Vec<Vec<u8>> = rows.iter().map(|row| row.to_vec()).collect();
///
/// assert_eq!(payloads.len(), 3);
/// assert_eq!(payloads[0][0], 0x00); // Magic byte
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub struct Encoder {
    schema: SchemaRef,
    encoder: RecordEncoder,
    row_capacity: Option<usize>,
    buffer: BytesMut,
    offsets: Vec<usize>,
}

impl Encoder {
    /// Serialize one [`RecordBatch`] into the internal buffer.
    pub fn encode(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(AvroError::SchemaError(
                "Schema of RecordBatch differs from Encoder schema".to_string(),
            ));
        }
        self.encoder.encode_rows(
            batch,
            self.row_capacity.unwrap_or(0),
            &mut self.buffer,
            &mut self.offsets,
        )?;
        Ok(())
    }

    /// A convenience method to encode a slice of [`RecordBatch`] values.
    pub fn encode_batches(&mut self, batches: &[RecordBatch]) -> Result<(), AvroError> {
        for b in batches {
            self.encode(b)?;
        }
        Ok(())
    }

    /// Drain and return all currently buffered encoded rows.
    ///
    /// The returned [`EncodedRows`] provides per-row payloads as `Bytes` slices.
    pub fn flush(&mut self) -> EncodedRows {
        // Detach the encoded bytes: `split().freeze()` leaves `self.buffer`
        // empty (ready for reuse) and yields an immutable `Bytes` handle.
        let data = self.buffer.split().freeze();
        // Move the accumulated offsets out, then re-seed with a single 0 so
        // the next encode starts a fresh offsets sequence.
        let mut offsets = Vec::with_capacity(self.offsets.len());
        offsets.append(&mut self.offsets);
        self.offsets.push(0);
        EncodedRows::new(data, offsets)
    }

    /// Returns the Arrow schema used by this encoder.
    ///
    /// The returned schema includes metadata with the Avro schema JSON under
    /// the `avro.schema` key.
    pub fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// Returns the number of encoded rows currently buffered.
    pub fn buffered_len(&self) -> usize {
        self.offsets.len().saturating_sub(1)
    }
}

/// Generic Avro writer.
///
/// This type is generic over the output Write sink (`W`) and the Avro format (`F`).
/// You’ll usually use the concrete aliases:
///
/// * **[`AvroWriter`]** for **OCF** (self‑describing container file)
/// * **[`AvroStreamWriter`]** for **SOE** Avro streams
#[derive(Debug)]
pub struct Writer<W: Write, F: AvroFormat> {
    writer: W,
    schema: SchemaRef,
    format: F,
    compression: Option<CompressionCodec>,
    capacity: usize,
    encoder: RecordEncoder,
}

/// Alias for an Avro **Object Container File** writer.
///
/// ### Quickstart (runnable)
///
/// ```
/// use std::io::Cursor;
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, StringArray, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroWriter;
/// use arrow_avro::reader::ReaderBuilder;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // Writer schema: { id: long, name: string }
/// let writer_schema = Schema::new(vec![
///     Field::new("id", DataType::Int64, false),
///     Field::new("name", DataType::Utf8, false),
/// ]);
///
/// // Build a RecordBatch with two rows
/// let batch = RecordBatch::try_new(
///     Arc::new(writer_schema.clone()),
///     vec![
///         Arc::new(Int64Array::from(vec![1, 2])) as ArrayRef,
///         Arc::new(StringArray::from(vec!["a", "b"])) as ArrayRef,
///     ],
/// )?;
///
/// // Write an Avro **Object Container File** (OCF) to memory
/// let mut w = AvroWriter::new(Vec::<u8>::new(), writer_schema.clone())?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
///
/// // Build a Reader and decode the batch back
/// let mut r = ReaderBuilder::new().build(Cursor::new(bytes))?;
/// let out = r.next().unwrap()?;
/// assert_eq!(out.num_rows(), 2);
/// # Ok(()) }
/// ```
pub type AvroWriter<W> = Writer<W, AvroOcfFormat>;

/// Alias for an Avro **Single Object Encoding** stream writer.
///
/// This writer automatically adds the appropriate per-record prefix (based on the
/// fingerprint strategy) before the Avro body of each record. The default is Single
/// Object Encoding (SOE) with a Rabin fingerprint.
///
/// ### Example
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use arrow_avro::writer::AvroStreamWriter;
///
/// # fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // One‑column Arrow batch
/// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
/// let batch = RecordBatch::try_new(
///     Arc::new(schema.clone()),
///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
/// )?;
///
/// // Write an Avro Single Object Encoding stream to a Vec<u8>
/// let sink: Vec<u8> = Vec::new();
/// let mut w = AvroStreamWriter::new(sink, schema)?;
/// w.write(&batch)?;
/// w.finish()?;
/// let bytes = w.into_inner();
/// assert!(!bytes.is_empty());
/// # Ok(()) }
/// ```
pub type AvroStreamWriter<W> = Writer<W, AvroSoeFormat>;

impl<W: Write> Writer<W, AvroOcfFormat> {
    /// Convenience constructor – same as [`WriterBuilder::build`] with `AvroOcfFormat`.
    ///
    /// ### Example
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::AvroWriter;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("id", DataType::Int32, false)]);
    /// let batch = RecordBatch::try_new(
    ///     Arc::new(schema.clone()),
    ///     vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
    /// )?;
    ///
    /// let buf: Vec<u8> = Vec::new();
    /// let mut w = AvroWriter::new(buf, schema)?;
    /// w.write(&batch)?;
    /// w.finish()?;
    /// let bytes = w.into_inner();
    /// assert!(!bytes.is_empty());
    /// # Ok(()) }
    /// ```
    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
        WriterBuilder::new(schema).build::<W, AvroOcfFormat>(writer)
    }

    /// Return a reference to the 16‑byte sync marker generated for this file.
    pub fn sync_marker(&self) -> Option<&[u8; 16]> {
        self.format.sync_marker()
    }
}

impl<W: Write> Writer<W, AvroSoeFormat> {
    /// Convenience constructor to create a new [`AvroStreamWriter`].
    ///
    /// The resulting stream contains **Single Object Encodings** (no OCF header/sync).
    ///
    /// ### Example
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int64Array, RecordBatch};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use arrow_avro::writer::AvroStreamWriter;
    ///
    /// # fn main() -> Result<(), Box<dyn std::error::Error>> {
    /// let schema = Schema::new(vec![Field::new("x", DataType::Int64, false)]);
    /// let batch = RecordBatch::try_new(
    ///     Arc::new(schema.clone()),
    ///     vec![Arc::new(Int64Array::from(vec![10, 20])) as ArrayRef],
    /// )?;
    ///
    /// let sink: Vec<u8> = Vec::new();
    /// let mut w = AvroStreamWriter::new(sink, schema)?;
    /// w.write(&batch)?;
    /// w.finish()?;
    /// let bytes = w.into_inner();
    /// assert!(!bytes.is_empty());
    /// # Ok(()) }
    /// ```
    pub fn new(writer: W, schema: Schema) -> Result<Self, AvroError> {
        WriterBuilder::new(schema).build::<W, AvroSoeFormat>(writer)
    }
}

impl<W: Write, F: AvroFormat> Writer<W, F> {
    /// Serialize one [`RecordBatch`] to the output.
    pub fn write(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        if batch.schema().fields() != self.schema.fields() {
            return Err(AvroError::SchemaError(
                "Schema of RecordBatch differs from Writer schema".to_string(),
            ));
        }
        // OCF formats have a sync marker and write framed blocks; stream
        // formats emit prefixed records directly.
        match self.format.sync_marker() {
            Some(&sync) => self.write_ocf_block(batch, &sync),
            None => self.write_stream(batch),
        }
    }

    /// A convenience method to write a slice of [`RecordBatch`].
    ///
    /// This is equivalent to calling `write` for each batch in the slice.
    pub fn write_batches(&mut self, batches: &[&RecordBatch]) -> Result<(), AvroError> {
        for b in batches {
            self.write(b)?;
        }
        Ok(())
    }

    /// Flush remaining buffered data and (for OCF) ensure the header is present.
    pub fn finish(&mut self) -> Result<(), AvroError> {
        self.writer
            .flush()
            .map_err(|e| AvroError::IoError(format!("Error flushing writer: {e}"), e))
    }

    /// Consume the writer, returning the underlying output object.
    pub fn into_inner(self) -> W {
        self.writer
    }

    fn write_ocf_block(&mut self, batch: &RecordBatch, sync: &[u8; 16]) -> Result<(), AvroError> {
        // Encode the batch into a staging buffer and (optionally) compress it.
        let mut buf = Vec::<u8>::with_capacity(self.capacity);
        self.encoder.encode(&mut buf, batch)?;
        let encoded = match self.compression {
            Some(codec) => codec.compress(&buf)?,
            None => buf,
        };
        // OCF block layout: row count, byte length, data, 16-byte sync marker.
        write_long(&mut self.writer, batch.num_rows() as i64)?;
        write_long(&mut self.writer, encoded.len() as i64)?;
        self.writer
            .write_all(&encoded)
            .map_err(|e| AvroError::IoError(format!("Error writing Avro block: {e}"), e))?;
        self.writer
            .write_all(sync)
            .map_err(|e| AvroError::IoError(format!("Error writing Avro sync: {e}"), e))?;
        Ok(())
    }

    fn write_stream(&mut self, batch: &RecordBatch) -> Result<(), AvroError> {
        self.encoder.encode(&mut self.writer, batch)?;
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::compression::CompressionCodec;
    use crate::reader::ReaderBuilder;
    use crate::schema::{AvroSchema, SchemaStore};
    use crate::test_util::arrow_test_data;
    use arrow::datatypes::TimeUnit;
    use arrow::util::pretty::pretty_format_batches;
    use arrow_array::builder::{Int32Builder, ListBuilder};
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::types::{Int16Type, Int32Type, Int64Type};
    use arrow_array::types::{
        Time32MillisecondType, Time64MicrosecondType, TimestampMicrosecondType,
        TimestampMillisecondType, TimestampNanosecondType,
    };
    use arrow_array::{
        Array, ArrayRef, BinaryArray, BooleanArray, Date32Array, Int32Array, Int64Array,
        PrimitiveArray, RecordBatch, StringArray, StructArray, UnionArray,
    };
    #[cfg(feature = "avro_custom_types")]
    use arrow_array::{Int16Array, RunArray};
    use arrow_schema::UnionMode;
    use arrow_schema::{DataType, Field, Schema};
    use bytes::BytesMut;
    use std::collections::HashMap;
    use std::collections::HashSet;
    use std::fs::File;
    use std::io::{BufReader, Cursor};
    use std::path::PathBuf;
    use std::sync::Arc;
    use tempfile::NamedTempFile;

    fn files() -> impl Iterator<Item = &'static str> {
        [
            // TODO: avoid requiring snappy for this file
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.avro",
            #[cfg(feature = "snappy")]
            "avro/alltypes_plain.snappy.avro",
            #[cfg(feature = "zstd")]
            "avro/alltypes_plain.zstandard.avro",
            #[cfg(feature = "bzip2")]
            "avro/alltypes_plain.bzip2.avro",
            #[cfg(feature = "xz")]
            "avro/alltypes_plain.xz.avro",
        ]
        .into_iter()
    }

    fn make_schema() -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Binary, false),
        ])
    }

    fn make_batch() -> RecordBatch {
        let ids = Int32Array::from(vec![1, 2, 3]);
        let names = BinaryArray::from_vec(vec![b"a".as_ref(), b"b".as_ref(), b"c".as_ref()]);
        RecordBatch::try_new(
            Arc::new(make_schema()),
            vec![Arc::new(ids) as ArrayRef, Arc::new(names) as ArrayRef],
        )
        .expect("failed to build test RecordBatch")
    }

    #[test]
    fn test_stream_writer_writes_prefix_per_row_rt() -> Result<(), AvroError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![10, 20])) as ArrayRef],
        )?;
        let buf: Vec<u8> = Vec::new();
        let mut writer = AvroStreamWriter::new(buf, schema.clone())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new(); // Rabin by default
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _fp = store.register(avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _consumed = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 2);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![10, 20]));
        Ok(())
    }

    #[test]
    fn test_nullable_struct_with_nonnullable_field_sliced_encoding() {
        use arrow_array::{ArrayRef, Int32Array, StringArray, StructArray};
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let inner_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false), // non-nullable
            Field::new("name", DataType::Utf8, true), // nullable
        ]);
        let inner_struct_type = DataType::Struct(inner_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", inner_struct_type.clone(), true), // nullable struct
            Field::new("after", inner_struct_type.clone(), true),  // nullable struct
            Field::new("op", DataType::Utf8, false),               // non-nullable
        ]);
        let before_ids = Int32Array::from(vec![None, None]);
        let before_names = StringArray::from(vec![None::<&str>, None]);
        let before_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(before_ids) as ArrayRef,
                Arc::new(before_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![false, false])),
        );
        let after_ids = Int32Array::from(vec![1, 2]); // non-nullable, no nulls
        let after_names = StringArray::from(vec![Some("Alice"), Some("Bob")]);
        let after_struct = StructArray::new(
            inner_fields.clone(),
            vec![
                Arc::new(after_ids) as ArrayRef,
                Arc::new(after_names) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create test batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            let after_col = single_row.column(1);
            assert_eq!(
                after_col.null_count(),
                0,
                "after column should have no nulls in sliced row"
            );
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty(), "encoded output should not be empty");
    }

    #[test]
    fn test_nullable_struct_with_decimal_and_timestamp_sliced() {
        use arrow_array::{
            ArrayRef, Decimal128Array, Int32Array, StringArray, StructArray,
            TimestampMicrosecondArray,
        };
        use arrow_buffer::NullBuffer;
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
            Field::new("category", DataType::Utf8, true),
            Field::new("price", DataType::Decimal128(10, 2), true),
            Field::new("stock_quantity", DataType::Int32, true),
            Field::new(
                "created_at",
                DataType::Timestamp(TimeUnit::Microsecond, None),
                true,
            ),
        ]);
        let row_struct_type = DataType::Struct(row_fields.clone());
        let schema = Schema::new(vec![
            Field::new("before", row_struct_type.clone(), true),
            Field::new("after", row_struct_type.clone(), true),
            Field::new("op", DataType::Utf8, false),
        ]);
        let before_struct = StructArray::new_null(row_fields.clone(), 2);
        let ids = Int32Array::from(vec![1, 2]);
        let names = StringArray::from(vec![Some("Widget"), Some("Gadget")]);
        let categories = StringArray::from(vec![Some("Electronics"), Some("Electronics")]);
        let prices = Decimal128Array::from(vec![Some(1999), Some(2999)])
            .with_precision_and_scale(10, 2)
            .unwrap();
        let quantities = Int32Array::from(vec![Some(100), Some(50)]);
        let timestamps = TimestampMicrosecondArray::from(vec![
            Some(1700000000000000i64),
            Some(1700000001000000i64),
        ]);
        let after_struct = StructArray::new(
            row_fields.clone(),
            vec![
                Arc::new(ids) as ArrayRef,
                Arc::new(names) as ArrayRef,
                Arc::new(categories) as ArrayRef,
                Arc::new(prices) as ArrayRef,
                Arc::new(quantities) as ArrayRef,
                Arc::new(timestamps) as ArrayRef,
            ],
            Some(NullBuffer::from(vec![true, true])),
        );
        let op_col = StringArray::from(vec!["r", "r"]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![
                Arc::new(before_struct) as ArrayRef,
                Arc::new(after_struct) as ArrayRef,
                Arc::new(op_col) as ArrayRef,
            ],
        )
        .expect("failed to create products batch");
        let mut sink = Vec::new();
        let mut writer = WriterBuilder::new(schema)
            .with_fingerprint_strategy(FingerprintStrategy::Id(1))
            .build::<_, AvroSoeFormat>(&mut sink)
            .expect("failed to create writer");
        // Encode row by row
        for row_idx in 0..batch.num_rows() {
            let single_row = batch.slice(row_idx, 1);
            writer
                .write(&single_row)
                .unwrap_or_else(|e| panic!("Failed to encode product row {row_idx}: {e}"));
        }
        writer.finish().expect("failed to finish writer");
        assert!(!sink.is_empty());
    }

    #[test]
    fn non_nullable_child_in_nullable_struct_should_encode_per_row() {
        use arrow_array::{
            ArrayRef, Int32Array, Int64Array, RecordBatch, StringArray, StructArray,
        };
        use arrow_schema::{DataType, Field, Fields, Schema};
        use std::sync::Arc;
        let row_fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("name", DataType::Utf8, true),
        ]);
        let row_struct_dt = DataType::Struct(row_fields.clone());
        let before: ArrayRef = Arc::new(StructArray::new_null(row_fields.clone(), 1));
        let id_col: ArrayRef = Arc::new(Int32Array::from(vec![1]));
        let name_col: ArrayRef = Arc::new(StringArray::from(vec![None::<&str>]));
        let after: ArrayRef = Arc::new(StructArray::new(
            row_fields.clone(),
            vec![id_col, name_col],
            None,
        ));
        let schema = Arc::new(Schema::new(vec![
            Field::new("before", row_struct_dt.clone(), true),
            Field::new("after", row_struct_dt, true),
            Field::new("op", DataType::Utf8, false),
            Field::new("ts_ms", DataType::Int64, false),
        ]));
        let op = Arc::new(StringArray::from(vec!["r"])) as ArrayRef;
        let ts_ms = Arc::new(Int64Array::from(vec![1732900000000_i64])) as ArrayRef;
        let batch = RecordBatch::try_new(schema.clone(), vec![before, after, op, ts_ms]).unwrap();
        let mut buf = Vec::new();
        let mut writer = WriterBuilder::new(schema.as_ref().clone())
            .build::<_, AvroSoeFormat>(&mut buf)
            .unwrap();
        let single = batch.slice(0, 1);
        let res = writer.write(&single);
        assert!(
            res.is_ok(),
            "expected to encode successfully, got: {:?}",
            res.err()
        );
    }

    #[test]
    fn test_union_nonzero_type_ids() -> Result<(), AvroError> {
        use arrow_array::UnionArray;
        use arrow_buffer::Buffer;
        use arrow_schema::UnionFields;
        let union_fields = UnionFields::try_new(
            vec![2, 5],
            vec![
                Field::new("v_str", DataType::Utf8, true),
                Field::new("v_int", DataType::Int32, true),
            ],
        )
        .unwrap();
        let strings = StringArray::from(vec!["hello", "world"]);
        let ints = Int32Array::from(vec![10, 20, 30]);
        let type_ids = Buffer::from_slice_ref([2_i8, 5, 5, 2, 5]);
        let offsets = Buffer::from_slice_ref([0_i32, 0, 1, 1, 2]);
        let union_array = UnionArray::try_new(
            union_fields.clone(),
            type_ids.into(),
            Some(offsets.into()),
            vec![Arc::new(strings) as ArrayRef, Arc::new(ints) as ArrayRef],
        )?;
        let schema = Schema::new(vec![Field::new(
            "union_col",
            DataType::Union(union_fields, UnionMode::Dense),
            false,
        )]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(union_array) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        assert!(
            writer.write(&batch).is_ok(),
            "Expected no error from writing"
        );
        writer.finish()?;
        // Calling `finish` a second time should also succeed.
        assert!(
            writer.finish().is_ok(),
            "Expected no error from finishing writer"
        );
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id_fingerprint_rt() -> Result<(), AvroError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u32 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_stream_writer_with_id64_fingerprint_rt() -> Result<(), AvroError> {
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let schema_id: u64 = 42;
        let mut writer = WriterBuilder::new(schema.clone())
            .with_fingerprint_strategy(FingerprintStrategy::Id64(schema_id))
            .build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let encoded = writer.into_inner();
        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id64);
        let avro_schema = AvroSchema::try_from(&schema)?;
        let _ = store.set(Fingerprint::Id64(schema_id), avro_schema)?;
        let mut decoder = ReaderBuilder::new()
            .with_writer_schema_store(store)
            .build_decoder()?;
        let _ = decoder.decode(&encoded)?;
        let decoded = decoder
            .flush()?
            .expect("expected at least one batch from decoder");
        assert_eq!(decoded.num_columns(), 1);
        assert_eq!(decoded.num_rows(), 3);
        let col = decoded
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("int column");
        assert_eq!(col, &Int32Array::from(vec![1, 2, 3]));
        Ok(())
    }

    #[test]
    fn test_ocf_writer_generates_header_and_sync() -> Result<(), AvroError> {
        let batch = make_batch();
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic bytes missing/incorrect");
        let trailer = &out[out.len() - 16..];
        assert_eq!(trailer.len(), 16, "expected 16‑byte sync marker");
        Ok(())
    }

    #[test]
    fn test_schema_mismatch_yields_error() {
        let batch = make_batch();
        let alt_schema = Schema::new(vec![Field::new("x", DataType::Int32, false)]);
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, alt_schema).unwrap();
        let err = writer.write(&batch).unwrap_err();
        assert!(matches!(err, AvroError::SchemaError(_)));
    }

    #[test]
    fn test_write_batches_accumulates_multiple() -> Result<(), AvroError> {
        let batch1 = make_batch();
        let batch2 = make_batch();
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.write_batches(&[&batch1, &batch2])?;
        writer.finish()?;
        let out = writer.into_inner();
        assert!(out.len() > 4, "combined batches produced tiny file");
        Ok(())
    }

    #[test]
    fn test_finish_without_write_adds_header() -> Result<(), AvroError> {
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, make_schema())?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "finish() should emit OCF header");
        Ok(())
    }

    #[test]
    fn test_write_long_encodes_zigzag_varint() -> Result<(), AvroError> {
        let mut buf = Vec::new();
        write_long(&mut buf, 0)?;
        write_long(&mut buf, -1)?;
        write_long(&mut buf, 1)?;
        write_long(&mut buf, -2)?;
        write_long(&mut buf, 2147483647)?;
        assert!(
            buf.starts_with(&[0x00, 0x01, 0x02, 0x03]),
            "zig‑zag varint encodings incorrect: {buf:?}"
        );
        Ok(())
    }

    #[test]
    fn test_roundtrip_alltypes_roundtrip_writer() -> Result<(), AvroError> {
        for rel in files() {
            let path = arrow_test_data(rel);
            let rdr_file = File::open(&path).expect("open input avro");
            let reader = ReaderBuilder::new()
                .build(BufReader::new(rdr_file))
                .expect("build reader");
            let schema = reader.schema();
            let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let codec = if rel.contains(".snappy.") {
                Some(CompressionCodec::Snappy)
            } else if rel.contains(".zstandard.") {
                Some(CompressionCodec::ZStandard)
            } else if rel.contains(".bzip2.") {
                Some(CompressionCodec::Bzip2)
            } else if rel.contains(".xz.") {
                Some(CompressionCodec::Xz)
            } else {
                None
            };
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(codec)
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
            drop(writer);
            let rt_file = File::open(&out_path).expect("open roundtrip avro");
            let rt_reader = ReaderBuilder::new()
                .build(BufReader::new(rt_file))
                .expect("build roundtrip reader");
            let rt_schema = rt_reader.schema();
            let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
            assert_eq!(
                roundtrip, original,
                "Round-trip batch mismatch for file: {rel}"
            );
        }
        Ok(())
    }

    #[test]
    fn test_roundtrip_nested_records_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nested_records.avro");
        let rdr_file = File::open(&path).expect("open nested_records.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_records.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_records.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_roundtrip_nested_lists_writer() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nested_lists.snappy.avro");
        let rdr_file = File::open(&path).expect("open nested_lists.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nested_lists.snappy.avro");
        let schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original = arrow::compute::concat_batches(&schema, &batches).expect("concat original");
        let tmp = NamedTempFile::new().expect("create temp file");
        let out_path = tmp.into_temp_path();
        {
            let out_file = File::create(&out_path).expect("create output avro");
            let mut writer = WriterBuilder::new(original.schema().as_ref().clone())
                .with_compression(Some(CompressionCodec::Snappy))
                .build::<_, AvroOcfFormat>(out_file)?;
            writer.write(&original)?;
            writer.finish()?;
        }
        let rt_file = File::open(&out_path).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for nested_lists.snappy.avro"
        );
        Ok(())
    }
1357
1358    #[test]
1359    fn test_round_trip_simple_fixed_ocf() -> Result<(), AvroError> {
1360        let path = arrow_test_data("avro/simple_fixed.avro");
1361        let rdr_file = File::open(&path).expect("open avro/simple_fixed.avro");
1362        let reader = ReaderBuilder::new()
1363            .build(BufReader::new(rdr_file))
1364            .expect("build avro reader");
1365        let schema = reader.schema();
1366        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1367        let original =
1368            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
1369        let tmp = NamedTempFile::new().expect("create temp file");
1370        let out_file = File::create(tmp.path()).expect("create temp avro");
1371        let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
1372        writer.write(&original)?;
1373        writer.finish()?;
1374        drop(writer);
1375        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
1376        let rt_reader = ReaderBuilder::new()
1377            .build(BufReader::new(rt_file))
1378            .expect("build round_trip reader");
1379        let rt_schema = rt_reader.schema();
1380        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1381        let round_trip =
1382            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1383        assert_eq!(round_trip, original);
1384        Ok(())
1385    }
1386
1387    // Strict equality (schema + values) only when canonical extension types are enabled
1388    #[test]
1389    #[cfg(feature = "canonical_extension_types")]
1390    fn test_round_trip_duration_and_uuid_ocf() -> Result<(), AvroError> {
1391        use arrow_schema::{DataType, IntervalUnit};
1392        let in_file =
1393            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
1394        let reader = ReaderBuilder::new()
1395            .build(BufReader::new(in_file))
1396            .expect("build reader for duration_uuid.avro");
1397        let in_schema = reader.schema();
1398        let has_mdn = in_schema.fields().iter().any(|f| {
1399            matches!(
1400                f.data_type(),
1401                DataType::Interval(IntervalUnit::MonthDayNano)
1402            )
1403        });
1404        assert!(
1405            has_mdn,
1406            "expected at least one Interval(MonthDayNano) field in duration_uuid.avro"
1407        );
1408        let has_uuid_fixed = in_schema
1409            .fields()
1410            .iter()
1411            .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16)));
1412        assert!(
1413            has_uuid_fixed,
1414            "expected at least one FixedSizeBinary(16) (uuid) field in duration_uuid.avro"
1415        );
1416        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
1417        let input =
1418            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
1419        // Write to an in‑memory OCF and read back
1420        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
1421        writer.write(&input)?;
1422        writer.finish()?;
1423        let bytes = writer.into_inner();
1424        let rt_reader = ReaderBuilder::new()
1425            .build(Cursor::new(bytes))
1426            .expect("build round_trip reader");
1427        let rt_schema = rt_reader.schema();
1428        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
1429        let round_trip =
1430            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
1431        assert_eq!(round_trip, input);
1432        Ok(())
1433    }
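
    // A hand-built companion to the fixture-based test above (an illustrative sketch;
    // the field name is arbitrary). Interval(MonthDayNano) maps to Avro's `duration`
    // logical type, which stores whole milliseconds, so the nanosecond components
    // below are exact multiples of 1_000_000 to keep the round-trip lossless.
    #[test]
    fn test_hand_built_interval_month_day_nano_roundtrip_sketch() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, IntervalMonthDayNanoArray};
        use arrow_buffer::IntervalMonthDayNano;
        use arrow_schema::IntervalUnit;
        let values = IntervalMonthDayNanoArray::from(vec![
            IntervalMonthDayNano::new(1, 2, 3_000_000),
            IntervalMonthDayNano::new(0, 10, 500_000_000),
        ]);
        let schema = Schema::new(vec![Field::new(
            "duration_field",
            DataType::Interval(IntervalUnit::MonthDayNano),
            false,
        )]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
        writer.write(&batch)?;
        writer.finish()?;
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(writer.into_inner()))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        // Compare values only; the reader may attach extra field metadata (e.g. avro.name).
        assert_eq!(round_trip.column(0), batch.column(0));
        Ok(())
    }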

    // Feature OFF: only values are asserted equal; schema may legitimately differ (uuid as fixed(16))
    #[test]
    #[cfg(not(feature = "canonical_extension_types"))]
    fn test_duration_and_uuid_ocf_without_extensions_round_trips_values() -> Result<(), AvroError> {
        use arrow::datatypes::{DataType, IntervalUnit};
        use std::io::BufReader;

        // Read input Avro (duration + uuid)
        let in_file =
            File::open("test/data/duration_uuid.avro").expect("open test/data/duration_uuid.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_uuid.avro");
        let in_schema = reader.schema();

        // Sanity checks: has MonthDayNano and a FixedSizeBinary(16)
        assert!(
            in_schema.fields().iter().any(|f| {
                matches!(
                    f.data_type(),
                    DataType::Interval(IntervalUnit::MonthDayNano)
                )
            }),
            "expected at least one Interval(MonthDayNano) field"
        );
        assert!(
            in_schema
                .fields()
                .iter()
                .any(|f| matches!(f.data_type(), DataType::FixedSizeBinary(16))),
            "expected a FixedSizeBinary(16) field (uuid)"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // Write to an in-memory OCF and read back
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&input)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // 1) Values must round-trip for both columns
        assert_eq!(
            round_trip.column(0),
            input.column(0),
            "duration column values differ"
        );
        assert_eq!(round_trip.column(1), input.column(1), "uuid bytes differ");

        // 2) Schema expectation without extensions:
        //    uuid is written as named fixed(16), so reader attaches avro.name
        let uuid_rt = rt_schema.field_with_name("uuid_field")?;
        assert_eq!(uuid_rt.data_type(), &DataType::FixedSizeBinary(16));
        assert_eq!(
            uuid_rt.metadata().get("logicalType").map(|s| s.as_str()),
            Some("uuid"),
            "expected `logicalType = \"uuid\"` on round-tripped field metadata"
        );

        // 3) Duration remains Interval(MonthDayNano)
        let dur_rt = rt_schema.field_with_name("duration_field")?;
        assert!(matches!(
            dur_rt.data_type(),
            DataType::Interval(IntervalUnit::MonthDayNano)
        ));

        Ok(())
    }
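
    // Sketch with a hand-built schema (field name and bytes are illustrative): tag a
    // FixedSizeBinary(16) column with `logicalType = "uuid"` metadata, mirroring what
    // the reader attaches above, and check that the 16-byte values survive a round-trip.
    // Only values are asserted; whether the metadata key itself survives for hand-built
    // schemas is left unasserted here.
    #[test]
    fn test_hand_built_uuid_fixed_roundtrips_values_sketch() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, FixedSizeBinaryArray};
        let metadata: HashMap<String, String> =
            [("logicalType".to_string(), "uuid".to_string())].into();
        let field = Field::new("uuid_field", DataType::FixedSizeBinary(16), false)
            .with_metadata(metadata);
        let schema = Schema::new(vec![field]);
        let values =
            FixedSizeBinaryArray::try_from_iter(vec![[0x11u8; 16], [0x22u8; 16]].into_iter())?;
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(values) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
        writer.write(&batch)?;
        writer.finish()?;
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(writer.into_inner()))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip.column(0), batch.column(0), "uuid bytes differ");
        Ok(())
    }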

    // This test reads the same 'nonnullable.impala.avro' used by the reader tests,
    // writes it back out with the writer (hitting Map encoding paths), then reads it
    // again and asserts exact Arrow equivalence.
    #[test]
    // TODO: avoid requiring snappy for this file
    #[cfg(feature = "snappy")]
    fn test_nonnullable_impala_roundtrip_writer() -> Result<(), AvroError> {
        // Load source Avro with Map fields
        let path = arrow_test_data("avro/nonnullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nonnullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nonnullable.impala.avro");
        // Collect all input batches and concatenate to a single RecordBatch
        let in_schema = reader.schema();
        // Sanity: ensure the file actually contains at least one Map field
        let has_map = in_schema
            .fields()
            .iter()
            .any(|f| matches!(f.data_type(), DataType::Map(_, _)));
        assert!(
            has_map,
            "expected at least one Map field in avro/nonnullable.impala.avro"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        // Write out using the OCF writer into an in-memory Vec<u8>
        let buffer = Vec::<u8>::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        // Read the produced bytes back with the Reader
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        // Exact value fidelity (schema + data)
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro map data mismatch for nonnullable.impala.avro"
        );
        Ok(())
    }
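
    // A compact, hand-built Map round-trip (illustrative sketch: it relies on
    // `MapArray::new_from_strings` defaults for entry/key/value field naming, so only
    // the row count and the Map data type are asserted, not exact schema equality).
    #[test]
    fn test_hand_built_map_roundtrip_sketch() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, Int32Array, MapArray};
        // Two map rows: {"a": 1, "b": 2} and {"c": 3, "d": 4}
        let keys = ["a", "b", "c", "d"];
        let values = Int32Array::from(vec![1, 2, 3, 4]);
        let entry_offsets = [0, 2, 4];
        let map = MapArray::new_from_strings(keys.into_iter(), &values, &entry_offsets)?;
        let schema = Schema::new(vec![Field::new("m", map.data_type().clone(), false)]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(map) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
        writer.write(&batch)?;
        writer.finish()?;
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(writer.into_inner()))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(round_trip.num_rows(), 2);
        assert!(matches!(
            round_trip.schema().field(0).data_type(),
            DataType::Map(_, _)
        ));
        Ok(())
    }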

    #[test]
    // TODO: avoid requiring snappy for these files
    #[cfg(feature = "snappy")]
    fn test_roundtrip_decimals_via_writer() -> Result<(), AvroError> {
        // (file, whether the path resolves via the ARROW_TEST_DATA directory)
        let files: [(&str, bool); 8] = [
            ("avro/fixed_length_decimal.avro", true), // fixed-backed -> Decimal128(25,2)
            ("avro/fixed_length_decimal_legacy.avro", true), // legacy fixed[8] -> Decimal64(13,2)
            ("avro/int32_decimal.avro", true),        // bytes-backed -> Decimal32(4,2)
            ("avro/int64_decimal.avro", true),        // bytes-backed -> Decimal64(10,2)
            ("test/data/int256_decimal.avro", false), // bytes-backed -> Decimal256(76,2)
            ("test/data/fixed256_decimal.avro", false), // fixed[32]-backed -> Decimal256(76,10)
            ("test/data/fixed_length_decimal_legacy_32.avro", false), // legacy fixed[4] -> Decimal32(9,2)
            ("test/data/int128_decimal.avro", false), // bytes-backed -> Decimal128(38,2)
        ];
        for (rel, in_test_data_dir) in files {
            // Resolve path the same way as reader::test_decimal
            let path: String = if in_test_data_dir {
                arrow_test_data(rel)
            } else {
                PathBuf::from(env!("CARGO_MANIFEST_DIR"))
                    .join(rel)
                    .to_string_lossy()
                    .into_owned()
            };
            // Read original file into a single RecordBatch for comparison
            let f_in = File::open(&path).expect("open input avro");
            let rdr = ReaderBuilder::new().build(BufReader::new(f_in))?;
            let in_schema = rdr.schema();
            let in_batches = rdr.collect::<Result<Vec<_>, _>>()?;
            let original =
                arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
            // Write it out with the OCF writer (no special compression)
            let tmp = NamedTempFile::new().expect("create temp file");
            let out_path = tmp.into_temp_path();
            let out_file = File::create(&out_path).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, original.schema().as_ref().clone())?;
            writer.write(&original)?;
            writer.finish()?;
            // Read back the file we just wrote and compare equality (schema + data)
            let f_rt = File::open(&out_path).expect("open roundtrip avro");
            let rt_rdr = ReaderBuilder::new().build(BufReader::new(f_rt))?;
            let rt_schema = rt_rdr.schema();
            let rt_batches = rt_rdr.collect::<Result<Vec<_>, _>>()?;
            let roundtrip =
                arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat rt");
            assert_eq!(roundtrip, original, "decimal round-trip mismatch for {rel}");
        }
        Ok(())
    }
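
    // The same write path with a hand-built decimal column (a sketch; the field name
    // and the Decimal128(38, 2) choice are illustrative, matching the widest
    // bytes-backed case in the table above). Values are asserted rather than full
    // schema equality, since the reader may attach extra metadata.
    #[test]
    fn test_hand_built_decimal128_roundtrip_sketch() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, Decimal128Array};
        let amounts = Decimal128Array::from(vec![Some(12345_i128), None, Some(-67890_i128)])
            .with_precision_and_scale(38, 2)?;
        let schema = Schema::new(vec![Field::new("amount", DataType::Decimal128(38, 2), true)]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(amounts) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema)?;
        writer.write(&batch)?;
        writer.finish()?;
        let reader = ReaderBuilder::new().build(Cursor::new(writer.into_inner()))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.column(0), batch.column(0), "decimal values differ");
        Ok(())
    }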

    #[test]
    fn test_named_types_complex_roundtrip() -> Result<(), AvroError> {
        // 1. Read the more complex named references file.
        let path =
            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/named_types_complex.avro");
        let rdr_file = File::open(&path).expect("open test/data/named_types_complex.avro");

        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for named_types_complex.avro");

        // 2. Concatenate all batches to one RecordBatch.
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        // 3. Sanity Checks: Validate that all named types were reused correctly.
        {
            let arrow_schema = original.schema();

            // --- A. Validate 'User' record reuse ---
            let author_field = arrow_schema.field_with_name("author")?;
            let author_type = author_field.data_type();
            let editors_field = arrow_schema.field_with_name("editors")?;
            let editors_item_type = match editors_field.data_type() {
                DataType::List(item_field) => item_field.data_type(),
                other => panic!("Editors field should be a List, but was {:?}", other),
            };
            assert_eq!(
                author_type, editors_item_type,
                "The DataType for the 'author' struct and the 'editors' list items must be identical"
            );

            // --- B. Validate 'PostStatus' enum reuse ---
            let status_field = arrow_schema.field_with_name("status")?;
            let status_type = status_field.data_type();
            assert!(
                matches!(status_type, DataType::Dictionary(_, _)),
                "Status field should be a Dictionary (Enum)"
            );

            let prev_status_field = arrow_schema.field_with_name("previous_status")?;
            let prev_status_type = prev_status_field.data_type();
            assert_eq!(
                status_type, prev_status_type,
                "The DataType for 'status' and 'previous_status' enums must be identical"
            );

            // --- C. Validate 'MD5' fixed reuse ---
            let content_hash_field = arrow_schema.field_with_name("content_hash")?;
            let content_hash_type = content_hash_field.data_type();
            assert!(
                matches!(content_hash_type, DataType::FixedSizeBinary(16)),
                "Content hash should be FixedSizeBinary(16)"
            );

            let thumb_hash_field = arrow_schema.field_with_name("thumbnail_hash")?;
            let thumb_hash_type = thumb_hash_field.data_type();
            assert_eq!(
                content_hash_type, thumb_hash_type,
                "The DataType for 'content_hash' and 'thumbnail_hash' fixed types must be identical"
            );
        }

        // 4. Write the data to an in-memory buffer.
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();

        // 5. Read the data back and compare for exact equality.
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");

        assert_eq!(
            roundtrip, original,
            "Avro complex named types round-trip mismatch"
        );

        Ok(())
    }

    // Union Roundtrip Test Helpers

    // Asserts that the `actual` schema is a semantically equivalent superset of the `expected` one.
    // This allows the `actual` schema to contain additional metadata keys
    // (`arrowUnionMode`, `arrowUnionTypeIds`, `avro.name`) that are added during an Arrow-to-Avro-to-Arrow
    // roundtrip, while ensuring no other information was lost or changed.
    fn assert_schema_is_semantically_equivalent(expected: &Schema, actual: &Schema) {
        // Compare top-level schema metadata using the same superset logic.
        assert_metadata_is_superset(expected.metadata(), actual.metadata(), "Schema");

        // Compare fields.
        assert_eq!(
            expected.fields().len(),
            actual.fields().len(),
            "Schema must have the same number of fields"
        );

        for (expected_field, actual_field) in expected.fields().iter().zip(actual.fields().iter()) {
            assert_field_is_semantically_equivalent(expected_field, actual_field);
        }
    }

    fn assert_field_is_semantically_equivalent(expected: &Field, actual: &Field) {
        let context = format!("Field '{}'", expected.name());

        assert_eq!(
            expected.name(),
            actual.name(),
            "{context}: names must match"
        );
        assert_eq!(
            expected.is_nullable(),
            actual.is_nullable(),
            "{context}: nullability must match"
        );

        // Recursively check the data types.
        assert_datatype_is_semantically_equivalent(
            expected.data_type(),
            actual.data_type(),
            &context,
        );

        // Check that metadata is a valid superset.
        assert_metadata_is_superset(expected.metadata(), actual.metadata(), &context);
    }

    fn assert_datatype_is_semantically_equivalent(
        expected: &DataType,
        actual: &DataType,
        context: &str,
    ) {
        match (expected, actual) {
            (DataType::List(expected_field), DataType::List(actual_field))
            | (DataType::LargeList(expected_field), DataType::LargeList(actual_field))
            | (DataType::Map(expected_field, _), DataType::Map(actual_field, _)) => {
                assert_field_is_semantically_equivalent(expected_field, actual_field);
            }
            (DataType::Struct(expected_fields), DataType::Struct(actual_fields)) => {
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: struct must have same number of fields"
                );
                for (ef, af) in expected_fields.iter().zip(actual_fields.iter()) {
                    assert_field_is_semantically_equivalent(ef, af);
                }
            }
            (
                DataType::Union(expected_fields, expected_mode),
                DataType::Union(actual_fields, actual_mode),
            ) => {
                assert_eq!(
                    expected_mode, actual_mode,
                    "{context}: union mode must match"
                );
                assert_eq!(
                    expected_fields.len(),
                    actual_fields.len(),
                    "{context}: union must have same number of variants"
                );
                for ((exp_id, exp_field), (act_id, act_field)) in
                    expected_fields.iter().zip(actual_fields.iter())
                {
                    assert_eq!(exp_id, act_id, "{context}: union type ids must match");
                    assert_field_is_semantically_equivalent(exp_field, act_field);
                }
            }
            _ => {
                assert_eq!(expected, actual, "{context}: data types must be identical");
            }
        }
    }

    fn assert_batch_data_is_identical(expected: &RecordBatch, actual: &RecordBatch) {
        assert_eq!(
            expected.num_columns(),
            actual.num_columns(),
            "RecordBatches must have the same number of columns"
        );
        assert_eq!(
            expected.num_rows(),
            actual.num_rows(),
            "RecordBatches must have the same number of rows"
        );

        for i in 0..expected.num_columns() {
            let context = format!("Column {i}");
            let expected_col = expected.column(i);
            let actual_col = actual.column(i);
            assert_array_data_is_identical(expected_col, actual_col, &context);
        }
    }

    /// Recursively asserts that the data content of two Arrays is identical.
    fn assert_array_data_is_identical(expected: &dyn Array, actual: &dyn Array, context: &str) {
        assert_eq!(
            expected.nulls(),
            actual.nulls(),
            "{context}: null buffers must match"
        );
        assert_eq!(
            expected.len(),
            actual.len(),
            "{context}: array lengths must match"
        );

        match (expected.data_type(), actual.data_type()) {
            (DataType::Union(expected_fields, _), DataType::Union(..)) => {
                let expected_union = expected.as_any().downcast_ref::<UnionArray>().unwrap();
                let actual_union = actual.as_any().downcast_ref::<UnionArray>().unwrap();

                // Compare the type_ids buffer (always the first buffer).
                assert_eq!(
                    &expected.to_data().buffers()[0],
                    &actual.to_data().buffers()[0],
                    "{context}: union type_ids buffer mismatch"
                );

                // For dense unions, compare the value_offsets buffer (the second buffer).
                if expected.to_data().buffers().len() > 1 {
                    assert_eq!(
                        &expected.to_data().buffers()[1],
                        &actual.to_data().buffers()[1],
                        "{context}: union value_offsets buffer mismatch"
                    );
                }

                // Recursively compare children based on the fields in the DataType.
                for (type_id, _) in expected_fields.iter() {
                    let child_context = format!("{context} -> child variant {type_id}");
                    assert_array_data_is_identical(
                        expected_union.child(type_id),
                        actual_union.child(type_id),
                        &child_context,
                    );
                }
            }
            (DataType::Struct(_), DataType::Struct(_)) => {
                let expected_struct = expected.as_any().downcast_ref::<StructArray>().unwrap();
                let actual_struct = actual.as_any().downcast_ref::<StructArray>().unwrap();
                for i in 0..expected_struct.num_columns() {
                    let child_context = format!("{context} -> struct child {i}");
                    assert_array_data_is_identical(
                        expected_struct.column(i),
                        actual_struct.column(i),
                        &child_context,
                    );
                }
            }
            // Fallback for primitive types and other types where buffer comparison is sufficient.
            _ => {
                assert_eq!(
                    expected.to_data().buffers(),
                    actual.to_data().buffers(),
                    "{context}: data buffers must match"
                );
            }
        }
    }

    /// Checks that `actual_meta` contains all of `expected_meta`, and any additional
    /// keys in `actual_meta` are from a permitted set.
    fn assert_metadata_is_superset(
        expected_meta: &HashMap<String, String>,
        actual_meta: &HashMap<String, String>,
        context: &str,
    ) {
        let allowed_additions: HashSet<&str> =
            ["arrowUnionMode", "arrowUnionTypeIds", "avro.name"]
                .into_iter()
                .collect();
        for (key, expected_value) in expected_meta {
            match actual_meta.get(key) {
                Some(actual_value) => assert_eq!(
                    expected_value, actual_value,
                    "{context}: preserved metadata for key '{key}' must have the same value"
                ),
                None => panic!("{context}: metadata key '{key}' was lost during roundtrip"),
            }
        }
        for key in actual_meta.keys() {
            if !expected_meta.contains_key(key) && !allowed_additions.contains(key.as_str()) {
                panic!("{context}: unexpected metadata key '{key}' was added during roundtrip");
            }
        }
    }
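
    // A small self-check of the superset rule above (illustrative; the keys and values
    // are arbitrary): permitted keys may be added, while preserved keys must keep
    // their original values.
    #[test]
    fn test_metadata_superset_allows_permitted_additions() {
        let expected: HashMap<String, String> =
            [("preserved".to_string(), "value".to_string())].into();
        let mut actual = expected.clone();
        actual.insert("avro.name".to_string(), "added_during_roundtrip".to_string());
        assert_metadata_is_superset(&expected, &actual, "self-check");
    }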

    #[test]
    fn test_union_roundtrip() -> Result<(), AvroError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/union_fields.avro")
            .to_string_lossy()
            .into_owned();
        let rdr_file = File::open(&file_path).expect("open test/data/union_fields.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for union_fields.avro");
        let schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        // Metadata gets appended during the round-trip, so the schemas can't be compared
        // directly; instead, compare the schemas semantically and the data buffer-by-buffer.
        assert_schema_is_semantically_equivalent(&original.schema(), &round_trip.schema());

        assert_batch_data_is_identical(&original, &round_trip);
        Ok(())
    }

    #[test]
    fn test_enum_roundtrip_uses_reader_fixture() -> Result<(), AvroError> {
        // Read the known-good enum file (same as reader::test_simple)
        let path = arrow_test_data("avro/simple_enum.avro");
        let rdr_file = File::open(&path).expect("open avro/simple_enum.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for simple_enum.avro");
        // Concatenate all batches to one RecordBatch for a clean equality check
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        // Sanity: expect at least one Dictionary(Int32, Utf8) column (enum)
        let has_enum_dict = in_schema.fields().iter().any(|f| {
            matches!(
                f.data_type(),
                DataType::Dictionary(k, v) if **k == DataType::Int32 && **v == DataType::Utf8
            )
        });
        assert!(
            has_enum_dict,
            "Expected at least one enum-mapped Dictionary<Int32, Utf8> field"
        );
        // Write with OCF writer into memory using the reader-provided Arrow schema.
        // The writer will embed the Avro JSON from `avro.schema` metadata if present.
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        // Read back and compare for exact equality (schema + data)
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("reader for round-trip");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(roundtrip, original, "Avro enum round-trip mismatch");
        Ok(())
    }

    #[test]
    fn test_builder_propagates_capacity_to_writer() -> Result<(), AvroError> {
        let cap = 64 * 1024;
        let buffer = Vec::<u8>::new();
        let mut writer = WriterBuilder::new(make_schema())
            .with_capacity(cap)
            .build::<_, AvroOcfFormat>(buffer)?;
        assert_eq!(writer.capacity, cap, "builder capacity not propagated");
        let batch = make_batch();
        writer.write(&batch)?;
        writer.finish()?;
        let out = writer.into_inner();
        assert_eq!(&out[..4], b"Obj\x01", "OCF magic missing/incorrect");
        Ok(())
    }

    #[test]
    fn test_stream_writer_stores_capacity_direct_writes() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, Int32Array};
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef],
        )?;
        let cap = 8192;
        let mut writer = WriterBuilder::new(schema)
            .with_capacity(cap)
            .build::<_, AvroSoeFormat>(Vec::new())?;
        assert_eq!(writer.capacity, cap);
        writer.write(&batch)?;
        let _bytes = writer.into_inner();
        Ok(())
    }
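
    // Sketch tying the stream output to the prefix table in the module docs: with no
    // explicit fingerprint strategy configured, SOE output is documented to default to
    // the Rabin strategy, whose per-record prefix starts with the single-object marker
    // bytes 0xC3 0x01 (that the builder default matches the documented default is an
    // assumption here).
    #[test]
    fn test_soe_default_prefix_starts_with_marker_sketch() -> Result<(), AvroError> {
        use arrow_array::{ArrayRef, Int32Array};
        use arrow_schema::{DataType, Field, Schema};
        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(Int32Array::from(vec![1])) as ArrayRef],
        )?;
        let mut writer = WriterBuilder::new(schema).build::<_, AvroSoeFormat>(Vec::new())?;
        writer.write(&batch)?;
        let bytes = writer.into_inner();
        assert_eq!(bytes[..2], [0xC3, 0x01], "expected SOE marker bytes");
        Ok(())
    }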

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_roundtrip_duration_logical_types_ocf() -> Result<(), AvroError> {
        let file_path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
            .join("test/data/duration_logical_types.avro")
            .to_string_lossy()
            .into_owned();

        let in_file = File::open(&file_path)
            .unwrap_or_else(|_| panic!("Failed to open test file: {}", file_path));

        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for duration_logical_types.avro");
        let in_schema = reader.schema();

        let expected_units: HashSet<TimeUnit> = [
            TimeUnit::Nanosecond,
            TimeUnit::Microsecond,
            TimeUnit::Millisecond,
            TimeUnit::Second,
        ]
        .into_iter()
        .collect();

        let found_units: HashSet<TimeUnit> = in_schema
            .fields()
            .iter()
            .filter_map(|f| match f.data_type() {
                DataType::Duration(unit) => Some(*unit),
                _ => None,
            })
            .collect();

        assert_eq!(
            found_units, expected_units,
            "Expected to find all four Duration TimeUnits in the schema from the initial read"
        );

        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let input =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");

        let tmp = NamedTempFile::new().expect("create temp file");
        {
            let out_file = File::create(tmp.path()).expect("create temp avro");
            let mut writer = AvroWriter::new(out_file, in_schema.as_ref().clone())?;
            writer.write(&input)?;
            writer.finish()?;
        }

        let rt_file = File::open(tmp.path()).expect("open round_trip avro");
        let rt_reader = ReaderBuilder::new()
            .build(BufReader::new(rt_file))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");

        assert_eq!(round_trip, input);
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_roundtrip_writer() -> Result<(), AvroError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.name(), "run_ends");
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.name(), "values");
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got_ree = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                assert_eq!(got_ree, &ree);
            }
            other => panic!(
                "Unexpected DataType for round-tripped RunEndEncoded column: {:?}",
                other
            ),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer() -> Result<(), AvroError>
    {
        let run_ends = Int16Array::from(vec![2, 5, 7]); // end indices
        let run_values = StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = RunArray::<Int16Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int16);
                assert_eq!(values_field.data_type(), &DataType::Utf8);
                assert!(
                    values_field.is_nullable(),
                    "REE 'values' child should be nullable"
                );
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int16Type>>()
                    .expect("RunArray<Int16Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer() -> Result<(), AvroError>
    {
        let run_ends = Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = RunArray::<Int64Type>::try_new(&run_ends, &run_values)?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(ree.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int64);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int64Type>>()
                    .expect("RunArray<Int64Type>");
                assert_eq!(got, &ree);
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

    #[cfg(feature = "avro_custom_types")]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer() -> Result<(), AvroError> {
        let run_ends = Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let base = RunArray::<Int32Type>::try_new(&run_ends, &run_values)?;
        let offset = 1usize;
        let length = 6usize;
        let base_values = base
            .values()
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("REE values as Int32Array");
        let mut logical_window: Vec<Option<i32>> = Vec::with_capacity(length);
        for i in offset..offset + length {
            let phys = base.get_physical_index(i);
            let v = if base_values.is_null(phys) {
                None
            } else {
                Some(base_values.value(phys))
            };
            logical_window.push(v);
        }

        fn compress_run_ends_i32(vals: &[Option<i32>]) -> (Int32Array, Int32Array) {
            if vals.is_empty() {
                return (Int32Array::new_null(0), Int32Array::new_null(0));
            }
            let mut run_ends_out: Vec<i32> = Vec::new();
            let mut run_vals_out: Vec<Option<i32>> = Vec::new();
            let mut cur = vals[0];
            let mut len = 1i32;
            for v in &vals[1..] {
                if *v == cur {
                    len += 1;
                } else {
                    let last_end = run_ends_out.last().copied().unwrap_or(0);
                    run_ends_out.push(last_end + len);
                    run_vals_out.push(cur);
                    cur = *v;
                    len = 1;
                }
            }
            let last_end = run_ends_out.last().copied().unwrap_or(0);
            run_ends_out.push(last_end + len);
            run_vals_out.push(cur);
            (
                Int32Array::from(run_ends_out),
                Int32Array::from(run_vals_out),
            )
        }
        let (owned_run_ends, owned_run_values) = compress_run_ends_i32(&logical_window);
        let owned_slice = RunArray::<Int32Type>::try_new(&owned_run_ends, &owned_run_values)?;
        let field = Field::new("x", owned_slice.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch = RecordBatch::try_new(
            Arc::new(schema.clone()),
            vec![Arc::new(owned_slice.clone()) as ArrayRef],
        )?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), length);
        match out.schema().field(0).data_type() {
            DataType::RunEndEncoded(run_ends_field, values_field) => {
                assert_eq!(run_ends_field.data_type(), &DataType::Int32);
                assert_eq!(values_field.data_type(), &DataType::Int32);
                assert!(values_field.is_nullable());
                let got = out
                    .column(0)
                    .as_any()
                    .downcast_ref::<RunArray<Int32Type>>()
                    .expect("RunArray<Int32Type>");
                fn expand_ree_to_int32(a: &RunArray<Int32Type>) -> Int32Array {
                    let vals = a
                        .values()
                        .as_any()
                        .downcast_ref::<Int32Array>()
                        .expect("REE values as Int32Array");
                    let mut out: Vec<Option<i32>> = Vec::with_capacity(a.len());
                    for i in 0..a.len() {
                        let phys = a.get_physical_index(i);
                        out.push(if vals.is_null(phys) {
                            None
                        } else {
                            Some(vals.value(phys))
                        });
                    }
                    Int32Array::from(out)
                }
                let got_logical = expand_ree_to_int32(got);
                let expected_logical = Int32Array::from(logical_window);
                assert_eq!(
                    got_logical, expected_logical,
                    "Logical values differ after REE slice round-trip"
                );
            }
            other => panic!("Unexpected DataType for REE column: {:?}", other),
        }
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_roundtrip_writer_feature_off() -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int32Array::from(vec![3, 5, 7, 8]);
        let run_values = arrow_array::Int32Array::from(vec![Some(1), Some(2), None, Some(3)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(1),
            Some(1),
            Some(1),
            Some(2),
            Some(2),
            None,
            None,
            Some(3),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_string_values_int16_run_ends_roundtrip_writer_feature_off()
    -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int16Array::from(vec![2, 5, 7]);
        let run_values = arrow_array::StringArray::from(vec![Some("a"), None, Some("c")]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int16Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("s", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 7);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Utf8);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<arrow_array::StringArray>()
            .expect("StringArray");
        let expected = arrow_array::StringArray::from(vec![
            Some("a"),
            Some("a"),
            None,
            None,
            None,
            Some("c"),
            Some("c"),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_int64_run_ends_numeric_values_roundtrip_writer_feature_off()
    -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = arrow_array::Int64Array::from(vec![4_i64, 8_i64]);
        let run_values = Int32Array::from(vec![Some(999), Some(-5)]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int64Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("y", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 8);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![
            Some(999),
            Some(999),
            Some(999),
            Some(999),
            Some(-5),
            Some(-5),
            Some(-5),
            Some(-5),
        ]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[cfg(not(feature = "avro_custom_types"))]
    #[test]
    fn test_run_end_encoded_sliced_roundtrip_writer_feature_off() -> Result<(), AvroError> {
        use arrow_schema::{DataType, Field, Schema};
        let run_ends = Int32Array::from(vec![2, 4, 6]);
        let run_values = Int32Array::from(vec![Some(1), Some(2), None]);
        let ree = arrow_array::RunArray::<arrow_array::types::Int32Type>::try_new(
            &run_ends,
            &run_values,
        )?;
        let field = Field::new("x", ree.data_type().clone(), true);
        let schema = Schema::new(vec![field]);
        let batch =
            RecordBatch::try_new(Arc::new(schema.clone()), vec![Arc::new(ree) as ArrayRef])?;
        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
        writer.write(&batch)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let reader = ReaderBuilder::new().build(Cursor::new(bytes))?;
        let out_schema = reader.schema();
        let batches = reader.collect::<Result<Vec<_>, _>>()?;
        let out = arrow::compute::concat_batches(&out_schema, &batches).expect("concat output");
        assert_eq!(out.num_columns(), 1);
        assert_eq!(out.num_rows(), 6);
        assert_eq!(out.schema().field(0).data_type(), &DataType::Int32);
        let got = out
            .column(0)
            .as_any()
            .downcast_ref::<Int32Array>()
            .expect("Int32Array");
        let expected = Int32Array::from(vec![Some(1), Some(1), Some(2), Some(2), None, None]);
        assert_eq!(got, &expected);
        Ok(())
    }

    #[test]
    // TODO: avoid requiring snappy for this file
    #[cfg(feature = "snappy")]
    fn test_nullable_impala_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/nullable.impala.avro");
        let rdr_file = File::open(&path).expect("open avro/nullable.impala.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for nullable.impala.avro");
        let in_schema = reader.schema();
        assert!(
            in_schema.fields().iter().any(|f| f.is_nullable()),
            "expected at least one nullable field in avro/nullable.impala.avro"
        );
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let buffer: Vec<u8> = Vec::new();
        let mut writer = AvroWriter::new(buffer, in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let out_bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(out_bytes))
            .expect("build reader for round-tripped in-memory OCF");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let roundtrip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
        assert_eq!(
            roundtrip, original,
            "Round-trip Avro data mismatch for nullable.impala.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_datapage_v2_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/datapage_v2.snappy.avro");
        let rdr_file = File::open(&path).expect("open avro/datapage_v2.snappy.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(rdr_file))
            .expect("build reader for datapage_v2.snappy.avro");
        let in_schema = reader.schema();
        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round-trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for datapage_v2.snappy.avro"
        );
        Ok(())
    }

    #[test]
    #[cfg(feature = "snappy")]
    fn test_single_nan_roundtrip() -> Result<(), AvroError> {
        let path = arrow_test_data("avro/single_nan.avro");
        let in_file = File::open(&path).expect("open avro/single_nan.avro");
        let reader = ReaderBuilder::new()
            .build(BufReader::new(in_file))
            .expect("build reader for single_nan.avro");
        let in_schema = reader.schema();
        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
        let original =
            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
        writer.write(&original)?;
        writer.finish()?;
        let bytes = writer.into_inner();
        let rt_reader = ReaderBuilder::new()
            .build(Cursor::new(bytes))
            .expect("build round_trip reader");
        let rt_schema = rt_reader.schema();
        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
        let round_trip =
            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
        assert_eq!(
            round_trip, original,
            "Round-trip batch mismatch for avro/single_nan.avro"
        );
        Ok(())
2584    }

2585    #[test]
2586    // TODO: avoid requiring snappy for this file
2587    #[cfg(feature = "snappy")]
2588    fn test_dict_pages_offset_zero_roundtrip() -> Result<(), AvroError> {
2589        let path = arrow_test_data("avro/dict-page-offset-zero.avro");
2590        let rdr_file = File::open(&path).expect("open avro/dict-page-offset-zero.avro");
2591        let reader = ReaderBuilder::new()
2592            .build(BufReader::new(rdr_file))
2593            .expect("build reader for dict-page-offset-zero.avro");
2594        let in_schema = reader.schema();
2595        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2596        let original =
2597            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2598        let buffer: Vec<u8> = Vec::new();
2599        let mut writer = AvroWriter::new(buffer, original.schema().as_ref().clone())?;
2600        writer.write(&original)?;
2601        writer.finish()?;
2602        let bytes = writer.into_inner();
2603        let rt_reader = ReaderBuilder::new()
2604            .build(Cursor::new(bytes))
2605            .expect("build reader for round-trip");
2606        let rt_schema = rt_reader.schema();
2607        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2608        let roundtrip =
2609            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2610        assert_eq!(
2611            roundtrip, original,
2612            "Round-trip batch mismatch for avro/dict-page-offset-zero.avro"
2613        );
2614        Ok(())
2615    }
2616
2617    #[test]
2618    #[cfg(feature = "snappy")]
2619    fn test_repeated_no_annotation_roundtrip() -> Result<(), AvroError> {
2620        let path = arrow_test_data("avro/repeated_no_annotation.avro");
2621        let in_file = File::open(&path).expect("open avro/repeated_no_annotation.avro");
2622        let reader = ReaderBuilder::new()
2623            .build(BufReader::new(in_file))
2624            .expect("build reader for repeated_no_annotation.avro");
2625        let in_schema = reader.schema();
2626        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2627        let original =
2628            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2629        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2630        writer.write(&original)?;
2631        writer.finish()?;
2632        let bytes = writer.into_inner();
2633        let rt_reader = ReaderBuilder::new()
2634            .build(Cursor::new(bytes))
2635            .expect("build reader for round-trip buffer");
2636        let rt_schema = rt_reader.schema();
2637        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2638        let round_trip =
2639            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round-trip");
2640        assert_eq!(
2641            round_trip, original,
2642            "Round-trip batch mismatch for avro/repeated_no_annotation.avro"
2643        );
2644        Ok(())
2645    }
2646
2647    #[test]
2648    fn test_nested_record_type_reuse_roundtrip() -> Result<(), AvroError> {
2649        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2650            .join("test/data/nested_record_reuse.avro")
2651            .to_string_lossy()
2652            .into_owned();
2653        let in_file = File::open(&path).expect("open test/data/nested_record_reuse.avro");
2654        let reader = ReaderBuilder::new()
2655            .build(BufReader::new(in_file))
2656            .expect("build reader for nested_record_reuse.avro");
2657        let in_schema = reader.schema();
2658        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2659        let input =
2660            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2661        let mut writer = AvroWriter::new(Vec::<u8>::new(), in_schema.as_ref().clone())?;
2662        writer.write(&input)?;
2663        writer.finish()?;
2664        let bytes = writer.into_inner();
2665        let rt_reader = ReaderBuilder::new()
2666            .build(Cursor::new(bytes))
2667            .expect("build round_trip reader");
2668        let rt_schema = rt_reader.schema();
2669        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2670        let round_trip =
2671            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2672        assert_eq!(
2673            round_trip, input,
2674            "Round-trip batch mismatch for nested_record_reuse.avro"
2675        );
2676        Ok(())
2677    }
2678
2679    #[test]
2680    fn test_enum_type_reuse_roundtrip() -> Result<(), AvroError> {
2681        let path =
2682            std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("test/data/enum_reuse.avro");
2683        let rdr_file = File::open(&path).expect("open test/data/enum_reuse.avro");
2684        let reader = ReaderBuilder::new()
2685            .build(BufReader::new(rdr_file))
2686            .expect("build reader for enum_reuse.avro");
2687        let in_schema = reader.schema();
2688        let input_batches = reader.collect::<Result<Vec<_>, _>>()?;
2689        let original =
2690            arrow::compute::concat_batches(&in_schema, &input_batches).expect("concat input");
2691        let mut writer = AvroWriter::new(Vec::<u8>::new(), original.schema().as_ref().clone())?;
2692        writer.write(&original)?;
2693        writer.finish()?;
2694        let bytes = writer.into_inner();
2695        let rt_reader = ReaderBuilder::new()
2696            .build(Cursor::new(bytes))
2697            .expect("build round_trip reader");
2698        let rt_schema = rt_reader.schema();
2699        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2700        let round_trip =
2701            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat round_trip");
2702        assert_eq!(
2703            round_trip, original,
2704            "Avro enum type reuse round-trip mismatch"
2705        );
2706        Ok(())
2707    }
2708
2709    #[test]
2710    fn comprehensive_e2e_test_roundtrip() -> Result<(), AvroError> {
2711        let path = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2712            .join("test/data/comprehensive_e2e.avro");
2713        let rdr_file = File::open(&path).expect("open test/data/comprehensive_e2e.avro");
2714        let reader = ReaderBuilder::new()
2715            .build(BufReader::new(rdr_file))
2716            .expect("build reader for comprehensive_e2e.avro");
2717        let in_schema = reader.schema();
2718        let in_batches = reader.collect::<Result<Vec<_>, _>>()?;
2719        let original =
2720            arrow::compute::concat_batches(&in_schema, &in_batches).expect("concat input");
2721        let sink: Vec<u8> = Vec::new();
2722        let mut writer = AvroWriter::new(sink, original.schema().as_ref().clone())?;
2723        writer.write(&original)?;
2724        writer.finish()?;
2725        let bytes = writer.into_inner();
2726        let rt_reader = ReaderBuilder::new()
2727            .build(Cursor::new(bytes))
2728            .expect("build round-trip reader");
2729        let rt_schema = rt_reader.schema();
2730        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2731        let roundtrip =
2732            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2733        assert_eq!(
2734            roundtrip, original,
2735            "Round-trip batch mismatch for comprehensive_e2e.avro"
2736        );
2737        Ok(())
2738    }
2739
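    // Round-trips the temporal encoders: Date32, Time32 (milliseconds),
    // Time64 (microseconds), and Timestamp at millisecond, microsecond, and
    // nanosecond precision, including negative and boundary values.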
2740    #[test]
2741    fn test_roundtrip_new_time_encoders_writer() -> Result<(), AvroError> {
2742        let schema = Schema::new(vec![
2743            Field::new("d32", DataType::Date32, false),
2744            Field::new("t32_ms", DataType::Time32(TimeUnit::Millisecond), false),
2745            Field::new("t64_us", DataType::Time64(TimeUnit::Microsecond), false),
2746            Field::new(
2747                "ts_ms",
2748                DataType::Timestamp(TimeUnit::Millisecond, None),
2749                false,
2750            ),
2751            Field::new(
2752                "ts_us",
2753                DataType::Timestamp(TimeUnit::Microsecond, None),
2754                false,
2755            ),
2756            Field::new(
2757                "ts_ns",
2758                DataType::Timestamp(TimeUnit::Nanosecond, None),
2759                false,
2760            ),
2761        ]);
2762        let d32 = Date32Array::from(vec![0, 1, -1]);
2763        let t32_ms: PrimitiveArray<Time32MillisecondType> =
2764            vec![0_i32, 12_345_i32, 86_399_999_i32].into();
2765        let t64_us: PrimitiveArray<Time64MicrosecondType> =
2766            vec![0_i64, 1_234_567_i64, 86_399_999_999_i64].into();
2767        let ts_ms: PrimitiveArray<TimestampMillisecondType> =
2768            vec![0_i64, -1_i64, 1_700_000_000_000_i64].into();
2769        let ts_us: PrimitiveArray<TimestampMicrosecondType> = vec![0_i64, 1_i64, -1_i64].into();
2770        let ts_ns: PrimitiveArray<TimestampNanosecondType> = vec![0_i64, 1_i64, -1_i64].into();
2771        let batch = RecordBatch::try_new(
2772            Arc::new(schema.clone()),
2773            vec![
2774                Arc::new(d32) as ArrayRef,
2775                Arc::new(t32_ms) as ArrayRef,
2776                Arc::new(t64_us) as ArrayRef,
2777                Arc::new(ts_ms) as ArrayRef,
2778                Arc::new(ts_us) as ArrayRef,
2779                Arc::new(ts_ns) as ArrayRef,
2780            ],
2781        )?;
2782        let mut writer = AvroWriter::new(Vec::<u8>::new(), schema.clone())?;
2783        writer.write(&batch)?;
2784        writer.finish()?;
2785        let bytes = writer.into_inner();
2786        let rt_reader = ReaderBuilder::new()
2787            .build(Cursor::new(bytes))
2788            .expect("build reader for round-trip of new time encoders");
2789        let rt_schema = rt_reader.schema();
2790        let rt_batches = rt_reader.collect::<Result<Vec<_>, _>>()?;
2791        let roundtrip =
2792            arrow::compute::concat_batches(&rt_schema, &rt_batches).expect("concat roundtrip");
2793        assert_eq!(roundtrip, batch);
2794        Ok(())
2795    }
2796
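    // Fixture helpers for the `Encoder` tests: a minimal two-column Int32
    // schema and a matching three-row batch.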
2797    fn make_encoder_schema() -> Schema {
2798        Schema::new(vec![
2799            Field::new("a", DataType::Int32, false),
2800            Field::new("b", DataType::Int32, false),
2801        ])
2802    }
2803
2804    fn make_encoder_batch(schema: &Schema) -> RecordBatch {
2805        let a = Int32Array::from(vec![1, 2, 3]);
2806        let b = Int32Array::from(vec![10, 20, 30]);
2807        RecordBatch::try_new(
2808            Arc::new(schema.clone()),
2809            vec![Arc::new(a) as ArrayRef, Arc::new(b) as ArrayRef],
2810        )
2811        .expect("failed to build test RecordBatch")
2812    }
2813
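    // Builds a richer fixture whose Avro schema JSON is supplied explicitly
    // through the `SCHEMA_METADATA_KEY` metadata entry, so the writer picks up
    // the hand-written schema instead of deriving one from the Arrow schema.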
2814    fn make_real_avro_schema_and_batch() -> Result<(Schema, RecordBatch, AvroSchema), AvroError> {
2815        let avro_json = r#"
2816        {
2817          "type": "record",
2818          "name": "User",
2819          "fields": [
2820            { "name": "id",     "type": "long" },
2821            { "name": "name",   "type": "string" },
2822            { "name": "active", "type": "boolean" },
2823            { "name": "tags",   "type": { "type": "array", "items": "int" } },
2824            { "name": "opt",    "type": ["null", "string"], "default": null }
2825          ]
2826        }"#;
2827        let avro_schema = AvroSchema::new(avro_json.to_string());
2828        let mut md = HashMap::new();
2829        md.insert(
2830            SCHEMA_METADATA_KEY.to_string(),
2831            avro_schema.json_string.clone(),
2832        );
2833        let item_field = Arc::new(Field::new(
2834            Field::LIST_FIELD_DEFAULT_NAME,
2835            DataType::Int32,
2836            false,
2837        ));
2838        let schema = Schema::new_with_metadata(
2839            vec![
2840                Field::new("id", DataType::Int64, false),
2841                Field::new("name", DataType::Utf8, false),
2842                Field::new("active", DataType::Boolean, false),
2843                Field::new("tags", DataType::List(item_field.clone()), false),
2844                Field::new("opt", DataType::Utf8, true),
2845            ],
2846            md,
2847        );
2848        let id = Int64Array::from(vec![1, 2, 3]);
2849        let name = StringArray::from(vec!["alice", "bob", "carol"]);
2850        let active = BooleanArray::from(vec![true, false, true]);
2851        let mut tags_builder = ListBuilder::new(Int32Builder::new()).with_field(item_field);
2852        tags_builder.values().append_value(1);
2853        tags_builder.values().append_value(2);
2854        tags_builder.append(true);
2855        tags_builder.append(true);
2856        tags_builder.values().append_value(3);
2857        tags_builder.append(true);
2858        let tags = tags_builder.finish();
2859        let opt = StringArray::from(vec![Some("x"), None, Some("z")]);
2860        let batch = RecordBatch::try_new(
2861            Arc::new(schema.clone()),
2862            vec![
2863                Arc::new(id) as ArrayRef,
2864                Arc::new(name) as ArrayRef,
2865                Arc::new(active) as ArrayRef,
2866                Arc::new(tags) as ArrayRef,
2867                Arc::new(opt) as ArrayRef,
2868            ],
2869        )?;
2870        Ok((schema, batch, avro_schema))
2871    }
2872
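    // Both `AvroStreamWriter` and the row `Encoder` emit Single Object
    // Encoding frames (`0xC3 0x01` magic + 8-byte little-endian Rabin
    // fingerprint + Avro body), so the same batch should produce
    // byte-identical output on both paths.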
2873    #[test]
2874    fn test_row_writer_matches_stream_writer_soe() -> Result<(), AvroError> {
2875        let schema = make_encoder_schema();
2876        let batch = make_encoder_batch(&schema);
2877        let mut stream = AvroStreamWriter::new(Vec::<u8>::new(), schema.clone())?;
2878        stream.write(&batch)?;
2879        stream.finish()?;
2880        let stream_bytes = stream.into_inner();
2881        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2882        row_writer.encode(&batch)?;
2883        let rows = row_writer.flush();
2884        let row_bytes: Vec<u8> = rows.bytes().to_vec();
2885        assert_eq!(stream_bytes, row_bytes);
2886        Ok(())
2887    }
2888
2889    #[test]
2890    fn test_row_writer_flush_clears_buffer() -> Result<(), AvroError> {
2891        let schema = make_encoder_schema();
2892        let batch = make_encoder_batch(&schema);
2893        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2894        row_writer.encode(&batch)?;
2895        assert_eq!(row_writer.buffered_len(), batch.num_rows());
2896        let out1 = row_writer.flush();
2897        assert_eq!(out1.len(), batch.num_rows());
2898        assert_eq!(row_writer.buffered_len(), 0);
2899        let out2 = row_writer.flush();
2900        assert_eq!(out2.len(), 0);
2901        Ok(())
2902    }
2903
2904    #[test]
2905    fn test_row_writer_roundtrip_decoder_soe_real_avro_data() -> Result<(), AvroError> {
2906        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
2907        let mut store = SchemaStore::new();
2908        store.register(avro_schema.clone())?;
2909        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2910        row_writer.encode(&batch)?;
2911        let rows = row_writer.flush();
2912        let mut decoder = ReaderBuilder::new()
2913            .with_writer_schema_store(store)
2914            .with_batch_size(1024)
2915            .build_decoder()?;
2916        for row in rows.iter() {
2917            let consumed = decoder.decode(row.as_ref())?;
2918            assert_eq!(
2919                consumed,
2920                row.len(),
2921                "decoder should consume the full row frame"
2922            );
2923        }
2924        let out = decoder.flush()?.expect("decoded batch");
2925        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
2926        let actual = pretty_format_batches(&[out])?.to_string();
2927        assert_eq!(expected, actual);
2928        Ok(())
2929    }
2930
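    // Feeds the decoder chunks of varying sizes that always end on SOE frame
    // boundaries, and checks that every frame-aligned chunk is consumed in
    // full before the next chunk arrives.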
2931    #[test]
2932    fn test_row_writer_roundtrip_decoder_soe_streaming_chunks() -> Result<(), AvroError> {
2933        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
2934        let mut store = SchemaStore::new();
2935        store.register(avro_schema.clone())?;
2936        let mut row_writer = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
2937        row_writer.encode(&batch)?;
2938        let rows = row_writer.flush();
2939        // Build a contiguous stream and frame boundaries (prefix sums) from EncodedRows.
2940        let mut stream: Vec<u8> = Vec::new();
2941        let mut boundaries: Vec<usize> = Vec::with_capacity(rows.len() + 1);
2942        boundaries.push(0usize);
2943        for row in rows.iter() {
2944            stream.extend_from_slice(row.as_ref());
2945            boundaries.push(stream.len());
2946        }
2947        let mut decoder = ReaderBuilder::new()
2948            .with_writer_schema_store(store)
2949            .with_batch_size(1024)
2950            .build_decoder()?;
2951        let mut buffered = BytesMut::new();
2952        let chunk_rows = [1usize, 2, 3, 1, 4, 2];
2953        let mut row_idx = 0usize;
2954        let mut i = 0usize;
2955        let n_rows = rows.len();
2956        while row_idx < n_rows {
2957            let take = chunk_rows[i % chunk_rows.len()];
2958            i += 1;
2959            let end_row = (row_idx + take).min(n_rows);
2960            let byte_start = boundaries[row_idx];
2961            let byte_end = boundaries[end_row];
2962            buffered.extend_from_slice(&stream[byte_start..byte_end]);
2963            loop {
2964                let consumed = decoder.decode(&buffered)?;
2965                if consumed == 0 {
2966                    break;
2967                }
2968                let _ = buffered.split_to(consumed);
2969            }
2970            assert!(
2971                buffered.is_empty(),
2972                "expected decoder to consume the entire frame-aligned chunk"
2973            );
2974            row_idx = end_row;
2975        }
2976        let out = decoder.flush()?.expect("decoded batch");
2977        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
2978        let actual = pretty_format_batches(&[out])?.to_string();
2979        assert_eq!(expected, actual);
2980        Ok(())
2981    }
2982
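    // Confluent wire format: each message is framed as
    //   [0x00 magic][4-byte big-endian schema id][Avro binary body]
    // and the decoder resolves the writer schema by looking the id up in a
    // `SchemaStore` keyed by `Fingerprint::Id`.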
2983    #[test]
2984    fn test_row_writer_roundtrip_decoder_confluent_wire_format_id() -> Result<(), AvroError> {
2985        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
2986        let schema_id: u32 = 42;
2987        let mut store = SchemaStore::new_with_type(FingerprintAlgorithm::Id);
2988        store.set(Fingerprint::Id(schema_id), avro_schema.clone())?;
2989        let mut row_writer = WriterBuilder::new(schema)
2990            .with_fingerprint_strategy(FingerprintStrategy::Id(schema_id))
2991            .build_encoder::<AvroSoeFormat>()?;
2992        row_writer.encode(&batch)?;
2993        let rows = row_writer.flush();
2994        let mut decoder = ReaderBuilder::new()
2995            .with_writer_schema_store(store)
2996            .with_batch_size(1024)
2997            .build_decoder()?;
2998        for row in rows.iter() {
2999            let consumed = decoder.decode(row.as_ref())?;
3000            assert_eq!(consumed, row.len());
3001        }
3002        let out = decoder.flush()?.expect("decoded batch");
3003        let expected = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
3004        let actual = pretty_format_batches(&[out])?.to_string();
3005        assert_eq!(expected, actual);
3006        Ok(())
3007    }
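
    // Exercises the `EncodedRows` accessors (`len`, `is_empty`, `bytes`,
    // `offsets`, `row`, `iter`, `new`) via fully qualified call syntax to pin
    // down the public API. With `AvroBinaryFormat` each row is a bare Avro
    // body with no per-record prefix.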
3008    #[test]
3009    fn test_encoder_encode_batches_flush_and_encoded_rows_methods_with_avro_binary_format()
3010    -> Result<(), AvroError> {
3011        use crate::writer::format::AvroBinaryFormat;
3015        let schema = Schema::new(vec![
3016            Field::new("a", DataType::Int32, false),
3017            Field::new("b", DataType::Int32, false),
3018        ]);
3019        let schema_ref = Arc::new(schema.clone());
3020        let batch1 = RecordBatch::try_new(
3021            schema_ref.clone(),
3022            vec![
3023                Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef,
3024                Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef,
3025            ],
3026        )?;
3027        let batch2 = RecordBatch::try_new(
3028            schema_ref,
3029            vec![
3030                Arc::new(Int32Array::from(vec![4, 5])) as ArrayRef,
3031                Arc::new(Int32Array::from(vec![40, 50])) as ArrayRef,
3032            ],
3033        )?;
3034        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
3035        let empty = Encoder::flush(&mut encoder);
3036        assert_eq!(EncodedRows::len(&empty), 0);
3037        assert!(EncodedRows::is_empty(&empty));
3038        assert_eq!(EncodedRows::bytes(&empty).as_ref(), &[] as &[u8]);
3039        assert_eq!(EncodedRows::offsets(&empty), &[0usize]);
3040        assert_eq!(EncodedRows::iter(&empty).count(), 0);
3041        let empty_vecs: Vec<Vec<u8>> = empty.iter().map(|b| b.to_vec()).collect();
3042        assert!(empty_vecs.is_empty());
3043        let batches = vec![batch1, batch2];
3044        Encoder::encode_batches(&mut encoder, &batches)?;
3045        assert_eq!(encoder.buffered_len(), 5);
3046        let rows = Encoder::flush(&mut encoder);
3047        assert_eq!(
3048            encoder.buffered_len(),
3049            0,
3050            "Encoder::flush should reset the internal offsets"
3051        );
3052        assert_eq!(EncodedRows::len(&rows), 5);
3053        assert!(!EncodedRows::is_empty(&rows));
3054        let expected_offsets: &[usize] = &[0, 2, 4, 6, 8, 10];
3055        assert_eq!(EncodedRows::offsets(&rows), expected_offsets);
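        // Avro writes `int` fields as zigzag varints (1 -> 0x02, 10 -> 0x14,
        // and so on), so each two-column row encodes to exactly two bytes.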
3056        let expected_rows: Vec<Vec<u8>> = vec![
3057            vec![2, 20],
3058            vec![4, 40],
3059            vec![6, 60],
3060            vec![8, 80],
3061            vec![10, 100],
3062        ];
3063        let expected_stream: Vec<u8> = expected_rows.concat();
3064        assert_eq!(
3065            EncodedRows::bytes(&rows).as_ref(),
3066            expected_stream.as_slice()
3067        );
3068        for (i, expected) in expected_rows.iter().enumerate() {
3069            assert_eq!(EncodedRows::row(&rows, i)?.as_ref(), expected.as_slice());
3070        }
3071        let iter_rows: Vec<Vec<u8>> = EncodedRows::iter(&rows).map(|b| b.to_vec()).collect();
3072        assert_eq!(iter_rows, expected_rows);
3073        let recreated = EncodedRows::new(
3074            EncodedRows::bytes(&rows).clone(),
3075            EncodedRows::offsets(&rows).to_vec(),
3076        );
3077        assert_eq!(EncodedRows::len(&recreated), EncodedRows::len(&rows));
3078        assert_eq!(EncodedRows::bytes(&recreated), EncodedRows::bytes(&rows));
3079        assert_eq!(
3080            EncodedRows::offsets(&recreated),
3081            EncodedRows::offsets(&rows)
3082        );
3083        let rec_vecs: Vec<Vec<u8>> = recreated.iter().map(|b| b.to_vec()).collect();
3084        assert_eq!(rec_vecs, iter_rows);
3085        let empty_again = Encoder::flush(&mut encoder);
3086        assert!(EncodedRows::is_empty(&empty_again));
3087        Ok(())
3088    }
3089
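    // `AvroBinaryFormat` emits bare Avro bodies with no framing, so writing
    // them back-to-back into a sink would not be self-delimiting; the builder
    // rejects this combination and points callers at `build_encoder` instead.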
3090    #[test]
3091    fn test_writer_builder_build_rejects_avro_binary_format() {
3092        use crate::writer::format::AvroBinaryFormat;
3094        let schema = Schema::new(vec![Field::new("a", DataType::Int32, false)]);
3095        let err = WriterBuilder::new(schema)
3096            .build::<_, AvroBinaryFormat>(Vec::<u8>::new())
3097            .unwrap_err();
3098        match err {
3099            AvroError::InvalidArgument(msg) => assert_eq!(
3100                msg,
3101                "AvroBinaryFormat is only supported with Encoder, use build_encoder instead"
3102            ),
3103            other => panic!("expected AvroError::InvalidArgument, got {other:?}"),
3104        }
3105    }
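
    // Checks that an SOE row is exactly the 10-byte prefix (`0xC3 0x01` magic
    // plus the 8-byte little-endian Rabin fingerprint) followed by the same
    // bytes `AvroBinaryFormat` produces, then reassembles the framing by hand
    // and round-trips it through the decoder.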
3106    #[test]
3107    fn test_row_encoder_avro_binary_format_roundtrip_decoder_with_soe_framing()
3108    -> Result<(), AvroError> {
3109        use crate::writer::format::AvroBinaryFormat;
3110        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
3111        let batches: Vec<RecordBatch> = vec![batch.clone(), batch.slice(1, 2)];
3112        let expected = arrow::compute::concat_batches(&batch.schema(), &batches)?;
3113        let mut binary_encoder =
3114            WriterBuilder::new(schema.clone()).build_encoder::<AvroBinaryFormat>()?;
3115        binary_encoder.encode_batches(&batches)?;
3116        let binary_rows = binary_encoder.flush();
3117        assert_eq!(
3118            binary_rows.len(),
3119            expected.num_rows(),
3120            "binary encoder row count mismatch"
3121        );
3122        let mut soe_encoder = WriterBuilder::new(schema).build_encoder::<AvroSoeFormat>()?;
3123        soe_encoder.encode_batches(&batches)?;
3124        let soe_rows = soe_encoder.flush();
3125        assert_eq!(
3126            soe_rows.len(),
3127            binary_rows.len(),
3128            "SOE vs binary row count mismatch"
3129        );
3130        let mut store = SchemaStore::new(); // Rabin by default
3131        let fp = store.register(avro_schema)?;
3132        let fp_le_bytes = match fp {
3133            Fingerprint::Rabin(v) => v.to_le_bytes(),
3134            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
3135        };
3136        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
3137        const SOE_PREFIX_LEN: usize = 2 + 8;
3138        for i in 0..binary_rows.len() {
3139            let body = binary_rows.row(i)?;
3140            let soe = soe_rows.row(i)?;
3141            assert!(
3142                soe.len() >= SOE_PREFIX_LEN,
3143                "expected SOE row to include prefix"
3144            );
3145            assert_eq!(&soe.as_ref()[..2], &SOE_MAGIC);
3146            assert_eq!(&soe.as_ref()[2..SOE_PREFIX_LEN], &fp_le_bytes);
3147            assert_eq!(
3148                &soe.as_ref()[SOE_PREFIX_LEN..],
3149                body.as_ref(),
3150                "SOE body bytes differ from AvroBinaryFormat body bytes (row {i})"
3151            );
3152        }
3153        let mut decoder = ReaderBuilder::new()
3154            .with_writer_schema_store(store)
3155            .with_batch_size(1024)
3156            .build_decoder()?;
3157        for body in binary_rows.iter() {
3158            let mut framed = Vec::with_capacity(SOE_PREFIX_LEN + body.len());
3159            framed.extend_from_slice(&SOE_MAGIC);
3160            framed.extend_from_slice(&fp_le_bytes);
3161            framed.extend_from_slice(body.as_ref());
3162            let consumed = decoder.decode(&framed)?;
3163            assert_eq!(
3164                consumed,
3165                framed.len(),
3166                "decoder should consume the full SOE-framed message"
3167            );
3168        }
3169        let out = decoder.flush()?.expect("expected a decoded RecordBatch");
3170        let expected_str = pretty_format_batches(&[expected])?.to_string();
3171        let actual_str = pretty_format_batches(&[out])?.to_string();
3172        assert_eq!(expected_str, actual_str);
3173        Ok(())
3174    }
3175
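    // Simulates a length-prefixed transport: each SOE message is preceded by a
    // 4-byte little-endian length, the stream is delivered in arbitrary chunk
    // sizes, and a small framer reassembles complete messages before handing
    // each payload to the decoder.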
3176    #[test]
3177    fn test_row_encoder_avro_binary_format_roundtrip_decoder_streaming_chunks()
3178    -> Result<(), AvroError> {
3179        use crate::writer::format::AvroBinaryFormat;
3180        let (schema, batch, avro_schema) = make_real_avro_schema_and_batch()?;
3181        let mut encoder = WriterBuilder::new(schema).build_encoder::<AvroBinaryFormat>()?;
3182        encoder.encode(&batch)?;
3183        let rows = encoder.flush();
3184        let mut store = SchemaStore::new();
3185        let fp = store.register(avro_schema)?;
3186        let fp_le_bytes = match fp {
3187            Fingerprint::Rabin(v) => v.to_le_bytes(),
3188            other => panic!("expected Rabin fingerprint from SchemaStore::new(), got {other:?}"),
3189        };
3190        const SOE_MAGIC: [u8; 2] = [0xC3, 0x01];
3191        const SOE_PREFIX_LEN: usize = 2 + 8;
3192        let mut stream: Vec<u8> = Vec::new();
3193        for body in rows.iter() {
3194            let msg_len: u32 = (SOE_PREFIX_LEN + body.len())
3195                .try_into()
3196                .expect("message length must fit in u32");
3197            stream.extend_from_slice(&msg_len.to_le_bytes());
3198            stream.extend_from_slice(&SOE_MAGIC);
3199            stream.extend_from_slice(&fp_le_bytes);
3200            stream.extend_from_slice(body.as_ref());
3201        }
3202        let mut decoder = ReaderBuilder::new()
3203            .with_writer_schema_store(store)
3204            .with_batch_size(1024)
3205            .build_decoder()?;
3206        let chunk_sizes = [1usize, 2, 3, 5, 8, 13, 21, 34];
3207        let mut pos = 0usize;
3208        let mut i = 0usize;
3209        let mut buffered = BytesMut::new();
3210        let mut decoded_frames = 0usize;
3211        while pos < stream.len() {
3212            let take = chunk_sizes[i % chunk_sizes.len()];
3213            i += 1;
3214            let end = (pos + take).min(stream.len());
3215            buffered.extend_from_slice(&stream[pos..end]);
3216            pos = end;
3217            loop {
3218                if buffered.len() < 4 {
3219                    break;
3220                }
3221                let msg_len =
3222                    u32::from_le_bytes([buffered[0], buffered[1], buffered[2], buffered[3]])
3223                        as usize;
3224                if buffered.len() < 4 + msg_len {
3225                    break;
3226                }
3227                let frame = buffered.split_to(4 + msg_len);
3228                let payload = &frame[4..];
3229                let consumed = decoder.decode(payload)?;
3230                assert_eq!(
3231                    consumed,
3232                    payload.len(),
3233                    "decoder should consume the full SOE-framed message"
3234                );
3236                decoded_frames += 1;
3237            }
3238        }
3239        assert!(
3240            buffered.is_empty(),
3241            "expected transport framer to consume all bytes; leftover = {}",
3242            buffered.len()
3243        );
3244        assert_eq!(
3245            decoded_frames,
3246            rows.len(),
3247            "expected to decode exactly one frame per encoded row"
3248        );
3249        let out = decoder.flush()?.expect("expected decoded RecordBatch");
3250        let expected_str = pretty_format_batches(std::slice::from_ref(&batch))?.to_string();
3251        let actual_str = pretty_format_batches(&[out])?.to_string();
3252        assert_eq!(expected_str, actual_str);
3253        Ok(())
3254    }
3255}