Apache Arrow (C++)
A columnar in-memory analytics layer designed to accelerate big data.
bit-stream-utils.h
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // From Apache Impala (incubating) as of 2016-01-29
19 
20 #ifndef ARROW_UTIL_BIT_STREAM_UTILS_H
21 #define ARROW_UTIL_BIT_STREAM_UTILS_H
22 
23 #include <string.h>
24 #include <algorithm>
25 #include <cstdint>
26 
27 #include "arrow/util/bit-util.h"
28 #include "arrow/util/bpacking.h"
29 #include "arrow/util/logging.h"
30 #include "arrow/util/macros.h"
31 
32 namespace arrow {
33 
37 class BitWriter {
38  public:
41  BitWriter(uint8_t* buffer, int buffer_len) : buffer_(buffer), max_bytes_(buffer_len) {
42  Clear();
43  }
44 
45  void Clear() {
46  buffered_values_ = 0;
47  byte_offset_ = 0;
48  bit_offset_ = 0;
49  }
50 
53  int bytes_written() const {
54  return byte_offset_ + static_cast<int>(BitUtil::Ceil(bit_offset_, 8));
55  }
56  uint8_t* buffer() const { return buffer_; }
57  int buffer_len() const { return max_bytes_; }
58 
61  bool PutValue(uint64_t v, int num_bits);
62 
66  template <typename T>
67  bool PutAligned(T v, int num_bytes);
68 
73  bool PutVlqInt(uint32_t v);
74 
75  // Writes an int zigzag encoded.
76  bool PutZigZagVlqInt(int32_t v);
77 
81  uint8_t* GetNextBytePtr(int num_bytes = 1);
82 
86  void Flush(bool align = false);
87 
88  private:
89  uint8_t* buffer_;
90  int max_bytes_;
91 
94  uint64_t buffered_values_;
95 
96  int byte_offset_; // Offset in buffer_
97  int bit_offset_; // Offset in buffered_values_
98 };
99 
103 class BitReader {
104  public:
106  BitReader(const uint8_t* buffer, int buffer_len)
107  : buffer_(buffer), max_bytes_(buffer_len), byte_offset_(0), bit_offset_(0) {
108  int num_bytes = std::min(8, max_bytes_ - byte_offset_);
109  memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes);
110  }
111 
112  BitReader() : buffer_(NULL), max_bytes_(0) {}
113 
114  void Reset(const uint8_t* buffer, int buffer_len) {
115  buffer_ = buffer;
116  max_bytes_ = buffer_len;
117  byte_offset_ = 0;
118  bit_offset_ = 0;
119  int num_bytes = std::min(8, max_bytes_ - byte_offset_);
120  memcpy(&buffered_values_, buffer_ + byte_offset_, num_bytes);
121  }
122 
125  template <typename T>
126  bool GetValue(int num_bits, T* v);
127 
129  template <typename T>
130  int GetBatch(int num_bits, T* v, int batch_size);
131 
137  template <typename T>
138  bool GetAligned(int num_bytes, T* v);
139 
143  bool GetVlqInt(int32_t* v);
144 
145  // Reads a zigzag encoded int `into` v.
146  bool GetZigZagVlqInt(int32_t* v);
147 
150  int bytes_left() {
151  return max_bytes_ - (byte_offset_ + static_cast<int>(BitUtil::Ceil(bit_offset_, 8)));
152  }
153 
155  static const int MAX_VLQ_BYTE_LEN = 5;
156 
157  private:
158  const uint8_t* buffer_;
159  int max_bytes_;
160 
163  uint64_t buffered_values_;
164 
165  int byte_offset_; // Offset in buffer_
166  int bit_offset_; // Offset in buffered_values_
167 };
168 
169 inline bool BitWriter::PutValue(uint64_t v, int num_bits) {
170  // TODO: revisit this limit if necessary (can be raised to 64 by fixing some edge cases)
171  DCHECK_LE(num_bits, 32);
172  DCHECK_EQ(v >> num_bits, 0) << "v = " << v << ", num_bits = " << num_bits;
173 
174  if (ARROW_PREDICT_FALSE(byte_offset_ * 8 + bit_offset_ + num_bits > max_bytes_ * 8))
175  return false;
176 
177  buffered_values_ |= v << bit_offset_;
178  bit_offset_ += num_bits;
179 
180  if (ARROW_PREDICT_FALSE(bit_offset_ >= 64)) {
181  // Flush buffered_values_ and write out bits of v that did not fit
182  memcpy(buffer_ + byte_offset_, &buffered_values_, 8);
183  buffered_values_ = 0;
184  byte_offset_ += 8;
185  bit_offset_ -= 64;
186  buffered_values_ = v >> (num_bits - bit_offset_);
187  }
188  DCHECK_LT(bit_offset_, 64);
189  return true;
190 }
191 
192 inline void BitWriter::Flush(bool align) {
193  int num_bytes = static_cast<int>(BitUtil::Ceil(bit_offset_, 8));
194  DCHECK_LE(byte_offset_ + num_bytes, max_bytes_);
195  memcpy(buffer_ + byte_offset_, &buffered_values_, num_bytes);
196 
197  if (align) {
198  buffered_values_ = 0;
199  byte_offset_ += num_bytes;
200  bit_offset_ = 0;
201  }
202 }
203 
204 inline uint8_t* BitWriter::GetNextBytePtr(int num_bytes) {
205  Flush(/* align */ true);
206  DCHECK_LE(byte_offset_, max_bytes_);
207  if (byte_offset_ + num_bytes > max_bytes_) return NULL;
208  uint8_t* ptr = buffer_ + byte_offset_;
209  byte_offset_ += num_bytes;
210  return ptr;
211 }
212 
213 template <typename T>
214 inline bool BitWriter::PutAligned(T val, int num_bytes) {
215  uint8_t* ptr = GetNextBytePtr(num_bytes);
216  if (ptr == NULL) return false;
217  memcpy(ptr, &val, num_bytes);
218  return true;
219 }
220 
221 inline bool BitWriter::PutVlqInt(uint32_t v) {
222  bool result = true;
223  while ((v & 0xFFFFFF80) != 0L) {
224  result &= PutAligned<uint8_t>(static_cast<uint8_t>((v & 0x7F) | 0x80), 1);
225  v >>= 7;
226  }
227  result &= PutAligned<uint8_t>(static_cast<uint8_t>(v & 0x7F), 1);
228  return result;
229 }
230 
231 namespace detail {
232 
233 template <typename T>
234 inline void GetValue_(int num_bits, T* v, int max_bytes, const uint8_t* buffer,
235  int* bit_offset, int* byte_offset, uint64_t* buffered_values) {
236 #ifdef _MSC_VER
237 #pragma warning(push)
238 #pragma warning(disable : 4800)
239 #endif
240  *v = static_cast<T>(BitUtil::TrailingBits(*buffered_values, *bit_offset + num_bits) >>
241  *bit_offset);
242 #ifdef _MSC_VER
243 #pragma warning(pop)
244 #endif
245  *bit_offset += num_bits;
246  if (*bit_offset >= 64) {
247  *byte_offset += 8;
248  *bit_offset -= 64;
249 
250  int bytes_remaining = max_bytes - *byte_offset;
251  if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
252  memcpy(buffered_values, buffer + *byte_offset, 8);
253  } else {
254  memcpy(buffered_values, buffer + *byte_offset, bytes_remaining);
255  }
256 #ifdef _MSC_VER
257 #pragma warning(push)
258 #pragma warning(disable : 4800 4805)
259 #endif
260  // Read bits of v that crossed into new buffered_values_
261  *v = *v | static_cast<T>(BitUtil::TrailingBits(*buffered_values, *bit_offset)
262  << (num_bits - *bit_offset));
263 #ifdef _MSC_VER
264 #pragma warning(pop)
265 #endif
266  DCHECK_LE(*bit_offset, 64);
267  }
268 }
269 
270 } // namespace detail
271 
272 template <typename T>
273 inline bool BitReader::GetValue(int num_bits, T* v) {
274  return GetBatch(num_bits, v, 1) == 1;
275 }
276 
277 template <typename T>
278 inline int BitReader::GetBatch(int num_bits, T* v, int batch_size) {
279  DCHECK(buffer_ != NULL);
280  // TODO: revisit this limit if necessary
281  DCHECK_LE(num_bits, 32);
282  DCHECK_LE(num_bits, static_cast<int>(sizeof(T) * 8));
283 
284  int bit_offset = bit_offset_;
285  int byte_offset = byte_offset_;
286  uint64_t buffered_values = buffered_values_;
287  int max_bytes = max_bytes_;
288  const uint8_t* buffer = buffer_;
289 
290  uint64_t needed_bits = num_bits * batch_size;
291  uint64_t remaining_bits = (max_bytes - byte_offset) * 8 - bit_offset;
292  if (remaining_bits < needed_bits) {
293  batch_size = static_cast<int>(remaining_bits) / num_bits;
294  }
295 
296  int i = 0;
297  if (ARROW_PREDICT_FALSE(bit_offset != 0)) {
298  for (; i < batch_size && bit_offset != 0; ++i) {
299  detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
300  &buffered_values);
301  }
302  }
303 
304  if (sizeof(T) == 4) {
305  int num_unpacked =
306  internal::unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
307  reinterpret_cast<uint32_t*>(v + i), batch_size - i, num_bits);
308  i += num_unpacked;
309  byte_offset += num_unpacked * num_bits / 8;
310  } else {
311  const int buffer_size = 1024;
312  uint32_t unpack_buffer[buffer_size];
313  while (i < batch_size) {
314  int unpack_size = std::min(buffer_size, batch_size - i);
315  int num_unpacked =
316  internal::unpack32(reinterpret_cast<const uint32_t*>(buffer + byte_offset),
317  unpack_buffer, unpack_size, num_bits);
318  if (num_unpacked == 0) {
319  break;
320  }
321  for (int k = 0; k < num_unpacked; ++k) {
322 #ifdef _MSC_VER
323 #pragma warning(push)
324 #pragma warning(disable : 4800)
325 #endif
326  v[i + k] = static_cast<T>(unpack_buffer[k]);
327 #ifdef _MSC_VER
328 #pragma warning(pop)
329 #endif
330  }
331  i += num_unpacked;
332  byte_offset += num_unpacked * num_bits / 8;
333  }
334  }
335 
336  int bytes_remaining = max_bytes - byte_offset;
337  if (bytes_remaining >= 8) {
338  memcpy(&buffered_values, buffer + byte_offset, 8);
339  } else {
340  memcpy(&buffered_values, buffer + byte_offset, bytes_remaining);
341  }
342 
343  for (; i < batch_size; ++i) {
344  detail::GetValue_(num_bits, &v[i], max_bytes, buffer, &bit_offset, &byte_offset,
345  &buffered_values);
346  }
347 
348  bit_offset_ = bit_offset;
349  byte_offset_ = byte_offset;
350  buffered_values_ = buffered_values;
351 
352  return batch_size;
353 }
354 
355 template <typename T>
356 inline bool BitReader::GetAligned(int num_bytes, T* v) {
357  DCHECK_LE(num_bytes, static_cast<int>(sizeof(T)));
358  int bytes_read = static_cast<int>(BitUtil::Ceil(bit_offset_, 8));
359  if (ARROW_PREDICT_FALSE(byte_offset_ + bytes_read + num_bytes > max_bytes_))
360  return false;
361 
362  // Advance byte_offset to next unread byte and read num_bytes
363  byte_offset_ += bytes_read;
364  memcpy(v, buffer_ + byte_offset_, num_bytes);
365  byte_offset_ += num_bytes;
366 
367  // Reset buffered_values_
368  bit_offset_ = 0;
369  int bytes_remaining = max_bytes_ - byte_offset_;
370  if (ARROW_PREDICT_TRUE(bytes_remaining >= 8)) {
371  memcpy(&buffered_values_, buffer_ + byte_offset_, 8);
372  } else {
373  memcpy(&buffered_values_, buffer_ + byte_offset_, bytes_remaining);
374  }
375  return true;
376 }
377 
378 inline bool BitReader::GetVlqInt(int32_t* v) {
379  *v = 0;
380  int shift = 0;
381  int num_bytes = 0;
382  uint8_t byte = 0;
383  do {
384  if (!GetAligned<uint8_t>(1, &byte)) return false;
385  *v |= (byte & 0x7F) << shift;
386  shift += 7;
387  DCHECK_LE(++num_bytes, MAX_VLQ_BYTE_LEN);
388  } while ((byte & 0x80) != 0);
389  return true;
390 }
391 
392 inline bool BitWriter::PutZigZagVlqInt(int32_t v) {
393  uint32_t u = (v << 1) ^ (v >> 31);
394  return PutVlqInt(u);
395 }
396 
397 inline bool BitReader::GetZigZagVlqInt(int32_t* v) {
398  int32_t u_signed;
399  if (!GetVlqInt(&u_signed)) return false;
400  uint32_t u = static_cast<uint32_t>(u_signed);
401  *reinterpret_cast<uint32_t*>(v) = (u >> 1) ^ -(static_cast<int32_t>(u & 1));
402  return true;
403 }
404 
405 } // namespace arrow
406 
407 #endif // ARROW_UTIL_BIT_STREAM_UTILS_H
bool PutValue(uint64_t v, int num_bits)
Writes a value to buffered_values_, flushing to buffer_ if necessary.
Definition: bit-stream-utils.h:169
Utility class to write bit/byte streams.
Definition: bit-stream-utils.h:37
bool GetAligned(int num_bytes, T *v)
Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'.
Definition: bit-stream-utils.h:356
void Flush(bool align=false)
Flushes all buffered values to the buffer.
Definition: bit-stream-utils.h:192
void Reset(const uint8_t *buffer, int buffer_len)
Definition: bit-stream-utils.h:114
Utility class to read bit/byte stream.
Definition: bit-stream-utils.h:103
#define ARROW_PREDICT_TRUE(x)
Definition: macros.h:49
void Clear()
Definition: bit-stream-utils.h:45
bool GetValue(int num_bits, T *v)
Gets the next value from the buffer.
Definition: bit-stream-utils.h:273
int GetBatch(int num_bits, T *v, int batch_size)
Get a number of values from the buffer. Return the number of values actually read.
Definition: bit-stream-utils.h:278
uint8_t * buffer() const
Definition: bit-stream-utils.h:56
int bytes_written() const
The number of current bytes written, including the current byte (i.e. may include a fraction of a byte). Includes buffered values.
Definition: bit-stream-utils.h:53
bool PutZigZagVlqInt(int32_t v)
Definition: bit-stream-utils.h:392
#define ARROW_PREDICT_FALSE(x)
Definition: macros.h:48
#define DCHECK_LT(val1, val2)
Definition: logging.h:82
int buffer_len() const
Definition: bit-stream-utils.h:57
bool PutAligned(T v, int num_bytes)
Writes v to the next aligned byte using num_bytes.
Definition: bit-stream-utils.h:214
int bytes_left()
Returns the number of bytes left in the stream, not including the current byte (i.e., there may be an additional fraction of a byte).
Definition: bit-stream-utils.h:150
uint8_t * GetNextBytePtr(int num_bytes=1)
Get a pointer to the next aligned byte and advance the underlying buffer by num_bytes.
Definition: bit-stream-utils.h:204
Top-level namespace for Apache Arrow C++ API.
Definition: allocator.h:29
#define DCHECK_EQ(val1, val2)
Definition: logging.h:79
bool PutVlqInt(uint32_t v)
Write a Vlq encoded int to the buffer.
Definition: bit-stream-utils.h:221
#define DCHECK(condition)
Definition: logging.h:78
BitWriter(uint8_t *buffer, int buffer_len)
buffer: buffer to write bits to.
Definition: bit-stream-utils.h:41
#define DCHECK_LE(val1, val2)
Definition: logging.h:81
BitReader(const uint8_t *buffer, int buffer_len)
'buffer' is the buffer to read from. The buffer's length is 'buffer_len'.
Definition: bit-stream-utils.h:106
bool GetZigZagVlqInt(int32_t *v)
Definition: bit-stream-utils.h:397
bool GetVlqInt(int32_t *v)
Reads a vlq encoded int from the stream.
Definition: bit-stream-utils.h:378
BitReader()
Definition: bit-stream-utils.h:112