arrow_json/reader/
binary_array.rs1use std::io::Write;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
23use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
24use arrow_schema::ArrowError;
25
26use crate::reader::ArrayDecoder;
27use crate::reader::tape::{Tape, TapeElement};
28
/// Converts one ASCII hex digit (`0-9`, `a-f`, `A-F`) into its numeric value.
///
/// Returns `None` for any byte that is not a hex digit.
#[inline]
fn decode_hex_digit(byte: u8) -> Option<u8> {
    // Pick the amount to subtract so that each digit range lands on its value.
    let offset = match byte {
        b'0'..=b'9' => b'0',
        b'a'..=b'f' => b'a' - 10,
        b'A'..=b'F' => b'A' - 10,
        _ => return None,
    };
    Some(byte - offset)
}
38
39fn invalid_hex_error_at(index: usize, byte: u8) -> ArrowError {
40 ArrowError::JsonError(format!(
41 "invalid hex encoding in binary data: invalid digit 0x{byte:02x} at position {index}"
42 ))
43}
44
45fn decode_hex_to_writer<W: Write>(hex_string: &str, writer: &mut W) -> Result<(), ArrowError> {
46 let bytes = hex_string.as_bytes();
47 let mut iter = bytes.chunks_exact(2);
48 let mut buffer = [0u8; 64];
49 let mut buffered = 0;
50
51 for (pair_index, pair) in (&mut iter).enumerate() {
52 let base = pair_index * 2;
53 let high = decode_hex_digit(pair[0]).ok_or_else(|| invalid_hex_error_at(base, pair[0]))?;
54 let low =
55 decode_hex_digit(pair[1]).ok_or_else(|| invalid_hex_error_at(base + 1, pair[1]))?;
56 buffer[buffered] = (high << 4) | low;
57 buffered += 1;
58
59 if buffered == buffer.len() {
60 writer
61 .write_all(&buffer)
62 .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
63 buffered = 0;
64 }
65 }
66
67 let remainder = iter.remainder();
68 if !remainder.is_empty() {
69 let index = (bytes.len() / 2) * 2;
70 let low = decode_hex_digit(remainder[0])
71 .ok_or_else(|| invalid_hex_error_at(index, remainder[0]))?;
72 buffer[buffered] = low;
73 buffered += 1;
74 }
75
76 if buffered > 0 {
77 writer
78 .write_all(&buffer[..buffered])
79 .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
80 }
81
82 Ok(())
83}
84
85#[derive(Default)]
86pub struct BinaryArrayDecoder<O: OffsetSizeTrait> {
87 phantom: PhantomData<O>,
88}
89
90impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
91 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
92 let data_capacity = estimate_data_capacity(tape, pos)?;
93
94 if O::from_usize(data_capacity).is_none() {
95 return Err(ArrowError::JsonError(format!(
96 "offset overflow decoding {}",
97 GenericStringArray::<O>::DATA_TYPE
98 )));
99 }
100
101 let mut builder = GenericBinaryBuilder::<O>::with_capacity(pos.len(), data_capacity);
102
103 for p in pos {
104 match tape.get(*p) {
105 TapeElement::String(idx) => {
106 let string = tape.get_string(idx);
107 decode_hex_to_writer(string, &mut builder)?;
110 builder.append_value(b"");
111 }
112 TapeElement::Null => builder.append_null(),
113 _ => unreachable!(),
114 }
115 }
116
117 Ok(Arc::new(builder.finish()))
118 }
119}
120
121#[derive(Default)]
122pub struct FixedSizeBinaryArrayDecoder {
123 len: i32,
124}
125
126impl FixedSizeBinaryArrayDecoder {
127 pub fn new(len: i32) -> Self {
128 Self { len }
129 }
130}
131
132impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
133 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
134 let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len);
135 let mut scratch = Vec::with_capacity(self.len as usize);
137
138 for p in pos {
139 match tape.get(*p) {
140 TapeElement::String(idx) => {
141 let string = tape.get_string(idx);
142 scratch.clear();
143 scratch.reserve(string.len().div_ceil(2));
144 decode_hex_to_writer(string, &mut scratch)?;
145 builder.append_value(&scratch)?;
146 }
147 TapeElement::Null => builder.append_null(),
148 _ => unreachable!(),
149 }
150 }
151
152 Ok(Arc::new(builder.finish()))
153 }
154}
155
156#[derive(Default)]
157pub struct BinaryViewDecoder {}
158
159impl ArrayDecoder for BinaryViewDecoder {
160 fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
161 let data_capacity = estimate_data_capacity(tape, pos)?;
162 let mut builder = BinaryViewBuilder::with_capacity(data_capacity);
163 let mut scratch = Vec::new();
164
165 for p in pos {
166 match tape.get(*p) {
167 TapeElement::String(idx) => {
168 let string = tape.get_string(idx);
169 scratch.clear();
170 scratch.reserve(string.len().div_ceil(2));
171 decode_hex_to_writer(string, &mut scratch)?;
172 builder.append_value(&scratch);
173 }
174 TapeElement::Null => builder.append_null(),
175 _ => unreachable!(),
176 }
177 }
178
179 Ok(Arc::new(builder.finish()))
180 }
181}
182
183fn estimate_data_capacity(tape: &Tape<'_>, pos: &[u32]) -> Result<usize, ArrowError> {
184 let mut data_capacity = 0;
185 for p in pos {
186 match tape.get(*p) {
187 TapeElement::String(idx) => {
188 let string_len = tape.get_string(idx).len();
189 let decoded_len = string_len.div_ceil(2);
191 data_capacity += decoded_len;
192 }
193 TapeElement::Null => {}
194 _ => {
195 return Err(tape.error(*p, "binary data encoded as string"));
196 }
197 }
198 }
199 Ok(data_capacity)
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ReaderBuilder;
    use arrow_schema::{DataType, Field};
    use std::io::Cursor;

    /// Decodes `hex` into a fresh vector, passing any decode error through.
    fn decode_to_vec(hex: &str) -> Result<Vec<u8>, ArrowError> {
        let mut decoded = Vec::new();
        decode_hex_to_writer(hex, &mut decoded)?;
        Ok(decoded)
    }

    #[test]
    fn test_decode_hex_to_writer_empty() {
        let decoded = decode_to_vec("").unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn test_decode_hex_to_writer_odd_length() {
        // A trailing unpaired digit becomes its own byte.
        assert_eq!(decode_to_vec("0f0").unwrap(), vec![0x0f, 0x00]);
        assert_eq!(decode_to_vec("a").unwrap(), vec![0x0a]);
    }

    #[test]
    fn test_decode_hex_to_writer_invalid() {
        let err = decode_to_vec("0f0g").unwrap_err();
        let ArrowError::JsonError(msg) = err else {
            panic!("expected JsonError");
        };
        assert!(msg.contains("invalid hex encoding in binary data"));
        assert!(msg.contains("position 3"));
    }

    #[test]
    fn test_binary_reader_invalid_hex_is_terminal() {
        // First row is invalid hex, second row is valid.
        let data = b"\"0f0g\"\n\"0f00\"\n";
        let field = Field::new("item", DataType::Binary, false);
        let mut reader = ReaderBuilder::new_with_field(field)
            .build(Cursor::new(data))
            .unwrap();

        let first = reader.next().unwrap().unwrap_err().to_string();
        assert!(first.contains("invalid hex encoding in binary data"));

        // After the failure the reader may stop or keep failing, but it must not go on to
        // yield a batch.
        if let Some(Ok(_)) = reader.next() {
            panic!("expected terminal error after invalid hex");
        }
    }
}