arrow_json/reader/
binary_array.rs1use std::io::Write;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
23use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
24use arrow_schema::ArrowError;
25
26use crate::reader::ArrayDecoder;
27use crate::reader::tape::{Tape, TapeElement};
28
/// Converts a single ASCII hexadecimal digit into its numeric value.
///
/// Accepts `0-9`, `a-f` and `A-F` and returns the corresponding value in
/// `0..=15`. Every other byte produces `None`.
#[inline]
fn decode_hex_digit(byte: u8) -> Option<u8> {
    // `to_digit(16)` recognises exactly the ASCII digits and the letters
    // a-f / A-F, and its result is always below 16, so it fits in a `u8`.
    char::from(byte)
        .to_digit(16)
        .and_then(|nibble| u8::try_from(nibble).ok())
}
38
39fn invalid_hex_error_at(index: usize, byte: u8) -> ArrowError {
40 ArrowError::JsonError(format!(
41 "invalid hex encoding in binary data: invalid digit 0x{byte:02x} at position {index}"
42 ))
43}
44
45fn decode_hex_to_writer<W: Write>(hex_string: &str, writer: &mut W) -> Result<(), ArrowError> {
46 let bytes = hex_string.as_bytes();
47 let mut iter = bytes.chunks_exact(2);
48 let mut buffer = [0u8; 64];
49 let mut buffered = 0;
50
51 for (pair_index, pair) in (&mut iter).enumerate() {
52 let base = pair_index * 2;
53 let high = decode_hex_digit(pair[0]).ok_or_else(|| invalid_hex_error_at(base, pair[0]))?;
54 let low =
55 decode_hex_digit(pair[1]).ok_or_else(|| invalid_hex_error_at(base + 1, pair[1]))?;
56 buffer[buffered] = (high << 4) | low;
57 buffered += 1;
58
59 if buffered == buffer.len() {
60 writer
61 .write_all(&buffer)
62 .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
63 buffered = 0;
64 }
65 }
66
67 let remainder = iter.remainder();
68 if !remainder.is_empty() {
69 let index = (bytes.len() / 2) * 2;
70 let low = decode_hex_digit(remainder[0])
71 .ok_or_else(|| invalid_hex_error_at(index, remainder[0]))?;
72 buffer[buffered] = low;
73 buffered += 1;
74 }
75
76 if buffered > 0 {
77 writer
78 .write_all(&buffer[..buffered])
79 .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
80 }
81
82 Ok(())
83}
84
85#[derive(Default)]
86pub struct BinaryArrayDecoder<O: OffsetSizeTrait> {
87 phantom: PhantomData<O>,
88}
89
90impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
91 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
92 let data_capacity = estimate_data_capacity(tape, pos)?;
93
94 if O::from_usize(data_capacity).is_none() {
95 return Err(ArrowError::JsonError(format!(
96 "offset overflow decoding {}",
97 GenericStringArray::<O>::DATA_TYPE
98 )));
99 }
100
101 let mut builder = GenericBinaryBuilder::<O>::with_capacity(pos.len(), data_capacity);
102
103 for p in pos {
104 match tape.get(*p) {
105 TapeElement::String(idx) => {
106 let string = tape.get_string(idx);
107 decode_hex_to_writer(string, &mut builder)?;
110 builder.append_value(b"");
111 }
112 TapeElement::Null => builder.append_null(),
113 _ => unreachable!(),
114 }
115 }
116
117 Ok(Arc::new(builder.finish()))
118 }
119}
120
121#[derive(Default)]
122pub struct FixedSizeBinaryArrayDecoder {
123 len: i32,
124}
125
126impl FixedSizeBinaryArrayDecoder {
127 pub fn new(len: i32) -> Self {
128 Self { len }
129 }
130}
131
132impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
133 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
134 let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len);
135 let capacity: usize = self.len.try_into().map_err(|_| {
137 ArrowError::InvalidArgumentError(format!("Cannot convert size '{}' to usize", self.len))
138 })?;
139 let mut scratch = Vec::with_capacity(capacity);
140
141 for p in pos {
142 match tape.get(*p) {
143 TapeElement::String(idx) => {
144 let string = tape.get_string(idx);
145 scratch.clear();
146 scratch.reserve(string.len().div_ceil(2));
147 decode_hex_to_writer(string, &mut scratch)?;
148 builder.append_value(&scratch)?;
149 }
150 TapeElement::Null => builder.append_null(),
151 _ => unreachable!(),
152 }
153 }
154
155 Ok(Arc::new(builder.finish()))
156 }
157}
158
159#[derive(Default)]
160pub struct BinaryViewDecoder {}
161
162impl ArrayDecoder for BinaryViewDecoder {
163 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
164 let data_capacity = estimate_data_capacity(tape, pos)?;
165 let mut builder = BinaryViewBuilder::with_capacity(data_capacity);
166 let mut scratch = Vec::new();
167
168 for p in pos {
169 match tape.get(*p) {
170 TapeElement::String(idx) => {
171 let string = tape.get_string(idx);
172 scratch.clear();
173 scratch.reserve(string.len().div_ceil(2));
174 decode_hex_to_writer(string, &mut scratch)?;
175 builder.append_value(&scratch);
176 }
177 TapeElement::Null => builder.append_null(),
178 _ => unreachable!(),
179 }
180 }
181
182 Ok(Arc::new(builder.finish()))
183 }
184}
185
186fn estimate_data_capacity(tape: &Tape<'_>, pos: &[u32]) -> Result<usize, ArrowError> {
187 let mut data_capacity = 0;
188 for p in pos {
189 match tape.get(*p) {
190 TapeElement::String(idx) => {
191 let string_len = tape.get_string(idx).len();
192 let decoded_len = string_len.div_ceil(2);
194 data_capacity += decoded_len;
195 }
196 TapeElement::Null => {}
197 _ => {
198 return Err(tape.error(*p, "binary data encoded as string"));
199 }
200 }
201 }
202 Ok(data_capacity)
203}
204
205#[cfg(test)]
206mod tests {
207 use super::*;
208 use crate::ReaderBuilder;
209 use arrow_schema::{DataType, Field};
210 use std::io::Cursor;
211
212 #[test]
213 fn test_decode_hex_to_writer_empty() {
214 let mut out = Vec::new();
215 decode_hex_to_writer("", &mut out).unwrap();
216 assert!(out.is_empty());
217 }
218
219 #[test]
220 fn test_decode_hex_to_writer_odd_length() {
221 let mut out = Vec::new();
222 decode_hex_to_writer("0f0", &mut out).unwrap();
223 assert_eq!(out, vec![0x0f, 0x00]);
224
225 out.clear();
226 decode_hex_to_writer("a", &mut out).unwrap();
227 assert_eq!(out, vec![0x0a]);
228 }
229
230 #[test]
231 fn test_decode_hex_to_writer_invalid() {
232 let mut out = Vec::new();
233 let err = decode_hex_to_writer("0f0g", &mut out).unwrap_err();
234 match err {
235 ArrowError::JsonError(msg) => {
236 assert!(msg.contains("invalid hex encoding in binary data"));
237 assert!(msg.contains("position 3"));
238 }
239 _ => panic!("expected JsonError"),
240 }
241 }
242
243 #[test]
244 fn test_binary_reader_invalid_hex_is_terminal() {
245 let field = Field::new("item", DataType::Binary, false);
246 let data = b"\"0f0g\"\n\"0f00\"\n";
247 let mut reader = ReaderBuilder::new_with_field(field)
248 .build(Cursor::new(data))
249 .unwrap();
250
251 let err = reader.next().unwrap().unwrap_err().to_string();
252 assert!(err.contains("invalid hex encoding in binary data"));
253
254 match reader.next() {
255 None => {}
256 Some(Err(_)) => {}
257 Some(Ok(_)) => panic!("expected terminal error after invalid hex"),
258 }
259 }
260}