1use std::sync::Arc;
21
22use rand::{
23 Rng,
24 distr::uniform::{SampleRange, SampleUniform},
25};
26
27use crate::array::*;
28use crate::error::{ArrowError, Result};
29use crate::{
30 buffer::{Buffer, MutableBuffer},
31 datatypes::*,
32};
33
34use super::{bench_util::*, bit_util, test_util::seedable_rng};
35
36pub fn create_random_batch(
38 schema: SchemaRef,
39 size: usize,
40 null_density: f32,
41 true_density: f32,
42) -> Result<RecordBatch> {
43 let columns = schema
44 .fields()
45 .iter()
46 .map(|field| create_random_array(field, size, null_density, true_density))
47 .collect::<Result<Vec<ArrayRef>>>()?;
48
49 RecordBatch::try_new_with_options(
50 schema,
51 columns,
52 &RecordBatchOptions::new().with_match_field_names(false),
53 )
54}
55
56pub fn create_random_array(
67 field: &Field,
68 size: usize,
69 mut null_density: f32,
70 true_density: f32,
71) -> Result<ArrayRef> {
72 if !field.data_type().is_nested() && !matches!(field.data_type(), Dictionary(_, _)) {
76 null_density = match field.is_nullable() {
78 true => null_density,
79 false => 0.0,
80 };
81 }
82
83 use DataType::*;
84 let array = match field.data_type() {
85 Null => Arc::new(NullArray::new(size)) as ArrayRef,
86 Boolean => Arc::new(create_boolean_array(size, null_density, true_density)),
87 Int8 => Arc::new(create_primitive_array::<Int8Type>(size, null_density)),
88 Int16 => Arc::new(create_primitive_array::<Int16Type>(size, null_density)),
89 Int32 => Arc::new(create_primitive_array::<Int32Type>(size, null_density)),
90 Int64 => Arc::new(create_primitive_array::<Int64Type>(size, null_density)),
91 UInt8 => Arc::new(create_primitive_array::<UInt8Type>(size, null_density)),
92 UInt16 => Arc::new(create_primitive_array::<UInt16Type>(size, null_density)),
93 UInt32 => Arc::new(create_primitive_array::<UInt32Type>(size, null_density)),
94 UInt64 => Arc::new(create_primitive_array::<UInt64Type>(size, null_density)),
95 Float16 => Arc::new(create_primitive_array::<Float16Type>(size, null_density)),
96 Float32 => Arc::new(create_primitive_array::<Float32Type>(size, null_density)),
97 Float64 => Arc::new(create_primitive_array::<Float64Type>(size, null_density)),
98 Timestamp(unit, tz) => match unit {
99 TimeUnit::Second => Arc::new(
100 create_random_temporal_array::<TimestampSecondType>(size, null_density)
101 .with_timezone_opt(tz.clone()),
102 ) as ArrayRef,
103 TimeUnit::Millisecond => Arc::new(
104 create_random_temporal_array::<TimestampMillisecondType>(size, null_density)
105 .with_timezone_opt(tz.clone()),
106 ),
107 TimeUnit::Microsecond => Arc::new(
108 create_random_temporal_array::<TimestampMicrosecondType>(size, null_density)
109 .with_timezone_opt(tz.clone()),
110 ),
111 TimeUnit::Nanosecond => Arc::new(
112 create_random_temporal_array::<TimestampNanosecondType>(size, null_density)
113 .with_timezone_opt(tz.clone()),
114 ),
115 },
116 Date32 => Arc::new(create_random_temporal_array::<Date32Type>(
117 size,
118 null_density,
119 )),
120 Date64 => Arc::new(create_random_temporal_array::<Date64Type>(
121 size,
122 null_density,
123 )),
124 Time32(unit) => match unit {
125 TimeUnit::Second => Arc::new(create_random_temporal_array::<Time32SecondType>(
126 size,
127 null_density,
128 )) as ArrayRef,
129 TimeUnit::Millisecond => Arc::new(
130 create_random_temporal_array::<Time32MillisecondType>(size, null_density),
131 ),
132 _ => {
133 return Err(ArrowError::InvalidArgumentError(format!(
134 "Unsupported unit {unit:?} for Time32"
135 )));
136 }
137 },
138 Time64(unit) => match unit {
139 TimeUnit::Microsecond => Arc::new(
140 create_random_temporal_array::<Time64MicrosecondType>(size, null_density),
141 ) as ArrayRef,
142 TimeUnit::Nanosecond => Arc::new(create_random_temporal_array::<Time64NanosecondType>(
143 size,
144 null_density,
145 )),
146 _ => {
147 return Err(ArrowError::InvalidArgumentError(format!(
148 "Unsupported unit {unit:?} for Time64"
149 )));
150 }
151 },
152 Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
153 LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
154 Utf8View => Arc::new(create_string_view_array_with_len(
155 size,
156 null_density,
157 4,
158 false,
159 )),
160 Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
161 LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
162 FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
163 BinaryView => Arc::new(
164 create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
165 ),
166 List(_) => create_random_list_array(field, size, null_density, true_density)?,
167 LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
168 Struct(_) => create_random_struct_array(field, size, null_density, true_density)?,
169 d @ Dictionary(_, value_type) if crate::compute::can_cast_types(value_type, d) => {
170 let f = Field::new(
171 field.name(),
172 value_type.as_ref().clone(),
173 field.is_nullable(),
174 );
175 let v = create_random_array(&f, size, null_density, true_density)?;
176 crate::compute::cast(&v, d)?
177 }
178 Map(_, _) => create_random_map_array(field, size, null_density, true_density)?,
179 Decimal128(_, _) => create_random_decimal_array(field, size, null_density)?,
180 Decimal256(_, _) => create_random_decimal_array(field, size, null_density)?,
181 other => {
182 return Err(ArrowError::NotYetImplemented(format!(
183 "Generating random arrays not yet implemented for {other:?}"
184 )));
185 }
186 };
187
188 if !field.is_nullable() {
189 assert_eq!(array.null_count(), 0);
190 }
191
192 Ok(array)
193}
194
195#[inline]
196fn create_random_decimal_array(field: &Field, size: usize, null_density: f32) -> Result<ArrayRef> {
197 let mut rng = seedable_rng();
198
199 match field.data_type() {
200 DataType::Decimal128(precision, scale) => {
201 let values = (0..size)
202 .map(|_| {
203 if rng.random::<f32>() < null_density {
204 None
205 } else {
206 Some(rng.random::<i128>())
207 }
208 })
209 .collect::<Vec<_>>();
210 Ok(Arc::new(
211 Decimal128Array::from(values).with_precision_and_scale(*precision, *scale)?,
212 ))
213 }
214 DataType::Decimal256(precision, scale) => {
215 let values = (0..size)
216 .map(|_| {
217 if rng.random::<f32>() < null_density {
218 None
219 } else {
220 Some(i256::from_parts(rng.random::<u128>(), rng.random::<i128>()))
221 }
222 })
223 .collect::<Vec<_>>();
224 Ok(Arc::new(
225 Decimal256Array::from(values).with_precision_and_scale(*precision, *scale)?,
226 ))
227 }
228 _ => Err(ArrowError::InvalidArgumentError(format!(
229 "Cannot create decimal array for field {field}"
230 ))),
231 }
232}
233
234#[inline]
235fn create_random_list_array(
236 field: &Field,
237 size: usize,
238 null_density: f32,
239 true_density: f32,
240) -> Result<ArrayRef> {
241 let list_null_density = match field.is_nullable() {
243 true => null_density,
244 false => 0.0,
245 };
246 let list_field;
247 let (offsets, child_len) = match field.data_type() {
248 DataType::List(f) => {
249 let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
250 list_field = f;
251 (Buffer::from(offsets.to_byte_slice()), child_len as usize)
252 }
253 DataType::LargeList(f) => {
254 let (offsets, child_len) = create_random_offsets::<i64>(size, 0, 5);
255 list_field = f;
256 (Buffer::from(offsets.to_byte_slice()), child_len as usize)
257 }
258 _ => {
259 return Err(ArrowError::InvalidArgumentError(format!(
260 "Cannot create list array for field {field}"
261 )));
262 }
263 };
264
265 let child_array = create_random_array(list_field, child_len, null_density, true_density)?;
267 let child_data = child_array.to_data();
268 let null_buffer = match field.is_nullable() {
270 true => Some(create_random_null_buffer(size, list_null_density)),
271 false => None,
272 };
273 let list_data = unsafe {
274 ArrayData::new_unchecked(
275 field.data_type().clone(),
276 size,
277 None,
278 null_buffer,
279 0,
280 vec![offsets],
281 vec![child_data],
282 )
283 };
284 Ok(make_array(list_data))
285}
286
287#[inline]
288fn create_random_struct_array(
289 field: &Field,
290 size: usize,
291 null_density: f32,
292 true_density: f32,
293) -> Result<ArrayRef> {
294 let struct_fields = match field.data_type() {
295 DataType::Struct(fields) => fields,
296 _ => {
297 return Err(ArrowError::InvalidArgumentError(format!(
298 "Cannot create struct array for field {field}"
299 )));
300 }
301 };
302
303 let child_arrays = struct_fields
304 .iter()
305 .map(|struct_field| create_random_array(struct_field, size, null_density, true_density))
306 .collect::<Result<Vec<_>>>()?;
307
308 let null_buffer = match field.is_nullable() {
309 true => {
310 let nulls = arrow_buffer::BooleanBuffer::new(
311 create_random_null_buffer(size, null_density),
312 0,
313 size,
314 );
315 Some(nulls.into())
316 }
317 false => None,
318 };
319
320 Ok(Arc::new(StructArray::try_new(
321 struct_fields.clone(),
322 child_arrays,
323 null_buffer,
324 )?))
325}
326
327#[inline]
328fn create_random_map_array(
329 field: &Field,
330 size: usize,
331 null_density: f32,
332 true_density: f32,
333) -> Result<ArrayRef> {
334 let map_null_density = match field.is_nullable() {
336 true => null_density,
337 false => 0.0,
338 };
339
340 let entries_field = match field.data_type() {
341 DataType::Map(f, _) => f,
342 _ => {
343 return Err(ArrowError::InvalidArgumentError(format!(
344 "Cannot create map array for field {field:?}"
345 )));
346 }
347 };
348
349 let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
350 let offsets = Buffer::from(offsets.to_byte_slice());
351
352 let entries = create_random_array(
353 entries_field,
354 child_len as usize,
355 null_density,
356 true_density,
357 )?
358 .to_data();
359
360 let null_buffer = match field.is_nullable() {
361 true => Some(create_random_null_buffer(size, map_null_density)),
362 false => None,
363 };
364
365 let map_data = unsafe {
366 ArrayData::new_unchecked(
367 field.data_type().clone(),
368 size,
369 None,
370 null_buffer,
371 0,
372 vec![offsets],
373 vec![entries],
374 )
375 };
376 Ok(make_array(map_data))
377}
378
379fn create_random_offsets<T: OffsetSizeTrait + SampleUniform>(
381 size: usize,
382 min: T,
383 max: T,
384) -> (Vec<T>, T) {
385 let rng = &mut seedable_rng();
386
387 let mut current_offset = T::zero();
388
389 let mut offsets = Vec::with_capacity(size + 1);
390 offsets.push(current_offset);
391
392 (0..size).for_each(|_| {
393 current_offset += rng.random_range(min..max);
394 offsets.push(current_offset);
395 });
396
397 (offsets, current_offset)
398}
399
400fn create_random_null_buffer(size: usize, null_density: f32) -> Buffer {
401 let mut rng = seedable_rng();
402 let mut mut_buf = MutableBuffer::new_null(size);
403 {
404 let mut_slice = mut_buf.as_slice_mut();
405 (0..size).for_each(|i| {
406 if rng.random::<f32>() >= null_density {
407 bit_util::set_bit(mut_slice, i)
408 }
409 })
410 };
411 mut_buf.into()
412}
413
414pub trait RandomTemporalValue: ArrowTemporalType {
417 fn value_range() -> impl SampleRange<Self::Native>;
419
420 fn gen_range<R: Rng>(rng: &mut R) -> Self::Native
422 where
423 Self::Native: SampleUniform,
424 {
425 rng.random_range(Self::value_range())
426 }
427
428 fn random<R: Rng>(rng: &mut R) -> Self::Native
430 where
431 Self::Native: SampleUniform,
432 {
433 Self::gen_range(rng)
434 }
435}
436
437impl RandomTemporalValue for TimestampSecondType {
438 fn value_range() -> impl SampleRange<Self::Native> {
441 0..60 * 60 * 24 * 365 * 100
442 }
443}
444
445impl RandomTemporalValue for TimestampMillisecondType {
446 fn value_range() -> impl SampleRange<Self::Native> {
449 0..1_000 * 60 * 60 * 24 * 365 * 100
450 }
451}
452
453impl RandomTemporalValue for TimestampMicrosecondType {
454 fn value_range() -> impl SampleRange<Self::Native> {
457 0..1_000 * 1_000 * 60 * 60 * 24 * 365 * 100
458 }
459}
460
461impl RandomTemporalValue for TimestampNanosecondType {
462 fn value_range() -> impl SampleRange<Self::Native> {
465 0..1_000 * 1_000 * 1_000 * 60 * 60 * 24 * 365 * 100
466 }
467}
468
469impl RandomTemporalValue for Date32Type {
470 fn value_range() -> impl SampleRange<Self::Native> {
473 0..365 * 100
474 }
475}
476
477impl RandomTemporalValue for Date64Type {
478 fn value_range() -> impl SampleRange<Self::Native> {
481 0..1_000 * 60 * 60 * 24 * 365 * 100
482 }
483}
484
485impl RandomTemporalValue for Time32SecondType {
486 fn value_range() -> impl SampleRange<Self::Native> {
489 0..60 * 60 * 24
490 }
491}
492
493impl RandomTemporalValue for Time32MillisecondType {
494 fn value_range() -> impl SampleRange<Self::Native> {
497 0..1_000 * 60 * 60 * 24
498 }
499}
500
501impl RandomTemporalValue for Time64MicrosecondType {
502 fn value_range() -> impl SampleRange<Self::Native> {
505 0..1_000 * 1_000 * 60 * 60 * 24
506 }
507}
508
509impl RandomTemporalValue for Time64NanosecondType {
510 fn value_range() -> impl SampleRange<Self::Native> {
513 0..1_000 * 1_000 * 1_000 * 60 * 60 * 24
514 }
515}
516
517fn create_random_temporal_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
518where
519 T: RandomTemporalValue,
520 <T as ArrowPrimitiveType>::Native: SampleUniform,
521{
522 let mut rng = seedable_rng();
523
524 (0..size)
525 .map(|_| {
526 if rng.random::<f32>() < null_density {
527 None
528 } else {
529 Some(T::random(&mut rng))
530 }
531 })
532 .collect()
533}
534
#[cfg(test)]
mod tests {
    use super::*;

    /// Nullable primitive and timestamp columns: the batch keeps the requested
    /// schema and every column has `size` rows.
    #[test]
    fn test_create_batch() {
        let size = 32;
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("f16", DataType::Float16, true),
            Field::new(
                "timestamp_without_timezone",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "timestamp_with_timezone",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
        ]));
        let batch = create_random_batch(Arc::clone(&schema_ref), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for column in batch.columns() {
            assert_eq!(column.len(), size);
        }
    }

    /// Non-nullable fields must come back without nulls even though a
    /// non-zero null density is requested.
    #[test]
    fn test_create_batch_non_null() {
        let size = 32;
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new(
                "b",
                DataType::List(Arc::new(Field::new_list_field(DataType::LargeUtf8, false))),
                false,
            ),
            Field::new("a", DataType::Int32, false),
        ]));
        let batch = create_random_batch(Arc::clone(&schema_ref), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for column in batch.columns() {
            assert_eq!(column.null_count(), 0);
            assert_eq!(column.logical_null_count(), 0);
        }

        // The list column's non-nullable child must be null-free as well.
        let list_array = batch.column(1).as_list::<i32>();
        let list_values = list_array.values();
        assert_eq!(list_values.null_count(), 0);
        // Expect more child values than list slots.
        assert!(list_values.len() > list_array.len());
    }

    /// Struct containing a boolean, a list-of-lists and a nested struct.
    #[test]
    fn test_create_struct_array() {
        let size = 32;

        let innermost_list = DataType::List(Arc::new(Field::new_list_field(
            DataType::FixedSizeBinary(6),
            true,
        )));
        let nested_struct = DataType::Struct(Fields::from(vec![
            Field::new("d_x", DataType::Int32, true),
            Field::new("d_y", DataType::Float32, false),
            Field::new("d_z", DataType::Binary, true),
        ]));
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::Boolean, true),
            Field::new(
                "c",
                DataType::LargeList(Arc::new(Field::new_list_field(innermost_list, false))),
                true,
            ),
            Field::new("d", nested_struct, true),
        ]);
        let field = Field::new("struct", DataType::Struct(struct_fields), true);
        let array = create_random_array(&field, size, 0.2, 0.5).unwrap();

        assert_eq!(array.len(), 32);
        let struct_array = array.as_any().downcast_ref::<StructArray>().unwrap();
        assert_eq!(struct_array.columns().len(), 3);

        // Column "c": each nesting level should hold more values than its parent.
        let outer_list = struct_array
            .column_by_name("c")
            .unwrap()
            .as_any()
            .downcast_ref::<LargeListArray>()
            .unwrap();
        assert_eq!(outer_list.len(), size);
        let inner_list = outer_list.values().as_list::<i32>();
        assert!(inner_list.len() > size);
        let fsb = inner_list.values();
        assert_eq!(fsb.data_type(), &DataType::FixedSizeBinary(6));
        assert!(fsb.len() > inner_list.len());

        // Column "d": the non-nullable float child must not contain nulls.
        let inner_struct = struct_array
            .column_by_name("d")
            .unwrap()
            .as_any()
            .downcast_ref::<StructArray>()
            .unwrap();
        let d_y = inner_struct.column_by_name("d_y").unwrap();
        assert_eq!(d_y.data_type(), &DataType::Float32);
        assert_eq!(d_y.null_count(), 0);
    }

    /// A non-nullable list with a nullable item: nulls only in the child.
    #[test]
    fn test_create_list_array_nested_nullability() {
        let item_field = Field::new_list_field(DataType::Boolean, true);
        let list_field = Field::new_list("not_null_list", item_field, false);

        let list_array = create_random_array(&list_field, 100, 0.95, 0.5).unwrap();

        assert_eq!(list_array.null_count(), 0);
        assert!(list_array.as_list::<i32>().values().null_count() > 0);
    }

    /// A non-nullable struct: nulls only in its nullable child.
    #[test]
    fn test_create_struct_array_nested_nullability() {
        let struct_field = Field::new_struct(
            "not_null_struct",
            vec![
                Field::new("null_int", DataType::Int32, true),
                Field::new("int", DataType::Int32, false),
            ],
            false,
        );

        let struct_array = create_random_array(&struct_field, 100, 0.95, 0.5).unwrap();

        assert_eq!(struct_array.null_count(), 0);
        let columns = struct_array.as_struct();
        assert!(columns.column_by_name("null_int").unwrap().null_count() > 0);
        assert_eq!(columns.column_by_name("int").unwrap().null_count(), 0);
    }

    /// A non-nullable list of nullable structs: nullability is honoured at
    /// every nesting level.
    #[test]
    fn test_create_list_array_nested_struct_nullability() {
        let struct_child_fields = vec![
            Field::new("null_int", DataType::Int32, true),
            Field::new("int", DataType::Int32, false),
        ];
        let item_field = Field::new_list_field(DataType::Struct(struct_child_fields.into()), true);
        let list_field = Field::new_list("not_null_list", item_field, false);

        let list_array = create_random_array(&list_field, 100, 0.95, 0.5).unwrap();

        assert_eq!(list_array.null_count(), 0);
        let items = list_array.as_list::<i32>().values();
        assert!(items.null_count() > 0);
        let item_columns = items.as_struct();
        assert!(item_columns.column_by_name("null_int").unwrap().null_count() > 0);
        assert_eq!(item_columns.column_by_name("int").unwrap().null_count(), 0);
    }

    /// A non-nullable map with non-nullable keys and nullable values.
    #[test]
    fn test_create_map_array() {
        let key_field = Field::new("key", DataType::Utf8, false);
        let value_field = Field::new("value", DataType::Utf8, true);
        let map_field = Field::new_map("map", "entries", key_field, value_field, false, false);
        let array = create_random_array(&map_field, 100, 0.8, 0.5).unwrap();

        assert_eq!(array.len(), 100);
        // The map field itself is non-nullable.
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 0);

        let map = array.as_map();
        // Expect more entries than map slots.
        assert!(map.keys().len() > array.len());
        assert!(map.values().len() > array.len());
        // Keys are non-nullable, values are nullable.
        assert_eq!(map.keys().null_count(), 0);
        assert!(map.values().null_count() > 0);

        assert_eq!(map.keys().data_type(), &DataType::Utf8);
        assert_eq!(map.values().data_type(), &DataType::Utf8);
    }

    /// Nullable decimal columns of both widths are generated with `size` rows.
    #[test]
    fn test_create_decimal_array() {
        let size = 10;
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Decimal128(10, -2), true),
            Field::new("b", DataType::Decimal256(10, -2), true),
        ]));
        let batch = create_random_batch(Arc::clone(&schema_ref), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for column in batch.columns() {
            assert_eq!(column.len(), size);
        }
    }

    /// Non-nullable decimal columns ignore the requested null density.
    #[test]
    fn create_non_nullable_decimal_array_with_null_density() {
        let size = 10;
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Decimal128(10, -2), false),
            Field::new("b", DataType::Decimal256(10, -2), false),
        ]));
        let batch = create_random_batch(Arc::clone(&schema_ref), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for column in batch.columns() {
            assert_eq!(column.len(), size);
            assert_eq!(column.null_count(), 0);
        }
    }
}