arrow_avro/reader/
block.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Decoder for [`Block`]
19
20use crate::reader::vlq::VLQDecoder;
21use arrow_schema::ArrowError;
22
/// One data block of an Avro object container file
///
/// A block carries a number of serialized objects followed by the
/// 16-byte sync marker that terminates it.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Debug, Default)]
pub struct Block {
    /// How many objects are stored in this block
    pub count: usize,
    /// The bytes of the serialized objects contained in this block
    pub data: Vec<u8>,
    /// The 16-byte sync marker read after the block's data
    pub sync: [u8; 16],
}
35
36/// A decoder for [`Block`]
37#[derive(Debug)]
38pub struct BlockDecoder {
39    state: BlockDecoderState,
40    in_progress: Block,
41    vlq_decoder: VLQDecoder,
42    bytes_remaining: usize,
43}
44
/// The section of a block that [`BlockDecoder`] is currently reading
///
/// Variants are listed in the order in which they are visited.
#[derive(Debug)]
enum BlockDecoderState {
    /// Reading the variable-length object count
    Count,
    /// Reading the variable-length size, in bytes, of the serialized objects
    Size,
    /// Reading the serialized object bytes
    Data,
    /// Reading the 16-byte sync marker
    Sync,
    /// A complete block has been read and is waiting to be flushed
    Finished,
}
53
54impl Default for BlockDecoder {
55    fn default() -> Self {
56        Self {
57            state: BlockDecoderState::Count,
58            in_progress: Default::default(),
59            vlq_decoder: Default::default(),
60            bytes_remaining: 0,
61        }
62    }
63}
64
65impl BlockDecoder {
66    /// Parse [`Block`] from `buf`, returning the number of bytes read
67    ///
68    /// This method can be called multiple times with consecutive chunks of data, allowing
69    /// integration with chunked IO systems like [`BufRead::fill_buf`]
70    ///
71    /// All errors should be considered fatal, and decoding aborted
72    ///
73    /// Once an entire [`Block`] has been decoded this method will not read any further
74    /// input bytes, until [`Self::flush`] is called. Afterwards [`Self::decode`]
75    /// can then be used again to read the next block, if any
76    ///
77    /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
78    pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
79        let max_read = buf.len();
80        while !buf.is_empty() {
81            match self.state {
82                BlockDecoderState::Count => {
83                    if let Some(c) = self.vlq_decoder.long(&mut buf) {
84                        self.in_progress.count = c.try_into().map_err(|_| {
85                            ArrowError::ParseError(format!(
86                                "Block count cannot be negative, got {c}"
87                            ))
88                        })?;
89
90                        self.state = BlockDecoderState::Size;
91                    }
92                }
93                BlockDecoderState::Size => {
94                    if let Some(c) = self.vlq_decoder.long(&mut buf) {
95                        self.bytes_remaining = c.try_into().map_err(|_| {
96                            ArrowError::ParseError(format!(
97                                "Block size cannot be negative, got {c}"
98                            ))
99                        })?;
100
101                        self.in_progress.data.reserve(self.bytes_remaining);
102                        self.state = BlockDecoderState::Data;
103                    }
104                }
105                BlockDecoderState::Data => {
106                    let to_read = self.bytes_remaining.min(buf.len());
107                    self.in_progress.data.extend_from_slice(&buf[..to_read]);
108                    buf = &buf[to_read..];
109                    self.bytes_remaining -= to_read;
110                    if self.bytes_remaining == 0 {
111                        self.bytes_remaining = 16;
112                        self.state = BlockDecoderState::Sync;
113                    }
114                }
115                BlockDecoderState::Sync => {
116                    let to_decode = buf.len().min(self.bytes_remaining);
117                    let write = &mut self.in_progress.sync[16 - to_decode..];
118                    write[..to_decode].copy_from_slice(&buf[..to_decode]);
119                    self.bytes_remaining -= to_decode;
120                    buf = &buf[to_decode..];
121                    if self.bytes_remaining == 0 {
122                        self.state = BlockDecoderState::Finished;
123                    }
124                }
125                BlockDecoderState::Finished => return Ok(max_read - buf.len()),
126            }
127        }
128        Ok(max_read)
129    }
130
131    /// Flush this decoder returning the parsed [`Block`] if any
132    pub fn flush(&mut self) -> Option<Block> {
133        match self.state {
134            BlockDecoderState::Finished => {
135                self.state = BlockDecoderState::Count;
136                Some(std::mem::take(&mut self.in_progress))
137            }
138            _ => None,
139        }
140    }
141}