arrow_avro/writer/
encoder.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Avro Encoder for Arrow types.
19
20use arrow_array::cast::AsArray;
21use arrow_array::types::{
22    ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType,
23};
24use arrow_array::OffsetSizeTrait;
25use arrow_array::{Array, GenericBinaryArray, PrimitiveArray, RecordBatch};
26use arrow_buffer::NullBuffer;
27use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit};
28use std::io::Write;
29
/// Tuning options for the Avro encoder.
///
/// `impala_mode` chooses how optional (nullable) values are laid out as Avro
/// unions:
/// - `false` — the default — puts **null first**: `["null", T]`.
/// - `true` puts **null second**: `[T, "null"]`.
#[derive(Clone, Copy, Debug, Default)]
pub struct EncoderOptions {
    // NOTE(review): no setter for this field is visible here, so callers get the
    // `Default` (null-first) layout; full support is planned for a follow-up PR.
    impala_mode: bool,
}
39
40/// Encode a single Avro-`long` using ZigZag + variable length, buffered.
41///
42/// Spec: <https://avro.apache.org/docs/1.11.1/specification/#binary-encoding>
43#[inline]
44pub fn write_long<W: Write + ?Sized>(writer: &mut W, value: i64) -> Result<(), ArrowError> {
45    let mut zz = ((value << 1) ^ (value >> 63)) as u64;
46    // At most 10 bytes for 64-bit varint
47    let mut buf = [0u8; 10];
48    let mut i = 0;
49    while (zz & !0x7F) != 0 {
50        buf[i] = ((zz & 0x7F) as u8) | 0x80;
51        i += 1;
52        zz >>= 7;
53    }
54    buf[i] = (zz & 0x7F) as u8;
55    i += 1;
56    writer
57        .write_all(&buf[..i])
58        .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
59}
60
61#[inline]
62fn write_int<W: Write + ?Sized>(writer: &mut W, value: i32) -> Result<(), ArrowError> {
63    write_long(writer, value as i64)
64}
65
66#[inline]
67fn write_len_prefixed<W: Write + ?Sized>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
68    write_long(writer, bytes.len() as i64)?;
69    writer
70        .write_all(bytes)
71        .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
72}
73
74#[inline]
75fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) -> Result<(), ArrowError> {
76    writer
77        .write_all(&[if v { 1 } else { 0 }])
78        .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
79}
80
81/// Write the union branch index for an optional field.
82///
83/// Branch index is 0-based per Avro unions:
84/// - Null-first (default): null => 0, value => 1
85/// - Null-second (Impala): value => 0, null => 1
86#[inline]
87fn write_optional_branch<W: Write + ?Sized>(
88    writer: &mut W,
89    is_null: bool,
90    impala_mode: bool,
91) -> Result<(), ArrowError> {
92    let branch = if impala_mode == is_null { 1 } else { 0 };
93    write_int(writer, branch)
94}
95
96/// Encode a `RecordBatch` in Avro binary format using **default options**.
97pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> Result<(), ArrowError> {
98    encode_record_batch_with_options(batch, out, &EncoderOptions::default())
99}
100
101/// Encode a `RecordBatch` with explicit `EncoderOptions`.
102pub fn encode_record_batch_with_options<W: Write>(
103    batch: &RecordBatch,
104    out: &mut W,
105    opts: &EncoderOptions,
106) -> Result<(), ArrowError> {
107    let mut encoders = batch
108        .schema()
109        .fields()
110        .iter()
111        .zip(batch.columns())
112        .map(|(field, array)| Ok((field.is_nullable(), make_encoder(array.as_ref())?)))
113        .collect::<Result<Vec<_>, ArrowError>>()?;
114    (0..batch.num_rows()).try_for_each(|row| {
115        encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
116            if *is_nullable {
117                let is_null = enc.is_null(row);
118                write_optional_branch(out, is_null, opts.impala_mode)?;
119                if is_null {
120                    return Ok(());
121                }
122            }
123            enc.encode(row, out)
124        })
125    })
126}
127
128/// Enum for static dispatch of concrete encoders.
129enum Encoder<'a> {
130    Boolean(BooleanEncoder<'a>),
131    Int(IntEncoder<'a, Int32Type>),
132    Long(LongEncoder<'a, Int64Type>),
133    Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
134    Float32(F32Encoder<'a>),
135    Float64(F64Encoder<'a>),
136    Binary(BinaryEncoder<'a, i32>),
137}
138
139impl<'a> Encoder<'a> {
140    /// Encode the value at `idx`.
141    #[inline]
142    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
143        match self {
144            Encoder::Boolean(e) => e.encode(idx, out),
145            Encoder::Int(e) => e.encode(idx, out),
146            Encoder::Long(e) => e.encode(idx, out),
147            Encoder::Timestamp(e) => e.encode(idx, out),
148            Encoder::Float32(e) => e.encode(idx, out),
149            Encoder::Float64(e) => e.encode(idx, out),
150            Encoder::Binary(e) => e.encode(idx, out),
151        }
152    }
153}
154
155/// An encoder + a null buffer for nullable fields.
156pub struct NullableEncoder<'a> {
157    encoder: Encoder<'a>,
158    nulls: Option<NullBuffer>,
159}
160
161impl<'a> NullableEncoder<'a> {
162    /// Create a new nullable encoder, wrapping a non-null encoder and a null buffer.
163    #[inline]
164    fn new(encoder: Encoder<'a>, nulls: Option<NullBuffer>) -> Self {
165        Self { encoder, nulls }
166    }
167
168    /// Encode the value at `idx`, assuming it's not-null.
169    #[inline]
170    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
171        self.encoder.encode(idx, out)
172    }
173
174    /// Check if the value at `idx` is null.
175    #[inline]
176    fn is_null(&self, idx: usize) -> bool {
177        self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
178    }
179}
180
181/// Creates an Avro encoder for the given `array`.
182pub fn make_encoder<'a>(array: &'a dyn Array) -> Result<NullableEncoder<'a>, ArrowError> {
183    let nulls = array.nulls().cloned();
184    let enc = match array.data_type() {
185        DataType::Boolean => {
186            let arr = array.as_boolean();
187            NullableEncoder::new(Encoder::Boolean(BooleanEncoder(arr)), nulls)
188        }
189        DataType::Int32 => {
190            let arr = array.as_primitive::<Int32Type>();
191            NullableEncoder::new(Encoder::Int(IntEncoder(arr)), nulls)
192        }
193        DataType::Int64 => {
194            let arr = array.as_primitive::<Int64Type>();
195            NullableEncoder::new(Encoder::Long(LongEncoder(arr)), nulls)
196        }
197        DataType::Float32 => {
198            let arr = array.as_primitive::<Float32Type>();
199            NullableEncoder::new(Encoder::Float32(F32Encoder(arr)), nulls)
200        }
201        DataType::Float64 => {
202            let arr = array.as_primitive::<Float64Type>();
203            NullableEncoder::new(Encoder::Float64(F64Encoder(arr)), nulls)
204        }
205        DataType::Binary => {
206            let arr = array.as_binary::<i32>();
207            NullableEncoder::new(Encoder::Binary(BinaryEncoder(arr)), nulls)
208        }
209        DataType::Timestamp(TimeUnit::Microsecond, _) => {
210            let arr = array.as_primitive::<TimestampMicrosecondType>();
211            NullableEncoder::new(Encoder::Timestamp(LongEncoder(arr)), nulls)
212        }
213        other => {
214            return Err(ArrowError::NotYetImplemented(format!(
215                "Unsupported data type for Avro encoding in slim build: {other:?}"
216            )))
217        }
218    };
219    Ok(enc)
220}
221
222struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
223impl BooleanEncoder<'_> {
224    #[inline]
225    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
226        write_bool(out, self.0.value(idx))
227    }
228}
229
230/// Generic Avro `int` encoder for primitive arrays with `i32` native values.
231struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
232impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
233    #[inline]
234    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
235        write_int(out, self.0.value(idx))
236    }
237}
238
239/// Generic Avro `long` encoder for primitive arrays with `i64` native values.
240struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
241impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
242    #[inline]
243    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
244        write_long(out, self.0.value(idx))
245    }
246}
247
248/// Unified binary encoder generic over offset size (i32/i64).
249struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
250impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
251    #[inline]
252    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
253        write_len_prefixed(out, self.0.value(idx))
254    }
255}
256
257struct F32Encoder<'a>(&'a arrow_array::Float32Array);
258impl F32Encoder<'_> {
259    #[inline]
260    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
261        // Avro float: 4 bytes, IEEE-754 little-endian
262        let bits = self.0.value(idx).to_bits();
263        out.write_all(&bits.to_le_bytes())
264            .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
265    }
266}
267
268struct F64Encoder<'a>(&'a arrow_array::Float64Array);
269impl F64Encoder<'_> {
270    #[inline]
271    fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
272        // Avro double: 8 bytes, IEEE-754 little-endian
273        let bits = self.0.value(idx).to_bits();
274        out.write_all(&bits.to_le_bytes())
275            .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
276    }
277}