arrow_data/transform/utils.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_buffer::{ArrowNativeType, MutableBuffer, bit_util};
19use arrow_schema::ArrowError;
20use num_integer::Integer;
21use num_traits::CheckedAdd;
22
23/// extends the `buffer` to be able to hold `len` bits, setting all bits of the new size to zero.
24#[inline]
25pub(super) fn resize_for_bits(buffer: &mut MutableBuffer, len: usize) {
26 let needed_bytes = bit_util::ceil(len, 8);
27 if buffer.len() < needed_bytes {
28 buffer.resize(needed_bytes, 0);
29 }
30}
31
32/// Extends `buffer` with the re-based offsets from `offsets`, returning an error on overflow.
33pub(super) fn try_extend_offsets<T: ArrowNativeType + Integer + CheckedAdd>(
34 buffer: &mut MutableBuffer,
35 mut last_offset: T,
36 offsets: &[T],
37) -> Result<(), ArrowError> {
38 buffer.reserve(std::mem::size_of_val(offsets));
39 // Snapshot the length so we can roll back partial writes on overflow.
40 let original_len = buffer.len();
41 for window in offsets.windows(2) {
42 let length = window[1] - window[0];
43 match last_offset.checked_add(&length) {
44 Some(new_offset) => {
45 last_offset = new_offset;
46 buffer.push(last_offset);
47 }
48 None => {
49 // Restore the buffer to its state before this call so the
50 // caller is not left with a partially-written offset sequence.
51 buffer.resize(original_len, 0);
52 return Err(ArrowError::InvalidArgumentError(
53 "offset overflow: data exceeds the capacity of the offset type. \
54 Try splitting into smaller batches or using a larger type \
55 (e.g. LargeStringArray / LargeBinaryArray instead of StringArray / BinaryArray)"
56 .to_string(),
57 ));
58 }
59 }
60 }
61 Ok(())
62}
63
64#[inline]
65pub(super) unsafe fn get_last_offset<T: ArrowNativeType>(offset_buffer: &MutableBuffer) -> T {
66 // JUSTIFICATION
67 // Benefit
68 // 20% performance improvement extend of variable sized arrays (see bench `mutable_array`)
69 // Soundness
70 // * offset buffer is always extended in slices of T and aligned accordingly.
71 // * Buffer[0] is initialized with one element, 0, and thus `mutable_offsets.len() - 1` is always valid.
72 let (prefix, offsets, suffix) = unsafe { offset_buffer.as_slice().align_to::<T>() };
73 debug_assert!(prefix.is_empty() && suffix.is_empty());
74 *unsafe { offsets.get_unchecked(offsets.len() - 1) }
75}
76
#[cfg(test)]
mod tests {
    use crate::transform::utils::try_extend_offsets;
    use arrow_buffer::MutableBuffer;

    /// An extend whose running offset does not fit in the offset type must return an error.
    #[test]
    fn test_overflow_returns_error() {
        let mut buffer = MutableBuffer::new(10);
        // The only window has length 5, and `(i32::MAX - 4) + 5` does not fit in an `i32`.
        let err = try_extend_offsets(&mut buffer, i32::MAX - 4, &[0i32, 5]).unwrap_err();
        assert!(
            err.to_string().contains("offset overflow"),
            "unexpected error: {err}"
        );
    }

    /// A failed extend must leave the buffer exactly as it was, even when some offsets were
    /// already written before the overflow was detected.
    #[test]
    fn test_overflow_restores_buffer() {
        // Pre-populate the buffer with known-good offsets so we can verify
        // they are unchanged after a failed extend.
        let mut buffer = MutableBuffer::new(16);
        buffer.push(0i32);
        buffer.push(10i32);
        let len_before = buffer.len();
        let bytes_before = buffer.as_slice().to_vec();

        // Offsets `[0, 1, 6]` yield windows of length 1 and 5. Starting from `i32::MAX - 2`,
        // the first window fits (`i32::MAX - 1` is pushed) and the second overflows, so the
        // rollback really has a partial write to undo.
        let err = try_extend_offsets(&mut buffer, i32::MAX - 2, &[0i32, 1, 6]).unwrap_err();
        assert!(
            err.to_string().contains("offset overflow"),
            "unexpected error: {err}"
        );
        // Buffer must be exactly as it was before the failed call.
        assert_eq!(
            buffer.len(),
            len_before,
            "buffer length changed after overflow rollback"
        );
        assert_eq!(
            buffer.as_slice(),
            bytes_before.as_slice(),
            "buffer contents changed after overflow rollback"
        );
    }
}
116}