Skip to main content

arrow_json/reader/
binary_array.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::Write;
19use std::marker::PhantomData;
20use std::sync::Arc;
21
22use arrow_array::builder::{BinaryViewBuilder, FixedSizeBinaryBuilder, GenericBinaryBuilder};
23use arrow_array::{ArrayRef, GenericStringArray, OffsetSizeTrait};
24use arrow_schema::ArrowError;
25
26use crate::reader::ArrayDecoder;
27use crate::reader::tape::{Tape, TapeElement};
28
/// Converts a single ASCII hex digit (`0-9`, `a-f`, `A-F`) into its numeric value.
///
/// Returns `None` for any byte that is not a hex digit.
#[inline]
fn decode_hex_digit(byte: u8) -> Option<u8> {
    // Pick the amount to subtract so that the digit lands on its value in `0..=15`.
    let offset = if byte.is_ascii_digit() {
        b'0'
    } else if (b'a'..=b'f').contains(&byte) {
        b'a' - 10
    } else if (b'A'..=b'F').contains(&byte) {
        b'A' - 10
    } else {
        return None;
    };
    Some(byte - offset)
}
38
39fn invalid_hex_error_at(index: usize, byte: u8) -> ArrowError {
40    ArrowError::JsonError(format!(
41        "invalid hex encoding in binary data: invalid digit 0x{byte:02x} at position {index}"
42    ))
43}
44
45fn decode_hex_to_writer<W: Write>(hex_string: &str, writer: &mut W) -> Result<(), ArrowError> {
46    let bytes = hex_string.as_bytes();
47    let mut iter = bytes.chunks_exact(2);
48    let mut buffer = [0u8; 64];
49    let mut buffered = 0;
50
51    for (pair_index, pair) in (&mut iter).enumerate() {
52        let base = pair_index * 2;
53        let high = decode_hex_digit(pair[0]).ok_or_else(|| invalid_hex_error_at(base, pair[0]))?;
54        let low =
55            decode_hex_digit(pair[1]).ok_or_else(|| invalid_hex_error_at(base + 1, pair[1]))?;
56        buffer[buffered] = (high << 4) | low;
57        buffered += 1;
58
59        if buffered == buffer.len() {
60            writer
61                .write_all(&buffer)
62                .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
63            buffered = 0;
64        }
65    }
66
67    let remainder = iter.remainder();
68    if !remainder.is_empty() {
69        let index = (bytes.len() / 2) * 2;
70        let low = decode_hex_digit(remainder[0])
71            .ok_or_else(|| invalid_hex_error_at(index, remainder[0]))?;
72        buffer[buffered] = low;
73        buffered += 1;
74    }
75
76    if buffered > 0 {
77        writer
78            .write_all(&buffer[..buffered])
79            .map_err(|e| ArrowError::JsonError(format!("failed to write binary data: {e}")))?;
80    }
81
82    Ok(())
83}
84
85#[derive(Default)]
86pub struct BinaryArrayDecoder<O: OffsetSizeTrait> {
87    phantom: PhantomData<O>,
88}
89
90impl<O: OffsetSizeTrait> ArrayDecoder for BinaryArrayDecoder<O> {
91    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
92        let data_capacity = estimate_data_capacity(tape, pos)?;
93
94        if O::from_usize(data_capacity).is_none() {
95            return Err(ArrowError::JsonError(format!(
96                "offset overflow decoding {}",
97                GenericStringArray::<O>::DATA_TYPE
98            )));
99        }
100
101        let mut builder = GenericBinaryBuilder::<O>::with_capacity(pos.len(), data_capacity);
102
103        for p in pos {
104            match tape.get(*p) {
105                TapeElement::String(idx) => {
106                    let string = tape.get_string(idx);
107                    // Decode directly into the builder for performance. If decoding fails,
108                    // the error is terminal and the builder is discarded by the caller.
109                    decode_hex_to_writer(string, &mut builder)?;
110                    builder.append_value(b"");
111                }
112                TapeElement::Null => builder.append_null(),
113                _ => unreachable!(),
114            }
115        }
116
117        Ok(Arc::new(builder.finish()))
118    }
119}
120
121#[derive(Default)]
122pub struct FixedSizeBinaryArrayDecoder {
123    len: i32,
124}
125
126impl FixedSizeBinaryArrayDecoder {
127    pub fn new(len: i32) -> Self {
128        Self { len }
129    }
130}
131
132impl ArrayDecoder for FixedSizeBinaryArrayDecoder {
133    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
134        let mut builder = FixedSizeBinaryBuilder::with_capacity(pos.len(), self.len);
135        // Preallocate for the decoded byte width (FixedSizeBinary len), not the hex string length.
136        let capacity: usize = self.len.try_into().map_err(|_| {
137            ArrowError::InvalidArgumentError(format!("Cannot convert size '{}' to usize", self.len))
138        })?;
139        let mut scratch = Vec::with_capacity(capacity);
140
141        for p in pos {
142            match tape.get(*p) {
143                TapeElement::String(idx) => {
144                    let string = tape.get_string(idx);
145                    scratch.clear();
146                    scratch.reserve(string.len().div_ceil(2));
147                    decode_hex_to_writer(string, &mut scratch)?;
148                    builder.append_value(&scratch)?;
149                }
150                TapeElement::Null => builder.append_null(),
151                _ => unreachable!(),
152            }
153        }
154
155        Ok(Arc::new(builder.finish()))
156    }
157}
158
159#[derive(Default)]
160pub struct BinaryViewDecoder {}
161
162impl ArrayDecoder for BinaryViewDecoder {
163    fn decode(&mut self, tape: &Tape<'_>, pos: &[u32]) -> Result<ArrayRef, ArrowError> {
164        let data_capacity = estimate_data_capacity(tape, pos)?;
165        let mut builder = BinaryViewBuilder::with_capacity(data_capacity);
166        let mut scratch = Vec::new();
167
168        for p in pos {
169            match tape.get(*p) {
170                TapeElement::String(idx) => {
171                    let string = tape.get_string(idx);
172                    scratch.clear();
173                    scratch.reserve(string.len().div_ceil(2));
174                    decode_hex_to_writer(string, &mut scratch)?;
175                    builder.append_value(&scratch);
176                }
177                TapeElement::Null => builder.append_null(),
178                _ => unreachable!(),
179            }
180        }
181
182        Ok(Arc::new(builder.finish()))
183    }
184}
185
186fn estimate_data_capacity(tape: &Tape<'_>, pos: &[u32]) -> Result<usize, ArrowError> {
187    let mut data_capacity = 0;
188    for p in pos {
189        match tape.get(*p) {
190            TapeElement::String(idx) => {
191                let string_len = tape.get_string(idx).len();
192                // two hex characters represent one byte
193                let decoded_len = string_len.div_ceil(2);
194                data_capacity += decoded_len;
195            }
196            TapeElement::Null => {}
197            _ => {
198                return Err(tape.error(*p, "binary data encoded as string"));
199            }
200        }
201    }
202    Ok(data_capacity)
203}
204
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ReaderBuilder;
    use arrow_schema::{DataType, Field};
    use std::io::Cursor;

    /// Runs `decode_hex_to_writer` against a fresh `Vec` and returns the decoded bytes.
    fn decode_to_vec(hex: &str) -> Result<Vec<u8>, ArrowError> {
        let mut decoded = Vec::new();
        decode_hex_to_writer(hex, &mut decoded)?;
        Ok(decoded)
    }

    #[test]
    fn test_decode_hex_to_writer_empty() {
        let decoded = decode_to_vec("").unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn test_decode_hex_to_writer_odd_length() {
        // The unpaired trailing digit is emitted as a byte of its own.
        assert_eq!(decode_to_vec("0f0").unwrap(), vec![0x0f, 0x00]);
        assert_eq!(decode_to_vec("a").unwrap(), vec![0x0a]);
    }

    #[test]
    fn test_decode_hex_to_writer_invalid() {
        let ArrowError::JsonError(msg) = decode_to_vec("0f0g").unwrap_err() else {
            panic!("expected JsonError");
        };
        assert!(msg.contains("invalid hex encoding in binary data"));
        assert!(msg.contains("position 3"));
    }

    #[test]
    fn test_binary_reader_invalid_hex_is_terminal() {
        let data = b"\"0f0g\"\n\"0f00\"\n";
        let field = Field::new("item", DataType::Binary, false);
        let mut reader = ReaderBuilder::new_with_field(field)
            .build(Cursor::new(data))
            .unwrap();

        let first = reader.next().unwrap();
        let err = first.unwrap_err().to_string();
        assert!(err.contains("invalid hex encoding in binary data"));

        // After the failure the reader may end or keep erroring, but must not yield data.
        if let Some(Ok(_)) = reader.next() {
            panic!("expected terminal error after invalid hex");
        }
    }
}