1use std::{any::Any, sync::Arc};
19
20use crate::{ArrayRef, ArrowPrimitiveType, RunArray, types::RunEndIndexType};
21
22use super::{ArrayBuilder, PrimitiveBuilder};
23
24use arrow_buffer::ArrowNativeType;
25
26#[derive(Debug)]
61pub struct PrimitiveRunBuilder<R, V>
62where
63 R: RunEndIndexType,
64 V: ArrowPrimitiveType,
65{
66 run_ends_builder: PrimitiveBuilder<R>,
67 values_builder: PrimitiveBuilder<V>,
68 current_value: Option<V::Native>,
69 current_run_end_index: usize,
70 prev_run_end_index: usize,
71}
72
73impl<R, V> Default for PrimitiveRunBuilder<R, V>
74where
75 R: RunEndIndexType,
76 V: ArrowPrimitiveType,
77{
78 fn default() -> Self {
79 Self::new()
80 }
81}
82
83impl<R, V> PrimitiveRunBuilder<R, V>
84where
85 R: RunEndIndexType,
86 V: ArrowPrimitiveType,
87{
88 pub fn new() -> Self {
90 Self {
91 run_ends_builder: PrimitiveBuilder::new(),
92 values_builder: PrimitiveBuilder::new(),
93 current_value: None,
94 current_run_end_index: 0,
95 prev_run_end_index: 0,
96 }
97 }
98
99 pub fn with_capacity(capacity: usize) -> Self {
103 Self {
104 run_ends_builder: PrimitiveBuilder::with_capacity(capacity),
105 values_builder: PrimitiveBuilder::with_capacity(capacity),
106 current_value: None,
107 current_run_end_index: 0,
108 prev_run_end_index: 0,
109 }
110 }
111
112 pub fn with_data_type(mut self, data_type: arrow_schema::DataType) -> Self {
122 self.values_builder = self.values_builder.with_data_type(data_type);
123 self
124 }
125}
126
127impl<R, V> ArrayBuilder for PrimitiveRunBuilder<R, V>
128where
129 R: RunEndIndexType,
130 V: ArrowPrimitiveType,
131{
132 fn as_any(&self) -> &dyn Any {
134 self
135 }
136
137 fn as_any_mut(&mut self) -> &mut dyn Any {
139 self
140 }
141
142 fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
144 self
145 }
146
147 fn len(&self) -> usize {
150 self.current_run_end_index
151 }
152
153 fn finish(&mut self) -> ArrayRef {
155 Arc::new(self.finish())
156 }
157
158 fn finish_cloned(&self) -> ArrayRef {
160 Arc::new(self.finish_cloned())
161 }
162}
163
164impl<R, V> PrimitiveRunBuilder<R, V>
165where
166 R: RunEndIndexType,
167 V: ArrowPrimitiveType,
168{
169 pub fn append_option(&mut self, value: Option<V::Native>) {
171 if self.current_run_end_index == 0 {
172 self.current_run_end_index = 1;
173 self.current_value = value;
174 return;
175 }
176 if self.current_value != value {
177 self.append_run_end();
178 self.current_value = value;
179 }
180
181 self.current_run_end_index += 1;
182 }
183
184 pub fn append_value(&mut self, value: V::Native) {
186 self.append_option(Some(value))
187 }
188
189 pub fn append_null(&mut self) {
191 self.append_option(None)
192 }
193
194 pub fn finish(&mut self) -> RunArray<R> {
197 self.append_run_end();
199
200 self.current_value = None;
202 self.current_run_end_index = 0;
203
204 let run_ends_array = self.run_ends_builder.finish();
206 let values_array = self.values_builder.finish();
207 RunArray::<R>::try_new(&run_ends_array, &values_array).unwrap()
208 }
209
210 pub fn finish_cloned(&self) -> RunArray<R> {
213 let mut run_ends_array = self.run_ends_builder.finish_cloned();
214 let mut values_array = self.values_builder.finish_cloned();
215
216 if self.prev_run_end_index != self.current_run_end_index {
218 let mut run_end_builder = run_ends_array.into_builder().unwrap();
219 let mut values_builder = values_array.into_builder().unwrap();
220 self.append_run_end_with_builders(&mut run_end_builder, &mut values_builder);
221 run_ends_array = run_end_builder.finish();
222 values_array = values_builder.finish();
223 }
224
225 RunArray::try_new(&run_ends_array, &values_array).unwrap()
226 }
227
228 fn append_run_end(&mut self) {
230 if self.prev_run_end_index == self.current_run_end_index {
232 return;
233 }
234 let run_end_index = self.run_end_index_as_native();
235 self.run_ends_builder.append_value(run_end_index);
236 self.values_builder.append_option(self.current_value);
237 self.prev_run_end_index = self.current_run_end_index;
238 }
239
240 fn append_run_end_with_builders(
243 &self,
244 run_ends_builder: &mut PrimitiveBuilder<R>,
245 values_builder: &mut PrimitiveBuilder<V>,
246 ) {
247 let run_end_index = self.run_end_index_as_native();
248 run_ends_builder.append_value(run_end_index);
249 values_builder.append_option(self.current_value);
250 }
251
252 fn run_end_index_as_native(&self) -> R::Native {
253 R::Native::from_usize(self.current_run_end_index)
254 .unwrap_or_else(|| panic!(
255 "Cannot convert `current_run_end_index` {} from `usize` to native form of arrow datatype {}",
256 self.current_run_end_index,
257 R::DATA_TYPE
258 ))
259 }
260}
261
262impl<R, V> Extend<Option<V::Native>> for PrimitiveRunBuilder<R, V>
263where
264 R: RunEndIndexType,
265 V: ArrowPrimitiveType,
266{
267 fn extend<T: IntoIterator<Item = Option<V::Native>>>(&mut self, iter: T) {
268 for elem in iter {
269 self.append_option(elem);
270 }
271 }
272}
273
#[cfg(test)]
mod tests {
    use arrow_schema::DataType;

    use crate::builder::PrimitiveRunBuilder;
    use crate::cast::AsArray;
    use crate::types::{Decimal128Type, Int16Type, TimestampMicrosecondType, UInt32Type};
    use crate::{Array, Decimal128Array, TimestampMicrosecondArray, UInt32Array};

    /// Three runs (value, null, value) are produced from six appended elements.
    #[test]
    fn test_primitive_ree_array_builder() {
        let mut builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
        for _ in 0..3 {
            builder.append_value(1234);
        }
        builder.append_null();
        for _ in 0..2 {
            builder.append_value(5678);
        }

        let run_array = builder.finish();

        // The run array itself carries no validity buffer; the null lives in
        // the values child and is only visible logically.
        assert_eq!(run_array.null_count(), 0);
        assert_eq!(run_array.logical_null_count(), 1);
        assert_eq!(run_array.len(), 6);

        assert_eq!(run_array.run_ends().values(), &[3, 4, 6]);

        let values = run_array.values();
        for (idx, expect_null) in [false, true, false].into_iter().enumerate() {
            assert_eq!(values.is_null(idx), expect_null);
        }

        let typed_values: &UInt32Array = values.as_primitive::<UInt32Type>();
        let expected = UInt32Array::from(vec![Some(1234), None, Some(5678)]);
        assert_eq!(typed_values, &expected);
    }

    /// A run that spans two `extend` calls is merged into one.
    #[test]
    fn test_extend() {
        let first_batch: [i16; 7] = [1, 2, 2, 5, 5, 4, 4];
        let second_batch: [i16; 4] = [4, 4, 6, 2];

        let mut builder = PrimitiveRunBuilder::<Int16Type, Int16Type>::new();
        builder.extend(first_batch.into_iter().map(Some));
        builder.extend(second_batch.into_iter().map(Some));
        let run_array = builder.finish();

        assert_eq!(run_array.len(), 11);
        assert_eq!(run_array.null_count(), 0);
        assert_eq!(run_array.logical_null_count(), 0);
        assert_eq!(run_array.run_ends().values(), &[1, 3, 5, 9, 10, 11]);

        let typed_values = run_array.values().as_primitive::<Int16Type>();
        assert_eq!(typed_values.values(), &[1, 2, 5, 4, 6, 2]);
    }

    /// Overriding the values type with an incompatible data type must panic.
    #[test]
    #[should_panic]
    fn test_override_data_type_invalid() {
        let builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::new();
        builder.with_data_type(DataType::UInt64);
    }

    /// Compatible overrides are accepted and propagate to the values array.
    #[test]
    fn test_override_data_type() {
        // Overriding with the type the builder already has is allowed.
        let _ = PrimitiveRunBuilder::<Int16Type, UInt32Type>::new()
            .with_data_type(DataType::UInt32);

        // Decimal precision and scale reach the values array.
        let decimal_type = DataType::Decimal128(1, 2);
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Decimal128Type>::new().with_data_type(decimal_type);
        builder.append_value(123);
        let run_array = builder.finish();
        let typed = run_array.downcast::<Decimal128Array>().unwrap();
        let decimals = typed.values();
        assert_eq!(decimals.precision(), 1);
        assert_eq!(decimals.scale(), 2);

        // The timestamp time zone reaches the values array.
        let timestamp_type = DataType::Timestamp(
            arrow_schema::TimeUnit::Microsecond,
            Some("Europe/Paris".into()),
        );
        let mut builder = PrimitiveRunBuilder::<Int16Type, TimestampMicrosecondType>::new()
            .with_data_type(timestamp_type);
        builder.append_value(1);
        let run_array = builder.finish();
        let typed = run_array.downcast::<TimestampMicrosecondArray>().unwrap();
        let timestamps = typed.values();
        assert_eq!(timestamps.timezone(), Some("Europe/Paris"));
    }
}