Skip to main content

arrow_data/transform/
utils.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use arrow_buffer::{ArrowNativeType, MutableBuffer, bit_util};
19use arrow_schema::ArrowError;
20use num_integer::Integer;
21use num_traits::CheckedAdd;
22
23/// extends the `buffer` to be able to hold `len` bits, setting all bits of the new size to zero.
24#[inline]
25pub(super) fn resize_for_bits(buffer: &mut MutableBuffer, len: usize) {
26    let needed_bytes = bit_util::ceil(len, 8);
27    if buffer.len() < needed_bytes {
28        buffer.resize(needed_bytes, 0);
29    }
30}
31
32/// Extends `buffer` with the re-based offsets from `offsets`, returning an error on overflow.
33pub(super) fn try_extend_offsets<T: ArrowNativeType + Integer + CheckedAdd>(
34    buffer: &mut MutableBuffer,
35    mut last_offset: T,
36    offsets: &[T],
37) -> Result<(), ArrowError> {
38    buffer.reserve(std::mem::size_of_val(offsets));
39    // Snapshot the length so we can roll back partial writes on overflow.
40    let original_len = buffer.len();
41    for window in offsets.windows(2) {
42        let length = window[1] - window[0];
43        match last_offset.checked_add(&length) {
44            Some(new_offset) => {
45                last_offset = new_offset;
46                buffer.push(last_offset);
47            }
48            None => {
49                // Restore the buffer to its state before this call so the
50                // caller is not left with a partially-written offset sequence.
51                buffer.resize(original_len, 0);
52                return Err(ArrowError::InvalidArgumentError(
53                    "offset overflow: data exceeds the capacity of the offset type. \
54                     Try splitting into smaller batches or using a larger type \
55                     (e.g. LargeStringArray / LargeBinaryArray instead of StringArray / BinaryArray)"
56                        .to_string(),
57                ));
58            }
59        }
60    }
61    Ok(())
62}
63
64#[inline]
65pub(super) unsafe fn get_last_offset<T: ArrowNativeType>(offset_buffer: &MutableBuffer) -> T {
66    // JUSTIFICATION
67    //  Benefit
68    //      20% performance improvement extend of variable sized arrays (see bench `mutable_array`)
69    //  Soundness
70    //      * offset buffer is always extended in slices of T and aligned accordingly.
71    //      * Buffer[0] is initialized with one element, 0, and thus `mutable_offsets.len() - 1` is always valid.
72    let (prefix, offsets, suffix) = unsafe { offset_buffer.as_slice().align_to::<T>() };
73    debug_assert!(prefix.is_empty() && suffix.is_empty());
74    *unsafe { offsets.get_unchecked(offsets.len() - 1) }
75}
76
#[cfg(test)]
mod tests {
    use crate::transform::utils::try_extend_offsets;
    use arrow_buffer::MutableBuffer;

    #[test]
    fn test_extend_rebases_offsets() {
        // Offsets [0, 3, 7] describe lengths 3 and 4; re-based onto 10 they
        // become 13 and 17.
        let mut buffer = MutableBuffer::new(16);
        try_extend_offsets(&mut buffer, 10i32, &[0i32, 3, 7]).unwrap();
        assert_eq!(buffer.typed_data::<i32>(), &[13, 17]);
    }

    #[test]
    fn test_overflow_returns_error() {
        let mut buffer = MutableBuffer::new(10);
        let err = try_extend_offsets(&mut buffer, i32::MAX - 4, &[0i32, 5]).unwrap_err();
        assert!(
            err.to_string().contains("offset overflow"),
            "unexpected error: {err}"
        );
    }

    #[test]
    fn test_overflow_restores_buffer() {
        // Pre-populate the buffer with known-good offsets so we can verify they
        // are unchanged after a failed extend.
        let mut buffer = MutableBuffer::new(16);
        buffer.push(0i32);
        buffer.push(10i32);
        let bytes_before = buffer.as_slice().to_vec();

        // Starting at i32::MAX - 5, the first window (0 -> 1) succeeds and pushes
        // i32::MAX - 4. The second window (1 -> 10) then adds 9 and overflows,
        // so this exercises rolling back a partially-written offset sequence.
        let err = try_extend_offsets(&mut buffer, i32::MAX - 5, &[0i32, 1, 10]).unwrap_err();
        assert!(
            err.to_string().contains("offset overflow"),
            "unexpected error: {err}"
        );
        // Buffer must be exactly as it was before the failed call.
        assert_eq!(
            buffer.len(),
            bytes_before.len(),
            "buffer length changed after overflow rollback"
        );
        assert_eq!(
            buffer.as_slice(),
            bytes_before.as_slice(),
            "buffer contents changed after overflow rollback"
        );
    }
}