Apache Arrow (C++)
A columnar in-memory analytics layer designed to accelerate big data.
rle-encoding.h
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 // Imported from Apache Impala (incubating) on 2016-01-29 and modified for use
19 // in parquet-cpp, Arrow
20 
21 #ifndef ARROW_UTIL_RLE_ENCODING_H
22 #define ARROW_UTIL_RLE_ENCODING_H
23 
24 #include <math.h>
25 #include <algorithm>
26 
28 #include "arrow/util/bit-util.h"
29 #include "arrow/util/macros.h"
30 
31 namespace arrow {
32 
49 //
54 //
64 //
70 //
76 //
81 //
82 
84 class RleDecoder {
85  public:
88  RleDecoder(const uint8_t* buffer, int buffer_len, int bit_width)
89  : bit_reader_(buffer, buffer_len),
90  bit_width_(bit_width),
91  current_value_(0),
92  repeat_count_(0),
93  literal_count_(0) {
95  DCHECK_LE(bit_width_, 64);
96  }
97 
99 
100  void Reset(const uint8_t* buffer, int buffer_len, int bit_width) {
101  DCHECK_GE(bit_width, 0);
102  DCHECK_LE(bit_width, 64);
103  bit_reader_.Reset(buffer, buffer_len);
104  bit_width_ = bit_width;
105  current_value_ = 0;
106  repeat_count_ = 0;
107  literal_count_ = 0;
108  }
109 
111  template <typename T>
112  bool Get(T* val);
113 
115  template <typename T>
116  int GetBatch(T* values, int batch_size);
117 
119  template <typename T>
120  int GetBatchWithDict(const T* dictionary, T* values, int batch_size);
121 
123  template <typename T>
124  int GetBatchWithDictSpaced(const T* dictionary, T* values, int batch_size,
125  int null_count, const uint8_t* valid_bits,
126  int64_t valid_bits_offset);
127 
128  protected:
132  uint64_t current_value_;
133  uint32_t repeat_count_;
134  uint32_t literal_count_;
135 
136  private:
139  template <typename T>
140  bool NextCounts();
141 };
142 
149 class RleEncoder {
150  public:
157  RleEncoder(uint8_t* buffer, int buffer_len, int bit_width)
158  : bit_width_(bit_width), bit_writer_(buffer, buffer_len) {
159  DCHECK_GE(bit_width_, 0);
160  DCHECK_LE(bit_width_, 64);
161  max_run_byte_size_ = MinBufferSize(bit_width);
162  DCHECK_GE(buffer_len, max_run_byte_size_) << "Input buffer not big enough.";
163  Clear();
164  }
165 
169  static int MinBufferSize(int bit_width) {
171  int max_literal_run_size =
172  1 + static_cast<int>(BitUtil::Ceil(MAX_VALUES_PER_LITERAL_RUN * bit_width, 8));
174  int max_repeated_run_size =
175  BitReader::MAX_VLQ_BYTE_LEN + static_cast<int>(BitUtil::Ceil(bit_width, 8));
176  return std::max(max_literal_run_size, max_repeated_run_size);
177  }
178 
180  static int MaxBufferSize(int bit_width, int num_values) {
181  // For a bit_width > 1, the worst case is the repetition of "literal run of length 8
182  // and then a repeated run of length 8".
183  // 8 values per smallest run, 8 bits per byte
184  // int bytes_per_run = BitUtil::Ceil(bit_width * 8, 8);
185  int bytes_per_run = bit_width;
186  int num_runs = static_cast<int>(BitUtil::Ceil(num_values, 8));
187  int literal_max_size = num_runs + num_runs * bytes_per_run;
188 
189  // In the very worst case scenario, the data is a concatenation of repeated
190  // runs of 8 values. Repeated run has a 1 byte varint followed by the
191  // bit-packed repeated value
192  int min_repeated_run_size = 1 + static_cast<int>(BitUtil::Ceil(bit_width, 8));
193  int repeated_max_size =
194  static_cast<int>(BitUtil::Ceil(num_values, 8)) * min_repeated_run_size;
195 
196  return std::max(literal_max_size, repeated_max_size);
197  }
198 
201  bool Put(uint64_t value);
202 
205  int Flush();
206 
208  void Clear();
209 
211  uint8_t* buffer() { return bit_writer_.buffer(); }
212  int32_t len() { return bit_writer_.bytes_written(); }
213 
214  private:
223  void FlushBufferedValues(bool done);
224 
227  void FlushLiteralRun(bool update_indicator_byte);
228 
230  void FlushRepeatedRun();
231 
234  void CheckBufferFull();
235 
238  static const int MAX_VALUES_PER_LITERAL_RUN = (1 << 6) * 8;
239 
241  const int bit_width_;
242 
244  BitWriter bit_writer_;
245 
247  bool buffer_full_;
248 
250  int max_run_byte_size_;
251 
255  int64_t buffered_values_[8];
256 
258  int num_buffered_values_;
259 
264  uint64_t current_value_;
265  int repeat_count_;
266 
270  int literal_count_;
271 
275  uint8_t* literal_indicator_byte_;
276 };
277 
278 template <typename T>
279 inline bool RleDecoder::Get(T* val) {
280  return GetBatch(val, 1) == 1;
281 }
282 
283 template <typename T>
284 inline int RleDecoder::GetBatch(T* values, int batch_size) {
285  DCHECK_GE(bit_width_, 0);
286  int values_read = 0;
287 
288  while (values_read < batch_size) {
289  if (repeat_count_ > 0) {
290  int repeat_batch =
291  std::min(batch_size - values_read, static_cast<int>(repeat_count_));
292  std::fill(values + values_read, values + values_read + repeat_batch,
293  static_cast<T>(current_value_));
294  repeat_count_ -= repeat_batch;
295  values_read += repeat_batch;
296  } else if (literal_count_ > 0) {
297  int literal_batch =
298  std::min(batch_size - values_read, static_cast<int>(literal_count_));
299  int actual_read =
300  bit_reader_.GetBatch(bit_width_, values + values_read, literal_batch);
301  DCHECK_EQ(actual_read, literal_batch);
302  literal_count_ -= literal_batch;
303  values_read += literal_batch;
304  } else {
305  if (!NextCounts<T>()) return values_read;
306  }
307  }
308 
309  return values_read;
310 }
311 
312 template <typename T>
313 inline int RleDecoder::GetBatchWithDict(const T* dictionary, T* values, int batch_size) {
314  DCHECK_GE(bit_width_, 0);
315  int values_read = 0;
316 
317  while (values_read < batch_size) {
318  if (repeat_count_ > 0) {
319  int repeat_batch =
320  std::min(batch_size - values_read, static_cast<int>(repeat_count_));
321  std::fill(values + values_read, values + values_read + repeat_batch,
322  dictionary[current_value_]);
323  repeat_count_ -= repeat_batch;
324  values_read += repeat_batch;
325  } else if (literal_count_ > 0) {
326  int literal_batch =
327  std::min(batch_size - values_read, static_cast<int>(literal_count_));
328 
329  const int buffer_size = 1024;
330  int indices[buffer_size];
331  literal_batch = std::min(literal_batch, buffer_size);
332  int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
333  DCHECK_EQ(actual_read, literal_batch);
334  for (int i = 0; i < literal_batch; ++i) {
335  values[values_read + i] = dictionary[indices[i]];
336  }
337  literal_count_ -= literal_batch;
338  values_read += literal_batch;
339  } else {
340  if (!NextCounts<T>()) return values_read;
341  }
342  }
343 
344  return values_read;
345 }
346 
347 template <typename T>
348 inline int RleDecoder::GetBatchWithDictSpaced(const T* dictionary, T* values,
349  int batch_size, int null_count,
350  const uint8_t* valid_bits,
351  int64_t valid_bits_offset) {
352  DCHECK_GE(bit_width_, 0);
353  int values_read = 0;
354  int remaining_nulls = null_count;
355 
356  internal::BitmapReader bit_reader(valid_bits, valid_bits_offset, batch_size);
357 
358  while (values_read < batch_size) {
359  bool is_valid = bit_reader.IsSet();
360  bit_reader.Next();
361 
362  if (is_valid) {
363  if ((repeat_count_ == 0) && (literal_count_ == 0)) {
364  if (!NextCounts<T>()) return values_read;
365  }
366  if (repeat_count_ > 0) {
367  T value = dictionary[current_value_];
368  // The current index is already valid, we don't need to check that again
369  int repeat_batch = 1;
370  repeat_count_--;
371 
372  while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
373  if (bit_reader.IsSet()) {
374  repeat_count_--;
375  } else {
376  remaining_nulls--;
377  }
378  repeat_batch++;
379 
380  bit_reader.Next();
381  }
382  std::fill(values + values_read, values + values_read + repeat_batch, value);
383  values_read += repeat_batch;
384  } else if (literal_count_ > 0) {
385  int literal_batch = std::min(batch_size - values_read - remaining_nulls,
386  static_cast<int>(literal_count_));
387 
388  // Decode the literals
389  constexpr int kBufferSize = 1024;
390  int indices[kBufferSize];
391  literal_batch = std::min(literal_batch, kBufferSize);
392  int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
393  DCHECK_EQ(actual_read, literal_batch);
394 
395  int skipped = 0;
396  int literals_read = 1;
397  values[values_read] = dictionary[indices[0]];
398 
399  // Read the first bitset to the end
400  while (literals_read < literal_batch) {
401  if (bit_reader.IsSet()) {
402  values[values_read + literals_read + skipped] =
403  dictionary[indices[literals_read]];
404  literals_read++;
405  } else {
406  skipped++;
407  }
408 
409  bit_reader.Next();
410  }
411  literal_count_ -= literal_batch;
412  values_read += literal_batch + skipped;
413  remaining_nulls -= skipped;
414  }
415  } else {
416  values_read++;
417  remaining_nulls--;
418  }
419  }
420 
421  return values_read;
422 }
423 
424 template <typename T>
425 bool RleDecoder::NextCounts() {
426  // Read the next run's indicator int, it could be a literal or repeated run.
427  // The int is encoded as a vlq-encoded value.
428  int32_t indicator_value = 0;
429  bool result = bit_reader_.GetVlqInt(&indicator_value);
430  if (!result) return false;
431 
432  // lsb indicates if it is a literal run or repeated run
433  bool is_literal = indicator_value & 1;
434  if (is_literal) {
435  literal_count_ = (indicator_value >> 1) * 8;
436  } else {
437  repeat_count_ = indicator_value >> 1;
438  bool result =
439  bit_reader_.GetAligned<T>(static_cast<int>(BitUtil::Ceil(bit_width_, 8)),
440  reinterpret_cast<T*>(&current_value_));
441  DCHECK(result);
442  }
443  return true;
444 }
445 
448 inline bool RleEncoder::Put(uint64_t value) {
449  DCHECK(bit_width_ == 64 || value < (1ULL << bit_width_));
450  if (ARROW_PREDICT_FALSE(buffer_full_)) return false;
451 
452  if (ARROW_PREDICT_TRUE(current_value_ == value)) {
453  ++repeat_count_;
454  if (repeat_count_ > 8) {
455  // This is just a continuation of the current run, no need to buffer the
456  // values.
457  // Note that this is the fast path for long repeated runs.
458  return true;
459  }
460  } else {
461  if (repeat_count_ >= 8) {
462  // We had a run that was long enough but it has ended. Flush the
463  // current repeated run.
465  FlushRepeatedRun();
466  }
467  repeat_count_ = 1;
468  current_value_ = value;
469  }
470 
471  buffered_values_[num_buffered_values_] = value;
472  if (++num_buffered_values_ == 8) {
473  DCHECK_EQ(literal_count_ % 8, 0);
474  FlushBufferedValues(false);
475  }
476  return true;
477 }
478 
479 inline void RleEncoder::FlushLiteralRun(bool update_indicator_byte) {
480  if (literal_indicator_byte_ == NULL) {
481  // The literal indicator byte has not been reserved yet, get one now.
482  literal_indicator_byte_ = bit_writer_.GetNextBytePtr();
483  DCHECK(literal_indicator_byte_ != NULL);
484  }
485 
486  // Write all the buffered values as bit packed literals
487  for (int i = 0; i < num_buffered_values_; ++i) {
488  bool success = bit_writer_.PutValue(buffered_values_[i], bit_width_);
489  DCHECK(success) << "There is a bug in using CheckBufferFull()";
490  }
491  num_buffered_values_ = 0;
492 
493  if (update_indicator_byte) {
494  // At this point we need to write the indicator byte for the literal run.
495  // We only reserve one byte, to allow for streaming writes of literal values.
496  // The logic makes sure we flush literal runs often enough to not overrun
497  // the 1 byte.
498  DCHECK_EQ(literal_count_ % 8, 0);
499  int num_groups = literal_count_ / 8;
500  int32_t indicator_value = (num_groups << 1) | 1;
501  DCHECK_EQ(indicator_value & 0xFFFFFF00, 0);
502  *literal_indicator_byte_ = static_cast<uint8_t>(indicator_value);
503  literal_indicator_byte_ = NULL;
504  literal_count_ = 0;
505  CheckBufferFull();
506  }
507 }
508 
509 inline void RleEncoder::FlushRepeatedRun() {
511  bool result = true;
512  // The lsb of 0 indicates this is a repeated run
513  int32_t indicator_value = repeat_count_ << 1 | 0;
514  result &= bit_writer_.PutVlqInt(indicator_value);
515  result &= bit_writer_.PutAligned(current_value_,
516  static_cast<int>(BitUtil::Ceil(bit_width_, 8)));
517  DCHECK(result);
518  num_buffered_values_ = 0;
519  repeat_count_ = 0;
520  CheckBufferFull();
521 }
522 
525 inline void RleEncoder::FlushBufferedValues(bool done) {
526  if (repeat_count_ >= 8) {
527  // Clear the buffered values. They are part of the repeated run now and we
528  // don't want to flush them out as literals.
529  num_buffered_values_ = 0;
530  if (literal_count_ != 0) {
531  // There was a current literal run. All the values in it have been flushed
532  // but we still need to update the indicator byte.
533  DCHECK_EQ(literal_count_ % 8, 0);
535  FlushLiteralRun(true);
536  }
538  return;
539  }
540 
541  literal_count_ += num_buffered_values_;
542  DCHECK_EQ(literal_count_ % 8, 0);
543  int num_groups = literal_count_ / 8;
544  if (num_groups + 1 >= (1 << 6)) {
545  // We need to start a new literal run because the indicator byte we've reserved
546  // cannot store more values.
547  DCHECK(literal_indicator_byte_ != NULL);
548  FlushLiteralRun(true);
549  } else {
550  FlushLiteralRun(done);
551  }
552  repeat_count_ = 0;
553 }
554 
555 inline int RleEncoder::Flush() {
556  if (literal_count_ > 0 || repeat_count_ > 0 || num_buffered_values_ > 0) {
557  bool all_repeat = literal_count_ == 0 && (repeat_count_ == num_buffered_values_ ||
558  num_buffered_values_ == 0);
559  // There is something pending, figure out if it's a repeated or literal run
560  if (repeat_count_ > 0 && all_repeat) {
561  FlushRepeatedRun();
562  } else {
563  DCHECK_EQ(literal_count_ % 8, 0);
564  // Buffer the last group of literals to 8 by padding with 0s.
565  for (; num_buffered_values_ != 0 && num_buffered_values_ < 8;
566  ++num_buffered_values_) {
567  buffered_values_[num_buffered_values_] = 0;
568  }
569  literal_count_ += num_buffered_values_;
570  FlushLiteralRun(true);
571  repeat_count_ = 0;
572  }
573  }
574  bit_writer_.Flush();
575  DCHECK_EQ(num_buffered_values_, 0);
578 
579  return bit_writer_.bytes_written();
580 }
581 
582 inline void RleEncoder::CheckBufferFull() {
583  int bytes_written = bit_writer_.bytes_written();
584  if (bytes_written + max_run_byte_size_ > bit_writer_.buffer_len()) {
585  buffer_full_ = true;
586  }
587 }
588 
589 inline void RleEncoder::Clear() {
590  buffer_full_ = false;
591  current_value_ = 0;
592  repeat_count_ = 0;
593  num_buffered_values_ = 0;
594  literal_count_ = 0;
595  literal_indicator_byte_ = NULL;
596  bit_writer_.Clear();
597 }
598 
599 } // namespace arrow
600 
601 #endif // ARROW_UTIL_RLE_ENCODING_H
int Flush()
Flushes any pending values to the underlying buffer.
Definition: rle-encoding.h:555
Utility class to write bit/byte streams.
Definition: bit-stream-utils.h:37
bool GetAligned(int num_bytes, T *v)
Reads a 'num_bytes'-sized value from the buffer and stores it in 'v'.
Definition: bit-stream-utils.h:356
int GetBatchWithDictSpaced(const T *dictionary, T *values, int batch_size, int null_count, const uint8_t *valid_bits, int64_t valid_bits_offset)
Like GetBatchWithDict but add spacing for null entries.
Definition: rle-encoding.h:348
uint32_t repeat_count_
Definition: rle-encoding.h:133
void Reset(const uint8_t *buffer, int buffer_len, int bit_width)
Definition: rle-encoding.h:100
void Reset(const uint8_t *buffer, int buffer_len)
Definition: bit-stream-utils.h:114
Utility class to read bit/byte stream.
Definition: bit-stream-utils.h:103
#define ARROW_PREDICT_TRUE(x)
Definition: macros.h:49
RleEncoder(uint8_t *buffer, int buffer_len, int bit_width)
buffer/buffer_len: preallocated output buffer.
Definition: rle-encoding.h:157
int GetBatch(int num_bits, T *v, int batch_size)
Get a number of values from the buffer. Return the number of values actually read.
Definition: bit-stream-utils.h:278
#define DCHECK_GT(val1, val2)
Definition: logging.h:84
uint32_t literal_count_
Definition: rle-encoding.h:134
#define ARROW_PREDICT_FALSE(x)
Definition: macros.h:48
std::shared_ptr< DataType > dictionary(const std::shared_ptr< DataType > &index_type, const std::shared_ptr< Array > &values, bool ordered=false)
Create an instance of Dictionary type.
uint8_t * buffer()
Returns pointer to underlying buffer.
Definition: rle-encoding.h:211
int bit_width_
Number of bits needed to encode the value. Must be between 0 and 64.
Definition: rle-encoding.h:131
static const int MAX_VLQ_BYTE_LEN
Maximum byte length of a vlq encoded int.
Definition: bit-stream-utils.h:155
RleDecoder(const uint8_t *buffer, int buffer_len, int bit_width)
Create a decoder object.
Definition: rle-encoding.h:88
Class to incrementally build the rle data.
Definition: rle-encoding.h:149
Utility classes to do run length encoding (RLE) for fixed bit width values.
Definition: rle-encoding.h:84
Top-level namespace for Apache Arrow C++ API.
Definition: allocator.h:29
#define DCHECK_EQ(val1, val2)
Definition: logging.h:79
static int MinBufferSize(int bit_width)
Returns the minimum buffer size needed to use the encoder for 'bit_width'. This is the maximum length ...
Definition: rle-encoding.h:169
void Clear()
Resets all the state in the encoder.
Definition: rle-encoding.h:589
#define DCHECK(condition)
Definition: logging.h:78
int32_t len()
Definition: rle-encoding.h:212
RleDecoder()
Definition: rle-encoding.h:98
BitReader bit_reader_
Definition: rle-encoding.h:129
bool Get(T *val)
Gets the next value. Returns false if there are no more.
Definition: rle-encoding.h:279
#define DCHECK_LE(val1, val2)
Definition: logging.h:81
int GetBatch(T *values, int batch_size)
Gets a batch of values. Returns the number of decoded elements.
Definition: rle-encoding.h:284
uint64_t current_value_
Definition: rle-encoding.h:132
int GetBatchWithDict(const T *dictionary, T *values, int batch_size)
Like GetBatch but the values are then decoded using the provided dictionary.
Definition: rle-encoding.h:313
bool Put(uint64_t value)
Encode value.
Definition: rle-encoding.h:448
#define DCHECK_GE(val1, val2)
Definition: logging.h:83
bool GetVlqInt(int32_t *v)
Reads a vlq encoded int from the stream.
Definition: bit-stream-utils.h:378
static int MaxBufferSize(int bit_width, int num_values)
Returns the maximum byte size it could take to encode 'num_values'.
Definition: rle-encoding.h:180