arrow_avro/reader/block.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Decoder for [`Block`]
19
20use crate::reader::vlq::VLQDecoder;
21use arrow_schema::ArrowError;
22
/// One data block of an Avro object container file
///
/// <https://avro.apache.org/docs/1.11.1/specification/#object-container-files>
#[derive(Debug, Default)]
pub struct Block {
    /// How many objects are serialized in this block
    pub count: usize,
    /// The raw bytes holding this block's serialized objects
    pub data: Vec<u8>,
    /// The 16-byte sync marker that follows the block's data
    pub sync: [u8; 16],
}
35
36/// A decoder for [`Block`]
37#[derive(Debug)]
38pub struct BlockDecoder {
39 state: BlockDecoderState,
40 in_progress: Block,
41 vlq_decoder: VLQDecoder,
42 bytes_remaining: usize,
43}
44
/// The section of a block that the decoder expects to read next
#[derive(Debug)]
enum BlockDecoderState {
    /// Reading the variable-length object count
    Count,
    /// Reading the variable-length byte size of the data section
    Size,
    /// Copying the serialized object bytes
    Data,
    /// Reading the 16-byte sync marker
    Sync,
    /// A complete block is buffered and waiting to be flushed
    Finished,
}
53
54impl Default for BlockDecoder {
55 fn default() -> Self {
56 Self {
57 state: BlockDecoderState::Count,
58 in_progress: Default::default(),
59 vlq_decoder: Default::default(),
60 bytes_remaining: 0,
61 }
62 }
63}
64
65impl BlockDecoder {
66 /// Parse [`Block`] from `buf`, returning the number of bytes read
67 ///
68 /// This method can be called multiple times with consecutive chunks of data, allowing
69 /// integration with chunked IO systems like [`BufRead::fill_buf`]
70 ///
71 /// All errors should be considered fatal, and decoding aborted
72 ///
73 /// Once an entire [`Block`] has been decoded this method will not read any further
74 /// input bytes, until [`Self::flush`] is called. Afterwards [`Self::decode`]
75 /// can then be used again to read the next block, if any
76 ///
77 /// [`BufRead::fill_buf`]: std::io::BufRead::fill_buf
78 pub fn decode(&mut self, mut buf: &[u8]) -> Result<usize, ArrowError> {
79 let max_read = buf.len();
80 while !buf.is_empty() {
81 match self.state {
82 BlockDecoderState::Count => {
83 if let Some(c) = self.vlq_decoder.long(&mut buf) {
84 self.in_progress.count = c.try_into().map_err(|_| {
85 ArrowError::ParseError(format!(
86 "Block count cannot be negative, got {c}"
87 ))
88 })?;
89
90 self.state = BlockDecoderState::Size;
91 }
92 }
93 BlockDecoderState::Size => {
94 if let Some(c) = self.vlq_decoder.long(&mut buf) {
95 self.bytes_remaining = c.try_into().map_err(|_| {
96 ArrowError::ParseError(format!(
97 "Block size cannot be negative, got {c}"
98 ))
99 })?;
100
101 self.in_progress.data.reserve(self.bytes_remaining);
102 self.state = BlockDecoderState::Data;
103 }
104 }
105 BlockDecoderState::Data => {
106 let to_read = self.bytes_remaining.min(buf.len());
107 self.in_progress.data.extend_from_slice(&buf[..to_read]);
108 buf = &buf[to_read..];
109 self.bytes_remaining -= to_read;
110 if self.bytes_remaining == 0 {
111 self.bytes_remaining = 16;
112 self.state = BlockDecoderState::Sync;
113 }
114 }
115 BlockDecoderState::Sync => {
116 let to_decode = buf.len().min(self.bytes_remaining);
117 let write = &mut self.in_progress.sync[16 - to_decode..];
118 write[..to_decode].copy_from_slice(&buf[..to_decode]);
119 self.bytes_remaining -= to_decode;
120 buf = &buf[to_decode..];
121 if self.bytes_remaining == 0 {
122 self.state = BlockDecoderState::Finished;
123 }
124 }
125 BlockDecoderState::Finished => return Ok(max_read - buf.len()),
126 }
127 }
128 Ok(max_read)
129 }
130
131 /// Flush this decoder returning the parsed [`Block`] if any
132 pub fn flush(&mut self) -> Option<Block> {
133 match self.state {
134 BlockDecoderState::Finished => {
135 self.state = BlockDecoderState::Count;
136 Some(std::mem::take(&mut self.in_progress))
137 }
138 _ => None,
139 }
140 }
141}