1use arrow_array::cast::AsArray;
21use arrow_array::types::{
22 ArrowPrimitiveType, Float32Type, Float64Type, Int32Type, Int64Type, TimestampMicrosecondType,
23};
24use arrow_array::OffsetSizeTrait;
25use arrow_array::{Array, GenericBinaryArray, PrimitiveArray, RecordBatch};
26use arrow_buffer::NullBuffer;
27use arrow_schema::{ArrowError, DataType, FieldRef, TimeUnit};
28use std::io::Write;
29
/// Options controlling how [`encode_record_batch_with_options`] writes Avro rows.
///
/// Construct with [`Default::default`] and adjust with the `with_*` builder methods.
#[derive(Debug, Clone, Copy, Default)]
pub struct EncoderOptions {
    /// Ordering of the two-branch union written before each value of a nullable field.
    ///
    /// `false` (the default): null is branch `0`, a present value is branch `1`.
    /// `true`: a present value is branch `0`, null is branch `1`.
    impala_mode: bool,
}

impl EncoderOptions {
    /// Returns a copy of these options with `impala_mode` set to the given value.
    ///
    /// The field is private, so this builder is the only way for code outside this
    /// module to enable the value-first null-branch ordering.
    #[must_use]
    pub fn with_impala_mode(mut self, impala_mode: bool) -> Self {
        self.impala_mode = impala_mode;
        self
    }

    /// Returns `true` when the value-first (null = branch `1`) ordering is enabled.
    pub fn impala_mode(&self) -> bool {
        self.impala_mode
    }
}
39
40#[inline]
44pub fn write_long<W: Write + ?Sized>(writer: &mut W, value: i64) -> Result<(), ArrowError> {
45 let mut zz = ((value << 1) ^ (value >> 63)) as u64;
46 let mut buf = [0u8; 10];
48 let mut i = 0;
49 while (zz & !0x7F) != 0 {
50 buf[i] = ((zz & 0x7F) as u8) | 0x80;
51 i += 1;
52 zz >>= 7;
53 }
54 buf[i] = (zz & 0x7F) as u8;
55 i += 1;
56 writer
57 .write_all(&buf[..i])
58 .map_err(|e| ArrowError::IoError(format!("write long: {e}"), e))
59}
60
61#[inline]
62fn write_int<W: Write + ?Sized>(writer: &mut W, value: i32) -> Result<(), ArrowError> {
63 write_long(writer, value as i64)
64}
65
66#[inline]
67fn write_len_prefixed<W: Write + ?Sized>(writer: &mut W, bytes: &[u8]) -> Result<(), ArrowError> {
68 write_long(writer, bytes.len() as i64)?;
69 writer
70 .write_all(bytes)
71 .map_err(|e| ArrowError::IoError(format!("write bytes: {e}"), e))
72}
73
74#[inline]
75fn write_bool<W: Write + ?Sized>(writer: &mut W, v: bool) -> Result<(), ArrowError> {
76 writer
77 .write_all(&[if v { 1 } else { 0 }])
78 .map_err(|e| ArrowError::IoError(format!("write bool: {e}"), e))
79}
80
81#[inline]
87fn write_optional_branch<W: Write + ?Sized>(
88 writer: &mut W,
89 is_null: bool,
90 impala_mode: bool,
91) -> Result<(), ArrowError> {
92 let branch = if impala_mode == is_null { 1 } else { 0 };
93 write_int(writer, branch)
94}
95
96pub fn encode_record_batch<W: Write>(batch: &RecordBatch, out: &mut W) -> Result<(), ArrowError> {
98 encode_record_batch_with_options(batch, out, &EncoderOptions::default())
99}
100
101pub fn encode_record_batch_with_options<W: Write>(
103 batch: &RecordBatch,
104 out: &mut W,
105 opts: &EncoderOptions,
106) -> Result<(), ArrowError> {
107 let mut encoders = batch
108 .schema()
109 .fields()
110 .iter()
111 .zip(batch.columns())
112 .map(|(field, array)| Ok((field.is_nullable(), make_encoder(array.as_ref())?)))
113 .collect::<Result<Vec<_>, ArrowError>>()?;
114 (0..batch.num_rows()).try_for_each(|row| {
115 encoders.iter_mut().try_for_each(|(is_nullable, enc)| {
116 if *is_nullable {
117 let is_null = enc.is_null(row);
118 write_optional_branch(out, is_null, opts.impala_mode)?;
119 if is_null {
120 return Ok(());
121 }
122 }
123 enc.encode(row, out)
124 })
125 })
126}
127
128enum Encoder<'a> {
130 Boolean(BooleanEncoder<'a>),
131 Int(IntEncoder<'a, Int32Type>),
132 Long(LongEncoder<'a, Int64Type>),
133 Timestamp(LongEncoder<'a, TimestampMicrosecondType>),
134 Float32(F32Encoder<'a>),
135 Float64(F64Encoder<'a>),
136 Binary(BinaryEncoder<'a, i32>),
137}
138
139impl<'a> Encoder<'a> {
140 #[inline]
142 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
143 match self {
144 Encoder::Boolean(e) => e.encode(idx, out),
145 Encoder::Int(e) => e.encode(idx, out),
146 Encoder::Long(e) => e.encode(idx, out),
147 Encoder::Timestamp(e) => e.encode(idx, out),
148 Encoder::Float32(e) => e.encode(idx, out),
149 Encoder::Float64(e) => e.encode(idx, out),
150 Encoder::Binary(e) => e.encode(idx, out),
151 }
152 }
153}
154
155pub struct NullableEncoder<'a> {
157 encoder: Encoder<'a>,
158 nulls: Option<NullBuffer>,
159}
160
161impl<'a> NullableEncoder<'a> {
162 #[inline]
164 fn new(encoder: Encoder<'a>, nulls: Option<NullBuffer>) -> Self {
165 Self { encoder, nulls }
166 }
167
168 #[inline]
170 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
171 self.encoder.encode(idx, out)
172 }
173
174 #[inline]
176 fn is_null(&self, idx: usize) -> bool {
177 self.nulls.as_ref().is_some_and(|nulls| nulls.is_null(idx))
178 }
179}
180
181pub fn make_encoder<'a>(array: &'a dyn Array) -> Result<NullableEncoder<'a>, ArrowError> {
183 let nulls = array.nulls().cloned();
184 let enc = match array.data_type() {
185 DataType::Boolean => {
186 let arr = array.as_boolean();
187 NullableEncoder::new(Encoder::Boolean(BooleanEncoder(arr)), nulls)
188 }
189 DataType::Int32 => {
190 let arr = array.as_primitive::<Int32Type>();
191 NullableEncoder::new(Encoder::Int(IntEncoder(arr)), nulls)
192 }
193 DataType::Int64 => {
194 let arr = array.as_primitive::<Int64Type>();
195 NullableEncoder::new(Encoder::Long(LongEncoder(arr)), nulls)
196 }
197 DataType::Float32 => {
198 let arr = array.as_primitive::<Float32Type>();
199 NullableEncoder::new(Encoder::Float32(F32Encoder(arr)), nulls)
200 }
201 DataType::Float64 => {
202 let arr = array.as_primitive::<Float64Type>();
203 NullableEncoder::new(Encoder::Float64(F64Encoder(arr)), nulls)
204 }
205 DataType::Binary => {
206 let arr = array.as_binary::<i32>();
207 NullableEncoder::new(Encoder::Binary(BinaryEncoder(arr)), nulls)
208 }
209 DataType::Timestamp(TimeUnit::Microsecond, _) => {
210 let arr = array.as_primitive::<TimestampMicrosecondType>();
211 NullableEncoder::new(Encoder::Timestamp(LongEncoder(arr)), nulls)
212 }
213 other => {
214 return Err(ArrowError::NotYetImplemented(format!(
215 "Unsupported data type for Avro encoding in slim build: {other:?}"
216 )))
217 }
218 };
219 Ok(enc)
220}
221
222struct BooleanEncoder<'a>(&'a arrow_array::BooleanArray);
223impl BooleanEncoder<'_> {
224 #[inline]
225 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
226 write_bool(out, self.0.value(idx))
227 }
228}
229
230struct IntEncoder<'a, P: ArrowPrimitiveType<Native = i32>>(&'a PrimitiveArray<P>);
232impl<'a, P: ArrowPrimitiveType<Native = i32>> IntEncoder<'a, P> {
233 #[inline]
234 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
235 write_int(out, self.0.value(idx))
236 }
237}
238
239struct LongEncoder<'a, P: ArrowPrimitiveType<Native = i64>>(&'a PrimitiveArray<P>);
241impl<'a, P: ArrowPrimitiveType<Native = i64>> LongEncoder<'a, P> {
242 #[inline]
243 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
244 write_long(out, self.0.value(idx))
245 }
246}
247
248struct BinaryEncoder<'a, O: OffsetSizeTrait>(&'a GenericBinaryArray<O>);
250impl<'a, O: OffsetSizeTrait> BinaryEncoder<'a, O> {
251 #[inline]
252 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
253 write_len_prefixed(out, self.0.value(idx))
254 }
255}
256
257struct F32Encoder<'a>(&'a arrow_array::Float32Array);
258impl F32Encoder<'_> {
259 #[inline]
260 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
261 let bits = self.0.value(idx).to_bits();
263 out.write_all(&bits.to_le_bytes())
264 .map_err(|e| ArrowError::IoError(format!("write f32: {e}"), e))
265 }
266}
267
268struct F64Encoder<'a>(&'a arrow_array::Float64Array);
269impl F64Encoder<'_> {
270 #[inline]
271 fn encode<W: Write + ?Sized>(&mut self, idx: usize, out: &mut W) -> Result<(), ArrowError> {
272 let bits = self.0.value(idx).to_bits();
274 out.write_all(&bits.to_le_bytes())
275 .map_err(|e| ArrowError::IoError(format!("write f64: {e}"), e))
276 }
277}