arrow_avro/reader/block.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Decoder for [`Block`]
19
20use crate::errors::AvroError;
21use crate::reader::vlq::VLQDecoder;
22
/// One data block of an Avro object container file.
///
/// A block carries a batch of serialized objects together with the sync
/// marker that terminates it.
///
/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Debug, Default)]
pub struct Block {
    /// How many objects are stored in this block
    pub count: usize,
    /// The raw bytes of the serialized objects held by this block
    pub data: Vec<u8>,
    /// The 16-byte sync marker that follows the block data
    pub sync: [u8; 16],
}
35
36/// A decoder for [`Block`]
37#[derive(Debug)]
38pub struct BlockDecoder {
39 state: BlockDecoderState,
40 in_progress: Block,
41 vlq_decoder: VLQDecoder,
42 bytes_remaining: usize,
43}
44
/// The stages a [`BlockDecoder`] moves through while reading one block.
#[derive(Debug)]
pub(crate) enum BlockDecoderState {
    /// Waiting for the object count
    Count,
    /// Waiting for the byte size of the block data
    Size,
    /// Copying the block data bytes
    Data,
    /// Reading the 16-byte sync marker
    Sync,
    /// A complete block has been decoded and is waiting to be flushed
    Finished,
}
53
54impl Default for BlockDecoder {
55 fn default() -> Self {
56 Self {
57 state: BlockDecoderState::Count,
58 in_progress: Default::default(),
59 vlq_decoder: Default::default(),
60 bytes_remaining: 0,
61 }
62 }
63}
64
65impl BlockDecoder {
66 /// Parse [`Block`] from `buf`, returning the number of bytes read
67 ///
68 /// This method can be called multiple times with consecutive chunks of data, allowing
69 /// integration with chunked IO systems like [`BufRead::fill_buf`]
70 ///
71 /// All errors should be considered fatal, and decoding aborted
72 ///
73 /// Once an entire [`Block`] has been decoded this method will not read any further
74 /// input bytes, until [`Self::flush`] is called. Afterwards [`Self::decode`]
75 /// can then be used again to read the next block, if any
76 ///
77 /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
78 pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, AvroError> {
79 let max_read = buf.len();
80 while !buf.is_empty() {
81 match self.state {
82 BlockDecoderState::Count => {
83 if let Some(c) = self.vlq_decoder.long(&mut buf) {
84 self.in_progress.count = c.try_into().map_err(|_| {
85 AvroError::ParseError(format!(
86 "Block count cannot be negative, got {c}"
87 ))
88 })?;
89
90 self.state = BlockDecoderState::Size;
91 }
92 }
93 BlockDecoderState::Size => {
94 if let Some(c) = self.vlq_decoder.long(&mut buf) {
95 self.bytes_remaining = c.try_into().map_err(|_| {
96 AvroError::ParseError(format!("Block size cannot be negative, got {c}"))
97 })?;
98
99 self.in_progress.data.reserve(self.bytes_remaining);
100 self.state = BlockDecoderState::Data;
101 }
102 }
103 BlockDecoderState::Data => {
104 let to_read = self.bytes_remaining.min(buf.len());
105 self.in_progress.data.extend_from_slice(&buf[..to_read]);
106 buf = &buf[to_read..];
107 self.bytes_remaining -= to_read;
108 if self.bytes_remaining == 0 {
109 self.bytes_remaining = 16;
110 self.state = BlockDecoderState::Sync;
111 }
112 }
113 BlockDecoderState::Sync => {
114 let to_decode = buf.len().min(self.bytes_remaining);
115 let write = &mut self.in_progress.sync[16 - to_decode..];
116 write[..to_decode].copy_from_slice(&buf[..to_decode]);
117 self.bytes_remaining -= to_decode;
118 buf = &buf[to_decode..];
119 if self.bytes_remaining == 0 {
120 self.state = BlockDecoderState::Finished;
121 }
122 }
123 BlockDecoderState::Finished => return Ok(max_read - buf.len()),
124 }
125 }
126 Ok(max_read)
127 }
128
129 /// Flush this decoder returning the parsed [`Block`] if any
130 pub fn flush(&mut self) -> Option<Block> {
131 match self.state {
132 BlockDecoderState::Finished => {
133 self.state = BlockDecoderState::Count;
134 Some(std::mem::take(&mut self.in_progress))
135 }
136 _ => None,
137 }
138 }
139}
140
#[cfg(feature = "async")]
impl BlockDecoder {
    /// Borrows the stage the decoder is currently in
    pub(crate) fn state(&self) -> &BlockDecoderState {
        let Self { state, .. } = self;
        state
    }

    /// Returns the decoder's count of bytes still outstanding for the
    /// section currently being read
    pub(crate) fn bytes_remaining(&self) -> usize {
        let Self {
            bytes_remaining, ..
        } = self;
        *bytes_remaining
    }
}