Skip to main content

arrow_json/reader/
binary_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Write;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
23use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
24use arrow_schema::ArrowError;
25
26use crate::reader::ArrayDecoder;
27use crate::reader::tape::{Tape, TapeElement};
28
/// Converts a single ASCII hex digit into its numeric value (`0..=15`).
///
/// Both lowercase (`a`-`f`) and uppercase (`A`-`F`) letters are accepted.
/// Returns `None` for any byte that is not a hex digit.
#[inline]
fn decode_hex_digit(byte: u8) -> Option<u8> {
    let value = match byte {
        b'0'..=b'9' => byte - b'0',
        b'a'..=b'f' => 10 + (byte - b'a'),
        b'A'..=b'F' => 10 + (byte - b'A'),
        _ => return None,
    };
    Some(value)
}
38
39fn invalid_hex_error_at(index: usize, byte: u8) -> ArrowError {
40    ArrowError::JsonError(format!(
41        "invalid hex encoding in binary data: invalid digit 0x{byte:02x} at position {index}"
42    ))
43}
44
45fn decode_hex_to_writer<W: Write>(hex_string: &str, writer: &mut W) -> Result<(), ArrowError> {
46    let bytes = hex_string.as_bytes();
47    let mut iter = bytes.chunks_exact(2);
48    let mut buffer = [0u8; 64];
49    let mut buffered = 0;
50
51    for (pair_index, pair) in (&mut iter).enumerate() {
52        let base = pair_index * 2;
53        let high = decode_hex_digit(pair[0]).ok_or_else(|| invalid_hex_error_at(base, pair[0]))?;
54        let low =
55            decode_hex_digit(pair[1]).ok_or_else(|| invalid_hex_error_at(base + 1, pair[1]))?;
56        buffer[buffered] = (high << 4) | low;
57        buffered += 1;
58
59        if buffered == buffer.len() {
60            writer
61                .write_all(&buffer)
62                .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
63            buffered = 0;
64        }
65    }
66
67    let remainder = iter.remainder();
68    if !remainder.is_empty() {
69        let index = (bytes.len() / 2) * 2;
70        let low = decode_hex_digit(remainder[0])
71            .ok_or_else(|| invalid_hex_error_at(index, remainder[0]))?;
72        buffer[buffered] = low;
73        buffered += 1;
74    }
75
76    if buffered > 0 {
77        writer
78            .write_all(&buffer[..buffered])
79            .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
80    }
81
82    Ok(())
83}
84
85#[derive(Default)]
86pub struct BinaryArrayDecoder<O: OffsetSizeTrait> {
87    phantom: PhantomData<O>,
88}
89
90impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
91    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
92        let data_capacity = estimate_data_capacity(tape, pos)?;
93
94        if O::from_usize(data_capacity).is_none() {
95            return Err(ArrowError::JsonError(format!(
96                "offset overflow decoding {}",
97                GenericStringArray::<O>::DATA_TYPE
98            )));
99        }
100
101        let mut builder = GenericBinaryBuilder::<O>::with_capacity(pos.len(), data_capacity);
102
103        for p in pos {
104            match tape.get(*p) {
105                TapeElement::String(idx) => {
106                    let string = tape.get_string(idx);
107                    // Decode directly into the builder for performance. If decoding fails,
108                    // the error is terminal and the builder is discarded by the caller.
109                    decode_hex_to_writer(string, &mut builder)?;
110                    builder.append_value(b"");
111                }
112                TapeElement::Null => builder.append_null(),
113                _ => unreachable!(),
114            }
115        }
116
117        Ok(Arc::new(builder.finish()))
118    }
119}
120
121#[derive(Default)]
122pub struct FixedSizeBinaryArrayDecoder {
123    len: i32,
124}
125
126impl FixedSizeBinaryArrayDecoder {
127    pub fn new(len: i32) -> Self {
128        Self { len }
129    }
130}
131
132impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
133    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
134        let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len);
135        // Preallocate for the decoded byte width (FixedSizeBinary len), not the hex string length.
136        let mut scratch = Vec::with_capacity(self.len as usize);
137
138        for p in pos {
139            match tape.get(*p) {
140                TapeElement::String(idx) => {
141                    let string = tape.get_string(idx);
142                    scratch.clear();
143                    scratch.reserve(string.len().div_ceil(2));
144                    decode_hex_to_writer(string, &mut scratch)?;
145                    builder.append_value(&scratch)?;
146                }
147                TapeElement::Null => builder.append_null(),
148                _ => unreachable!(),
149            }
150        }
151
152        Ok(Arc::new(builder.finish()))
153    }
154}
155
156#[derive(Default)]
157pub struct BinaryViewDecoder {}
158
159impl ArrayDecoder for BinaryViewDecoder {
160    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
161        let data_capacity = estimate_data_capacity(tape, pos)?;
162        let mut builder = BinaryViewBuilder::with_capacity(data_capacity);
163        let mut scratch = Vec::new();
164
165        for p in pos {
166            match tape.get(*p) {
167                TapeElement::String(idx) => {
168                    let string = tape.get_string(idx);
169                    scratch.clear();
170                    scratch.reserve(string.len().div_ceil(2));
171                    decode_hex_to_writer(string, &mut scratch)?;
172                    builder.append_value(&scratch);
173                }
174                TapeElement::Null => builder.append_null(),
175                _ => unreachable!(),
176            }
177        }
178
179        Ok(Arc::new(builder.finish()))
180    }
181}
182
183fn estimate_data_capacity(tape: &Tape<'_>, pos: &[u32]) -> Result<usize, ArrowError> {
184    let mut data_capacity = 0;
185    for p in pos {
186        match tape.get(*p) {
187            TapeElement::String(idx) => {
188                let string_len = tape.get_string(idx).len();
189                // two hex characters represent one byte
190                let decoded_len = string_len.div_ceil(2);
191                data_capacity += decoded_len;
192            }
193            TapeElement::Null => {}
194            _ => {
195                return Err(tape.error(*p, "binary data encoded as string"));
196            }
197        }
198    }
199    Ok(data_capacity)
200}
201
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ReaderBuilder;
    use arrow_schema::{DataType, Field};
    use std::io::Cursor;

    /// An empty hex string decodes to no bytes at all.
    #[test]
    fn test_decode_hex_to_writer_empty() {
        let mut decoded: Vec<u8> = Vec::new();
        decode_hex_to_writer("", &mut decoded).unwrap();
        assert!(decoded.is_empty());
    }

    /// A trailing unpaired digit is emitted as its own byte.
    #[test]
    fn test_decode_hex_to_writer_odd_length() {
        let cases: [(&str, &[u8]); 2] = [("0f0", &[0x0f, 0x00]), ("a", &[0x0a])];

        let mut decoded: Vec<u8> = Vec::new();
        for (input, expected) in cases {
            decoded.clear();
            decode_hex_to_writer(input, &mut decoded).unwrap();
            assert_eq!(decoded, expected.to_vec());
        }
    }

    /// A non-hex byte is reported together with its offset in the input.
    #[test]
    fn test_decode_hex_to_writer_invalid() {
        let mut decoded: Vec<u8> = Vec::new();
        let result = decode_hex_to_writer("0f0g", &mut decoded);

        let ArrowError::JsonError(msg) = result.unwrap_err() else {
            panic!("expected JsonError");
        };
        assert!(msg.contains("invalid hex encoding in binary data"));
        assert!(msg.contains("position 3"));
    }

    /// After an invalid hex value the reader must not go on to yield a batch.
    #[test]
    fn test_binary_reader_invalid_hex_is_terminal() {
        let data = b"\"0f0g\"\n\"0f00\"\n";
        let field = Field::new("item", DataType::Binary, false);
        let mut reader = ReaderBuilder::new_with_field(field)
            .build(Cursor::new(data))
            .unwrap();

        let first = reader.next().unwrap().unwrap_err().to_string();
        assert!(first.contains("invalid hex encoding in binary data"));

        // Either ending the stream or reporting another error is acceptable.
        if let Some(Ok(_)) = reader.next() {
            panic!("expected terminal error after invalid hex");
        }
    }
}
257}