1use std::sync::Arc;
21
22use rand::{
23 distr::uniform::{SampleRange, SampleUniform},
24 Rng,
25};
26
27use crate::array::*;
28use crate::error::{ArrowError, Result};
29use crate::{
30 buffer::{Buffer, MutableBuffer},
31 datatypes::*,
32};
33
34use super::{bench_util::*, bit_util, test_util::seedable_rng};
35
36pub fn create_random_batch(
38 schema: SchemaRef,
39 size: usize,
40 null_density: f32,
41 true_density: f32,
42) -> Result<RecordBatch> {
43 let columns = schema
44 .fields()
45 .iter()
46 .map(|field| create_random_array(field, size, null_density, true_density))
47 .collect::<Result<Vec<ArrayRef>>>()?;
48
49 RecordBatch::try_new_with_options(
50 schema,
51 columns,
52 &RecordBatchOptions::new().with_match_field_names(false),
53 )
54}
55
56pub fn create_random_array(
67 field: &Field,
68 size: usize,
69 null_density: f32,
70 true_density: f32,
71) -> Result<ArrayRef> {
72 let primitive_null_density = match field.is_nullable() {
75 true => null_density,
76 false => 0.0,
77 };
78 use DataType::*;
79 Ok(match field.data_type() {
80 Null => Arc::new(NullArray::new(size)) as ArrayRef,
81 Boolean => Arc::new(create_boolean_array(
82 size,
83 primitive_null_density,
84 true_density,
85 )),
86 Int8 => Arc::new(create_primitive_array::<Int8Type>(
87 size,
88 primitive_null_density,
89 )),
90 Int16 => Arc::new(create_primitive_array::<Int16Type>(
91 size,
92 primitive_null_density,
93 )),
94 Int32 => Arc::new(create_primitive_array::<Int32Type>(
95 size,
96 primitive_null_density,
97 )),
98 Int64 => Arc::new(create_primitive_array::<Int64Type>(
99 size,
100 primitive_null_density,
101 )),
102 UInt8 => Arc::new(create_primitive_array::<UInt8Type>(
103 size,
104 primitive_null_density,
105 )),
106 UInt16 => Arc::new(create_primitive_array::<UInt16Type>(
107 size,
108 primitive_null_density,
109 )),
110 UInt32 => Arc::new(create_primitive_array::<UInt32Type>(
111 size,
112 primitive_null_density,
113 )),
114 UInt64 => Arc::new(create_primitive_array::<UInt64Type>(
115 size,
116 primitive_null_density,
117 )),
118 Float16 => {
119 return Err(ArrowError::NotYetImplemented(
120 "Float16 is not implemented".to_string(),
121 ))
122 }
123 Float32 => Arc::new(create_primitive_array::<Float32Type>(
124 size,
125 primitive_null_density,
126 )),
127 Float64 => Arc::new(create_primitive_array::<Float64Type>(
128 size,
129 primitive_null_density,
130 )),
131 Timestamp(unit, tz) => match unit {
132 TimeUnit::Second => Arc::new(
133 create_random_temporal_array::<TimestampSecondType>(size, primitive_null_density)
134 .with_timezone_opt(tz.clone()),
135 ),
136 TimeUnit::Millisecond => Arc::new(
137 create_random_temporal_array::<TimestampMillisecondType>(
138 size,
139 primitive_null_density,
140 )
141 .with_timezone_opt(tz.clone()),
142 ),
143 TimeUnit::Microsecond => Arc::new(
144 create_random_temporal_array::<TimestampMicrosecondType>(
145 size,
146 primitive_null_density,
147 )
148 .with_timezone_opt(tz.clone()),
149 ),
150 TimeUnit::Nanosecond => Arc::new(
151 create_random_temporal_array::<TimestampNanosecondType>(
152 size,
153 primitive_null_density,
154 )
155 .with_timezone_opt(tz.clone()),
156 ),
157 },
158 Date32 => Arc::new(create_random_temporal_array::<Date32Type>(
159 size,
160 primitive_null_density,
161 )),
162 Date64 => Arc::new(create_random_temporal_array::<Date64Type>(
163 size,
164 primitive_null_density,
165 )),
166 Time32(unit) => match unit {
167 TimeUnit::Second => Arc::new(create_random_temporal_array::<Time32SecondType>(
168 size,
169 primitive_null_density,
170 )) as ArrayRef,
171 TimeUnit::Millisecond => Arc::new(
172 create_random_temporal_array::<Time32MillisecondType>(size, primitive_null_density),
173 ),
174 _ => {
175 return Err(ArrowError::InvalidArgumentError(format!(
176 "Unsupported unit {unit:?} for Time32"
177 )))
178 }
179 },
180 Time64(unit) => match unit {
181 TimeUnit::Microsecond => Arc::new(
182 create_random_temporal_array::<Time64MicrosecondType>(size, primitive_null_density),
183 ) as ArrayRef,
184 TimeUnit::Nanosecond => Arc::new(create_random_temporal_array::<Time64NanosecondType>(
185 size,
186 primitive_null_density,
187 )),
188 _ => {
189 return Err(ArrowError::InvalidArgumentError(format!(
190 "Unsupported unit {unit:?} for Time64"
191 )))
192 }
193 },
194 Utf8 => Arc::new(create_string_array::<i32>(size, primitive_null_density)),
195 LargeUtf8 => Arc::new(create_string_array::<i64>(size, primitive_null_density)),
196 Utf8View => Arc::new(create_string_view_array_with_len(
197 size,
198 primitive_null_density,
199 4,
200 false,
201 )),
202 Binary => Arc::new(create_binary_array::<i32>(size, primitive_null_density)),
203 LargeBinary => Arc::new(create_binary_array::<i64>(size, primitive_null_density)),
204 FixedSizeBinary(len) => Arc::new(create_fsb_array(
205 size,
206 primitive_null_density,
207 *len as usize,
208 )),
209 BinaryView => Arc::new(
210 create_string_view_array_with_len(size, primitive_null_density, 4, false)
211 .to_binary_view(),
212 ),
213 List(_) => create_random_list_array(field, size, null_density, true_density)?,
214 LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
215 Struct(_) => create_random_struct_array(field, size, null_density, true_density)?,
216 d @ Dictionary(_, value_type) if crate::compute::can_cast_types(value_type, d) => {
217 let f = Field::new(
218 field.name(),
219 value_type.as_ref().clone(),
220 field.is_nullable(),
221 );
222 let v = create_random_array(&f, size, null_density, true_density)?;
223 crate::compute::cast(&v, d)?
224 }
225 Map(_, _) => create_random_map_array(field, size, null_density, true_density)?,
226 Decimal128(_, _) => create_random_decimal_array(field, size, null_density)?,
227 Decimal256(_, _) => create_random_decimal_array(field, size, null_density)?,
228 other => {
229 return Err(ArrowError::NotYetImplemented(format!(
230 "Generating random arrays not yet implemented for {other:?}"
231 )))
232 }
233 })
234}
235
236#[inline]
237fn create_random_decimal_array(field: &Field, size: usize, null_density: f32) -> Result<ArrayRef> {
238 let mut rng = seedable_rng();
239
240 match field.data_type() {
241 DataType::Decimal128(precision, scale) => {
242 let values = (0..size)
243 .map(|_| {
244 if rng.random::<f32>() < null_density {
245 None
246 } else {
247 Some(rng.random::<i128>())
248 }
249 })
250 .collect::<Vec<_>>();
251 Ok(Arc::new(
252 Decimal128Array::from(values).with_precision_and_scale(*precision, *scale)?,
253 ))
254 }
255 DataType::Decimal256(precision, scale) => {
256 let values = (0..size)
257 .map(|_| {
258 if rng.random::<f32>() < null_density {
259 None
260 } else {
261 Some(i256::from_parts(rng.random::<u128>(), rng.random::<i128>()))
262 }
263 })
264 .collect::<Vec<_>>();
265 Ok(Arc::new(
266 Decimal256Array::from(values).with_precision_and_scale(*precision, *scale)?,
267 ))
268 }
269 _ => Err(ArrowError::InvalidArgumentError(format!(
270 "Cannot create decimal array for field {field:?}"
271 ))),
272 }
273}
274
275#[inline]
276fn create_random_list_array(
277 field: &Field,
278 size: usize,
279 null_density: f32,
280 true_density: f32,
281) -> Result<ArrayRef> {
282 let list_null_density = match field.is_nullable() {
284 true => null_density,
285 false => 0.0,
286 };
287 let list_field;
288 let (offsets, child_len) = match field.data_type() {
289 DataType::List(f) => {
290 let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
291 list_field = f;
292 (Buffer::from(offsets.to_byte_slice()), child_len as usize)
293 }
294 DataType::LargeList(f) => {
295 let (offsets, child_len) = create_random_offsets::<i64>(size, 0, 5);
296 list_field = f;
297 (Buffer::from(offsets.to_byte_slice()), child_len as usize)
298 }
299 _ => {
300 return Err(ArrowError::InvalidArgumentError(format!(
301 "Cannot create list array for field {field:?}"
302 )))
303 }
304 };
305
306 let child_array = create_random_array(list_field, child_len, null_density, true_density)?;
308 let child_data = child_array.to_data();
309 let null_buffer = match field.is_nullable() {
311 true => Some(create_random_null_buffer(size, list_null_density)),
312 false => None,
313 };
314 let list_data = unsafe {
315 ArrayData::new_unchecked(
316 field.data_type().clone(),
317 size,
318 None,
319 null_buffer,
320 0,
321 vec![offsets],
322 vec![child_data],
323 )
324 };
325 Ok(make_array(list_data))
326}
327
328#[inline]
329fn create_random_struct_array(
330 field: &Field,
331 size: usize,
332 null_density: f32,
333 true_density: f32,
334) -> Result<ArrayRef> {
335 let struct_fields = match field.data_type() {
336 DataType::Struct(fields) => fields,
337 _ => {
338 return Err(ArrowError::InvalidArgumentError(format!(
339 "Cannot create struct array for field {field:?}"
340 )))
341 }
342 };
343
344 let child_arrays = struct_fields
345 .iter()
346 .map(|struct_field| create_random_array(struct_field, size, null_density, true_density))
347 .collect::<Result<Vec<_>>>()?;
348
349 let null_buffer = match field.is_nullable() {
350 true => {
351 let nulls = arrow_buffer::BooleanBuffer::new(
352 create_random_null_buffer(size, null_density),
353 0,
354 size,
355 );
356 Some(nulls.into())
357 }
358 false => None,
359 };
360
361 Ok(Arc::new(StructArray::try_new(
362 struct_fields.clone(),
363 child_arrays,
364 null_buffer,
365 )?))
366}
367
368#[inline]
369fn create_random_map_array(
370 field: &Field,
371 size: usize,
372 null_density: f32,
373 true_density: f32,
374) -> Result<ArrayRef> {
375 let map_null_density = match field.is_nullable() {
377 true => null_density,
378 false => 0.0,
379 };
380
381 let entries_field = match field.data_type() {
382 DataType::Map(f, _) => f,
383 _ => {
384 return Err(ArrowError::InvalidArgumentError(format!(
385 "Cannot create map array for field {field:?}"
386 )))
387 }
388 };
389
390 let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
391 let offsets = Buffer::from(offsets.to_byte_slice());
392
393 let entries = create_random_array(
394 entries_field,
395 child_len as usize,
396 null_density,
397 true_density,
398 )?
399 .to_data();
400
401 let null_buffer = match field.is_nullable() {
402 true => Some(create_random_null_buffer(size, map_null_density)),
403 false => None,
404 };
405
406 let map_data = unsafe {
407 ArrayData::new_unchecked(
408 field.data_type().clone(),
409 size,
410 None,
411 null_buffer,
412 0,
413 vec![offsets],
414 vec![entries],
415 )
416 };
417 Ok(make_array(map_data))
418}
419
420fn create_random_offsets<T: OffsetSizeTrait + SampleUniform>(
422 size: usize,
423 min: T,
424 max: T,
425) -> (Vec<T>, T) {
426 let rng = &mut seedable_rng();
427
428 let mut current_offset = T::zero();
429
430 let mut offsets = Vec::with_capacity(size + 1);
431 offsets.push(current_offset);
432
433 (0..size).for_each(|_| {
434 current_offset += rng.random_range(min..max);
435 offsets.push(current_offset);
436 });
437
438 (offsets, current_offset)
439}
440
441fn create_random_null_buffer(size: usize, null_density: f32) -> Buffer {
442 let mut rng = seedable_rng();
443 let mut mut_buf = MutableBuffer::new_null(size);
444 {
445 let mut_slice = mut_buf.as_slice_mut();
446 (0..size).for_each(|i| {
447 if rng.random::<f32>() >= null_density {
448 bit_util::set_bit(mut_slice, i)
449 }
450 })
451 };
452 mut_buf.into()
453}
454
455pub trait RandomTemporalValue: ArrowTemporalType {
458 fn value_range() -> impl SampleRange<Self::Native>;
460
461 fn gen_range<R: Rng>(rng: &mut R) -> Self::Native
463 where
464 Self::Native: SampleUniform,
465 {
466 rng.random_range(Self::value_range())
467 }
468
469 fn random<R: Rng>(rng: &mut R) -> Self::Native
471 where
472 Self::Native: SampleUniform,
473 {
474 Self::gen_range(rng)
475 }
476}
477
478impl RandomTemporalValue for TimestampSecondType {
479 fn value_range() -> impl SampleRange<Self::Native> {
482 0..60 * 60 * 24 * 365 * 100
483 }
484}
485
486impl RandomTemporalValue for TimestampMillisecondType {
487 fn value_range() -> impl SampleRange<Self::Native> {
490 0..1_000 * 60 * 60 * 24 * 365 * 100
491 }
492}
493
494impl RandomTemporalValue for TimestampMicrosecondType {
495 fn value_range() -> impl SampleRange<Self::Native> {
498 0..1_000 * 1_000 * 60 * 60 * 24 * 365 * 100
499 }
500}
501
502impl RandomTemporalValue for TimestampNanosecondType {
503 fn value_range() -> impl SampleRange<Self::Native> {
506 0..1_000 * 1_000 * 1_000 * 60 * 60 * 24 * 365 * 100
507 }
508}
509
510impl RandomTemporalValue for Date32Type {
511 fn value_range() -> impl SampleRange<Self::Native> {
514 0..365 * 100
515 }
516}
517
518impl RandomTemporalValue for Date64Type {
519 fn value_range() -> impl SampleRange<Self::Native> {
522 0..1_000 * 60 * 60 * 24 * 365 * 100
523 }
524}
525
526impl RandomTemporalValue for Time32SecondType {
527 fn value_range() -> impl SampleRange<Self::Native> {
530 0..60 * 60 * 24
531 }
532}
533
534impl RandomTemporalValue for Time32MillisecondType {
535 fn value_range() -> impl SampleRange<Self::Native> {
538 0..1_000 * 60 * 60 * 24
539 }
540}
541
542impl RandomTemporalValue for Time64MicrosecondType {
543 fn value_range() -> impl SampleRange<Self::Native> {
546 0..1_000 * 1_000 * 60 * 60 * 24
547 }
548}
549
550impl RandomTemporalValue for Time64NanosecondType {
551 fn value_range() -> impl SampleRange<Self::Native> {
554 0..1_000 * 1_000 * 1_000 * 60 * 60 * 24
555 }
556}
557
558fn create_random_temporal_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
559where
560 T: RandomTemporalValue,
561 <T as ArrowPrimitiveType>::Native: SampleUniform,
562{
563 let mut rng = seedable_rng();
564
565 (0..size)
566 .map(|_| {
567 if rng.random::<f32>() < null_density {
568 None
569 } else {
570 Some(T::random(&mut rng))
571 }
572 })
573 .collect()
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// A random batch follows the requested schema, column count and row count.
    #[test]
    fn test_create_batch() {
        let size = 32;
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "timestamp_without_timezone",
                DataType::Timestamp(TimeUnit::Nanosecond, None),
                true,
            ),
            Field::new(
                "timestamp_with_timezone",
                DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                true,
            ),
        ]));

        let batch = create_random_batch(schema_ref.clone(), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        batch
            .columns()
            .iter()
            .for_each(|array| assert_eq!(array.len(), size));
    }

    /// Non-nullable fields, including a non-nullable list of non-nullable
    /// strings, never receive nulls.
    #[test]
    fn test_create_batch_non_null() {
        let size = 32;
        let list_type = DataType::List(Arc::new(Field::new_list_field(
            DataType::LargeUtf8,
            false,
        )));
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, false),
            Field::new("b", list_type, false),
            Field::new("a", DataType::Int32, false),
        ]));

        let batch = create_random_batch(schema_ref.clone(), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        for array in batch.columns() {
            assert_eq!(array.null_count(), 0);
            assert_eq!(array.logical_null_count(), 0);
        }

        // The list column's values must be null-free and outnumber the lists.
        let list_array = batch.column(1).as_list::<i32>();
        let child_array = list_array.values();
        assert_eq!(child_array.null_count(), 0);
        assert!(child_array.len() > list_array.len());
    }

    /// Deeply nested struct generation keeps lengths, types and child
    /// nullability consistent.
    #[test]
    fn test_create_struct_array() {
        let size = 32;

        let inner_list = DataType::List(Arc::new(Field::new_list_field(
            DataType::FixedSizeBinary(6),
            true,
        )));
        let outer_list = DataType::LargeList(Arc::new(Field::new_list_field(inner_list, false)));
        let nested_struct = DataType::Struct(Fields::from(vec![
            Field::new("d_x", DataType::Int32, true),
            Field::new("d_y", DataType::Float32, false),
            Field::new("d_z", DataType::Binary, true),
        ]));
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::Boolean, true),
            Field::new("c", outer_list, true),
            Field::new("d", nested_struct, true),
        ]);
        let field = Field::new("struct", DataType::Struct(struct_fields), true);

        let array = create_random_array(&field, size, 0.2, 0.5).unwrap();

        assert_eq!(array.len(), 32);
        let struct_array = array.as_struct();
        assert_eq!(struct_array.columns().len(), 3);

        // Column "c": large list -> list -> fixed-size binary.
        let col_c = struct_array
            .column_by_name("c")
            .unwrap()
            .as_list::<i64>();
        assert_eq!(col_c.len(), size);
        let col_c_list = col_c.values().as_list::<i32>();
        assert!(col_c_list.len() > size);
        let fsb = col_c_list.values();
        assert_eq!(fsb.data_type(), &DataType::FixedSizeBinary(6));
        assert!(fsb.len() > col_c_list.len());

        // Column "d": the non-nullable child "d_y" must hold no nulls.
        let col_d = struct_array.column_by_name("d").unwrap().as_struct();
        let col_d_y = col_d.column_by_name("d_y").unwrap();
        assert_eq!(col_d_y.data_type(), &DataType::Float32);
        assert_eq!(col_d_y.null_count(), 0);
    }

    /// A non-nullable list may still contain null items when the item field
    /// is nullable.
    #[test]
    fn test_create_list_array_nested_nullability() {
        let item_field = Field::new_list_field(DataType::Boolean, true);
        let list_field = Field::new_list("not_null_list", item_field, false);

        let list_array = create_random_array(&list_field, 100, 0.95, 0.5).unwrap();

        assert_eq!(list_array.null_count(), 0);
        let values = list_array.as_list::<i32>().values();
        assert!(values.null_count() > 0);
    }

    /// A non-nullable struct honours the nullability of each child.
    #[test]
    fn test_create_struct_array_nested_nullability() {
        let struct_field = Field::new_struct(
            "not_null_struct",
            vec![
                Field::new("null_int", DataType::Int32, true),
                Field::new("int", DataType::Int32, false),
            ],
            false,
        );

        let struct_array = create_random_array(&struct_field, 100, 0.95, 0.5).unwrap();

        assert_eq!(struct_array.null_count(), 0);

        let columns = struct_array.as_struct();
        let nullable_child = columns.column_by_name("null_int").unwrap();
        assert!(nullable_child.null_count() > 0);
        let required_child = columns.column_by_name("int").unwrap();
        assert_eq!(required_child.null_count(), 0);
    }

    /// Nullability is honoured level by level for a list of structs.
    #[test]
    fn test_create_list_array_nested_struct_nullability() {
        let struct_child_fields = vec![
            Field::new("null_int", DataType::Int32, true),
            Field::new("int", DataType::Int32, false),
        ];
        let list_item_field =
            Field::new_list_field(DataType::Struct(struct_child_fields.into()), true);
        let list_field = Field::new_list("not_null_list", list_item_field, false);

        let list_array = create_random_array(&list_field, 100, 0.95, 0.5).unwrap();

        assert_eq!(list_array.null_count(), 0);

        let values = list_array.as_list::<i32>().values();
        assert!(values.null_count() > 0);

        let struct_values = values.as_struct();
        let nullable_child = struct_values.column_by_name("null_int").unwrap();
        assert!(nullable_child.null_count() > 0);
        let required_child = struct_values.column_by_name("int").unwrap();
        assert_eq!(required_child.null_count(), 0);
    }

    /// A non-nullable map has null-free keys and may have null values.
    #[test]
    fn test_create_map_array() {
        let map_field = Field::new_map(
            "map",
            "entries",
            Field::new("key", DataType::Utf8, false),
            Field::new("value", DataType::Utf8, true),
            false,
            false,
        );

        let array = create_random_array(&map_field, 100, 0.8, 0.5).unwrap();

        assert_eq!(array.len(), 100);
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 0);

        let map = array.as_map();
        assert!(map.keys().len() > array.len());
        assert!(map.values().len() > array.len());
        assert_eq!(map.keys().null_count(), 0);
        assert!(map.values().null_count() > 0);

        assert_eq!(map.keys().data_type(), &DataType::Utf8);
        assert_eq!(map.values().data_type(), &DataType::Utf8);
    }

    /// Decimal128 and Decimal256 columns can be generated with a negative
    /// scale.
    #[test]
    fn test_create_decimal_array() {
        let size = 10;
        let schema_ref = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Decimal128(10, -2), true),
            Field::new("b", DataType::Decimal256(10, -2), true),
        ]));

        let batch = create_random_batch(schema_ref.clone(), size, 0.35, 0.7).unwrap();

        assert_eq!(batch.schema(), schema_ref);
        assert_eq!(batch.num_columns(), schema_ref.fields().len());
        batch
            .columns()
            .iter()
            .for_each(|array| assert_eq!(array.len(), size));
    }
}