1use std::sync::Arc;
21
22use rand::{
23 Rng,
24 distr::uniform::{SampleRange, SampleUniform},
25};
26
27use crate::array::*;
28use crate::error::{ArrowError, Result};
29use crate::{
30 buffer::{Buffer, MutableBuffer},
31 datatypes::*,
32};
33
34use super::{bench_util::*, bit_util, test_util::seedable_rng};
35
36pub fn create_random_batch(
38 schema: SchemaRef,
39 size: usize,
40 null_density: f32,
41 true_density: f32,
42) -> Result<RecordBatch> {
43 let columns = schema
44 .fields()
45 .iter()
46 .map(|field| create_random_array(field, size, null_density, true_density))
47 .collect::<Result<Vec<ArrayRef>>>()?;
48
49 RecordBatch::try_new_with_options(
50 schema,
51 columns,
52 &RecordBatchOptions::new().with_match_field_names(false),
53 )
54}
55
56pub fn create_random_array(
67 field: &Field,
68 size: usize,
69 mut null_density: f32,
70 true_density: f32,
71) -> Result<ArrayRef> {
72 if !field.data_type().is_nested() && !matches!(field.data_type(), Dictionary(_, _)) {
76 null_density = match field.is_nullable() {
78 true => null_density,
79 false => 0.0,
80 };
81 }
82
83 use DataType::*;
84 let array = match field.data_type() {
85 Null => Arc::new(NullArray::new(size)) as ArrayRef,
86 Boolean => Arc::new(create_boolean_array(size, null_density, true_density)),
87 Int8 => Arc::new(create_primitive_array::<Int8Type>(size, null_density)),
88 Int16 => Arc::new(create_primitive_array::<Int16Type>(size, null_density)),
89 Int32 => Arc::new(create_primitive_array::<Int32Type>(size, null_density)),
90 Int64 => Arc::new(create_primitive_array::<Int64Type>(size, null_density)),
91 UInt8 => Arc::new(create_primitive_array::<UInt8Type>(size, null_density)),
92 UInt16 => Arc::new(create_primitive_array::<UInt16Type>(size, null_density)),
93 UInt32 => Arc::new(create_primitive_array::<UInt32Type>(size, null_density)),
94 UInt64 => Arc::new(create_primitive_array::<UInt64Type>(size, null_density)),
95 Float16 => {
96 return Err(ArrowError::NotYetImplemented(
97 "Float16 is not implemented".to_string(),
98 ));
99 }
100 Float32 => Arc::new(create_primitive_array::<Float32Type>(size, null_density)),
101 Float64 => Arc::new(create_primitive_array::<Float64Type>(size, null_density)),
102 Timestamp(unit, tz) => match unit {
103 TimeUnit::Second => Arc::new(
104 create_random_temporal_array::<TimestampSecondType>(size, null_density)
105 .with_timezone_opt(tz.clone()),
106 ) as ArrayRef,
107 TimeUnit::Millisecond => Arc::new(
108 create_random_temporal_array::<TimestampMillisecondType>(size, null_density)
109 .with_timezone_opt(tz.clone()),
110 ),
111 TimeUnit::Microsecond => Arc::new(
112 create_random_temporal_array::<TimestampMicrosecondType>(size, null_density)
113 .with_timezone_opt(tz.clone()),
114 ),
115 TimeUnit::Nanosecond => Arc::new(
116 create_random_temporal_array::<TimestampNanosecondType>(size, null_density)
117 .with_timezone_opt(tz.clone()),
118 ),
119 },
120 Date32 => Arc::new(create_random_temporal_array::<Date32Type>(
121 size,
122 null_density,
123 )),
124 Date64 => Arc::new(create_random_temporal_array::<Date64Type>(
125 size,
126 null_density,
127 )),
128 Time32(unit) => match unit {
129 TimeUnit::Second => Arc::new(create_random_temporal_array::<Time32SecondType>(
130 size,
131 null_density,
132 )) as ArrayRef,
133 TimeUnit::Millisecond => Arc::new(
134 create_random_temporal_array::<Time32MillisecondType>(size, null_density),
135 ),
136 _ => {
137 return Err(ArrowError::InvalidArgumentError(format!(
138 "Unsupported unit {unit:?} for Time32"
139 )));
140 }
141 },
142 Time64(unit) => match unit {
143 TimeUnit::Microsecond => Arc::new(
144 create_random_temporal_array::<Time64MicrosecondType>(size, null_density),
145 ) as ArrayRef,
146 TimeUnit::Nanosecond => Arc::new(create_random_temporal_array::<Time64NanosecondType>(
147 size,
148 null_density,
149 )),
150 _ => {
151 return Err(ArrowError::InvalidArgumentError(format!(
152 "Unsupported unit {unit:?} for Time64"
153 )));
154 }
155 },
156 Utf8 => Arc::new(create_string_array::<i32>(size, null_density)),
157 LargeUtf8 => Arc::new(create_string_array::<i64>(size, null_density)),
158 Utf8View => Arc::new(create_string_view_array_with_len(
159 size,
160 null_density,
161 4,
162 false,
163 )),
164 Binary => Arc::new(create_binary_array::<i32>(size, null_density)),
165 LargeBinary => Arc::new(create_binary_array::<i64>(size, null_density)),
166 FixedSizeBinary(len) => Arc::new(create_fsb_array(size, null_density, *len as usize)),
167 BinaryView => Arc::new(
168 create_string_view_array_with_len(size, null_density, 4, false).to_binary_view(),
169 ),
170 List(_) => create_random_list_array(field, size, null_density, true_density)?,
171 LargeList(_) => create_random_list_array(field, size, null_density, true_density)?,
172 Struct(_) => create_random_struct_array(field, size, null_density, true_density)?,
173 d @ Dictionary(_, value_type) if crate::compute::can_cast_types(value_type, d) => {
174 let f = Field::new(
175 field.name(),
176 value_type.as_ref().clone(),
177 field.is_nullable(),
178 );
179 let v = create_random_array(&f, size, null_density, true_density)?;
180 crate::compute::cast(&v, d)?
181 }
182 Map(_, _) => create_random_map_array(field, size, null_density, true_density)?,
183 Decimal128(_, _) => create_random_decimal_array(field, size, null_density)?,
184 Decimal256(_, _) => create_random_decimal_array(field, size, null_density)?,
185 other => {
186 return Err(ArrowError::NotYetImplemented(format!(
187 "Generating random arrays not yet implemented for {other:?}"
188 )));
189 }
190 };
191
192 if !field.is_nullable() {
193 assert_eq!(array.null_count(), 0);
194 }
195
196 Ok(array)
197}
198
199#[inline]
200fn create_random_decimal_array(field: &Field, size: usize, null_density: f32) -> Result<ArrayRef> {
201 let mut rng = seedable_rng();
202
203 match field.data_type() {
204 DataType::Decimal128(precision, scale) => {
205 let values = (0..size)
206 .map(|_| {
207 if rng.random::<f32>() < null_density {
208 None
209 } else {
210 Some(rng.random::<i128>())
211 }
212 })
213 .collect::<Vec<_>>();
214 Ok(Arc::new(
215 Decimal128Array::from(values).with_precision_and_scale(*precision, *scale)?,
216 ))
217 }
218 DataType::Decimal256(precision, scale) => {
219 let values = (0..size)
220 .map(|_| {
221 if rng.random::<f32>() < null_density {
222 None
223 } else {
224 Some(i256::from_parts(rng.random::<u128>(), rng.random::<i128>()))
225 }
226 })
227 .collect::<Vec<_>>();
228 Ok(Arc::new(
229 Decimal256Array::from(values).with_precision_and_scale(*precision, *scale)?,
230 ))
231 }
232 _ => Err(ArrowError::InvalidArgumentError(format!(
233 "Cannot create decimal array for field {field}"
234 ))),
235 }
236}
237
238#[inline]
239fn create_random_list_array(
240 field: &Field,
241 size: usize,
242 null_density: f32,
243 true_density: f32,
244) -> Result<ArrayRef> {
245 let list_null_density = match field.is_nullable() {
247 true => null_density,
248 false => 0.0,
249 };
250 let list_field;
251 let (offsets, child_len) = match field.data_type() {
252 DataType::List(f) => {
253 let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
254 list_field = f;
255 (Buffer::from(offsets.to_byte_slice()), child_len as usize)
256 }
257 DataType::LargeList(f) => {
258 let (offsets, child_len) = create_random_offsets::<i64>(size, 0, 5);
259 list_field = f;
260 (Buffer::from(offsets.to_byte_slice()), child_len as usize)
261 }
262 _ => {
263 return Err(ArrowError::InvalidArgumentError(format!(
264 "Cannot create list array for field {field}"
265 )));
266 }
267 };
268
269 let child_array = create_random_array(list_field, child_len, null_density, true_density)?;
271 let child_data = child_array.to_data();
272 let null_buffer = match field.is_nullable() {
274 true => Some(create_random_null_buffer(size, list_null_density)),
275 false => None,
276 };
277 let list_data = unsafe {
278 ArrayData::new_unchecked(
279 field.data_type().clone(),
280 size,
281 None,
282 null_buffer,
283 0,
284 vec![offsets],
285 vec![child_data],
286 )
287 };
288 Ok(make_array(list_data))
289}
290
291#[inline]
292fn create_random_struct_array(
293 field: &Field,
294 size: usize,
295 null_density: f32,
296 true_density: f32,
297) -> Result<ArrayRef> {
298 let struct_fields = match field.data_type() {
299 DataType::Struct(fields) => fields,
300 _ => {
301 return Err(ArrowError::InvalidArgumentError(format!(
302 "Cannot create struct array for field {field}"
303 )));
304 }
305 };
306
307 let child_arrays = struct_fields
308 .iter()
309 .map(|struct_field| create_random_array(struct_field, size, null_density, true_density))
310 .collect::<Result<Vec<_>>>()?;
311
312 let null_buffer = match field.is_nullable() {
313 true => {
314 let nulls = arrow_buffer::BooleanBuffer::new(
315 create_random_null_buffer(size, null_density),
316 0,
317 size,
318 );
319 Some(nulls.into())
320 }
321 false => None,
322 };
323
324 Ok(Arc::new(StructArray::try_new(
325 struct_fields.clone(),
326 child_arrays,
327 null_buffer,
328 )?))
329}
330
331#[inline]
332fn create_random_map_array(
333 field: &Field,
334 size: usize,
335 null_density: f32,
336 true_density: f32,
337) -> Result<ArrayRef> {
338 let map_null_density = match field.is_nullable() {
340 true => null_density,
341 false => 0.0,
342 };
343
344 let entries_field = match field.data_type() {
345 DataType::Map(f, _) => f,
346 _ => {
347 return Err(ArrowError::InvalidArgumentError(format!(
348 "Cannot create map array for field {field:?}"
349 )));
350 }
351 };
352
353 let (offsets, child_len) = create_random_offsets::<i32>(size, 0, 5);
354 let offsets = Buffer::from(offsets.to_byte_slice());
355
356 let entries = create_random_array(
357 entries_field,
358 child_len as usize,
359 null_density,
360 true_density,
361 )?
362 .to_data();
363
364 let null_buffer = match field.is_nullable() {
365 true => Some(create_random_null_buffer(size, map_null_density)),
366 false => None,
367 };
368
369 let map_data = unsafe {
370 ArrayData::new_unchecked(
371 field.data_type().clone(),
372 size,
373 None,
374 null_buffer,
375 0,
376 vec![offsets],
377 vec![entries],
378 )
379 };
380 Ok(make_array(map_data))
381}
382
383fn create_random_offsets<T: OffsetSizeTrait + SampleUniform>(
385 size: usize,
386 min: T,
387 max: T,
388) -> (Vec<T>, T) {
389 let rng = &mut seedable_rng();
390
391 let mut current_offset = T::zero();
392
393 let mut offsets = Vec::with_capacity(size + 1);
394 offsets.push(current_offset);
395
396 (0..size).for_each(|_| {
397 current_offset += rng.random_range(min..max);
398 offsets.push(current_offset);
399 });
400
401 (offsets, current_offset)
402}
403
404fn create_random_null_buffer(size: usize, null_density: f32) -> Buffer {
405 let mut rng = seedable_rng();
406 let mut mut_buf = MutableBuffer::new_null(size);
407 {
408 let mut_slice = mut_buf.as_slice_mut();
409 (0..size).for_each(|i| {
410 if rng.random::<f32>() >= null_density {
411 bit_util::set_bit(mut_slice, i)
412 }
413 })
414 };
415 mut_buf.into()
416}
417
418pub trait RandomTemporalValue: ArrowTemporalType {
421 fn value_range() -> impl SampleRange<Self::Native>;
423
424 fn gen_range<R: Rng>(rng: &mut R) -> Self::Native
426 where
427 Self::Native: SampleUniform,
428 {
429 rng.random_range(Self::value_range())
430 }
431
432 fn random<R: Rng>(rng: &mut R) -> Self::Native
434 where
435 Self::Native: SampleUniform,
436 {
437 Self::gen_range(rng)
438 }
439}
440
441impl RandomTemporalValue for TimestampSecondType {
442 fn value_range() -> impl SampleRange<Self::Native> {
445 0..60 * 60 * 24 * 365 * 100
446 }
447}
448
449impl RandomTemporalValue for TimestampMillisecondType {
450 fn value_range() -> impl SampleRange<Self::Native> {
453 0..1_000 * 60 * 60 * 24 * 365 * 100
454 }
455}
456
457impl RandomTemporalValue for TimestampMicrosecondType {
458 fn value_range() -> impl SampleRange<Self::Native> {
461 0..1_000 * 1_000 * 60 * 60 * 24 * 365 * 100
462 }
463}
464
465impl RandomTemporalValue for TimestampNanosecondType {
466 fn value_range() -> impl SampleRange<Self::Native> {
469 0..1_000 * 1_000 * 1_000 * 60 * 60 * 24 * 365 * 100
470 }
471}
472
473impl RandomTemporalValue for Date32Type {
474 fn value_range() -> impl SampleRange<Self::Native> {
477 0..365 * 100
478 }
479}
480
481impl RandomTemporalValue for Date64Type {
482 fn value_range() -> impl SampleRange<Self::Native> {
485 0..1_000 * 60 * 60 * 24 * 365 * 100
486 }
487}
488
489impl RandomTemporalValue for Time32SecondType {
490 fn value_range() -> impl SampleRange<Self::Native> {
493 0..60 * 60 * 24
494 }
495}
496
497impl RandomTemporalValue for Time32MillisecondType {
498 fn value_range() -> impl SampleRange<Self::Native> {
501 0..1_000 * 60 * 60 * 24
502 }
503}
504
505impl RandomTemporalValue for Time64MicrosecondType {
506 fn value_range() -> impl SampleRange<Self::Native> {
509 0..1_000 * 1_000 * 60 * 60 * 24
510 }
511}
512
513impl RandomTemporalValue for Time64NanosecondType {
514 fn value_range() -> impl SampleRange<Self::Native> {
517 0..1_000 * 1_000 * 1_000 * 60 * 60 * 24
518 }
519}
520
521fn create_random_temporal_array<T>(size: usize, null_density: f32) -> PrimitiveArray<T>
522where
523 T: RandomTemporalValue,
524 <T as ArrowPrimitiveType>::Native: SampleUniform,
525{
526 let mut rng = seedable_rng();
527
528 (0..size)
529 .map(|_| {
530 if rng.random::<f32>() < null_density {
531 None
532 } else {
533 Some(T::random(&mut rng))
534 }
535 })
536 .collect()
537}
538
#[cfg(test)]
mod tests {
    use super::*;

    /// Null density used by every record-batch test in this module.
    const BATCH_NULL_DENSITY: f32 = 0.35;
    /// True density used by every record-batch test in this module.
    const BATCH_TRUE_DENSITY: f32 = 0.7;

    /// Generates a random batch for `fields` and checks that the batch reports the requested
    /// schema and one column per field before handing it back.
    fn checked_random_batch(fields: Vec<Field>, size: usize) -> RecordBatch {
        let schema = Arc::new(Schema::new(fields));
        let batch = create_random_batch(
            schema.clone(),
            size,
            BATCH_NULL_DENSITY,
            BATCH_TRUE_DENSITY,
        )
        .unwrap();

        assert_eq!(batch.schema(), schema);
        assert_eq!(batch.num_columns(), schema.fields().len());
        batch
    }

    #[test]
    fn test_create_batch() {
        let num_rows = 32;
        let batch = checked_random_batch(
            vec![
                Field::new("a", DataType::Int32, true),
                Field::new(
                    "timestamp_without_timezone",
                    DataType::Timestamp(TimeUnit::Nanosecond, None),
                    true,
                ),
                Field::new(
                    "timestamp_with_timezone",
                    DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
                    true,
                ),
            ],
            num_rows,
        );

        batch
            .columns()
            .iter()
            .for_each(|column| assert_eq!(column.len(), num_rows));
    }

    #[test]
    fn test_create_batch_non_null() {
        let utf8_item = Field::new_list_field(DataType::LargeUtf8, false);
        let batch = checked_random_batch(
            vec![
                Field::new("a", DataType::Int32, false),
                Field::new("b", DataType::List(Arc::new(utf8_item)), false),
                Field::new("a", DataType::Int32, false),
            ],
            32,
        );

        for column in batch.columns() {
            assert_eq!(column.null_count(), 0);
            assert_eq!(column.logical_null_count(), 0);
        }

        // The list column's values must be null-free as well, and longer than the list.
        let list = batch.column(1).as_list::<i32>();
        let values = list.values();
        assert_eq!(values.null_count(), 0);
        assert!(values.len() > list.len());
    }

    #[test]
    fn test_create_struct_array() {
        let size = 32;

        let fixed_binary_item = Field::new_list_field(DataType::FixedSizeBinary(6), true);
        let inner_list_item =
            Field::new_list_field(DataType::List(Arc::new(fixed_binary_item)), false);
        let nested_struct = DataType::Struct(Fields::from(vec![
            Field::new("d_x", DataType::Int32, true),
            Field::new("d_y", DataType::Float32, false),
            Field::new("d_z", DataType::Binary, true),
        ]));
        let struct_fields = Fields::from(vec![
            Field::new("b", DataType::Boolean, true),
            Field::new("c", DataType::LargeList(Arc::new(inner_list_item)), true),
            Field::new("d", nested_struct, true),
        ]);
        let field = Field::new("struct", DataType::Struct(struct_fields), true);

        let array = create_random_array(&field, size, 0.2, 0.5).unwrap();
        assert_eq!(array.len(), size);

        let struct_array = array.as_struct();
        assert_eq!(struct_array.columns().len(), 3);

        // Column "c": a large list of lists of fixed-size binary values.
        let outer_list = struct_array
            .column_by_name("c")
            .unwrap()
            .as_list::<i64>();
        assert_eq!(outer_list.len(), size);
        let inner_list = outer_list.values().as_list::<i32>();
        assert!(inner_list.len() > size);
        let fixed_binary = inner_list.values();
        assert_eq!(fixed_binary.data_type(), &DataType::FixedSizeBinary(6));
        assert!(fixed_binary.len() > inner_list.len());

        // Column "d": a struct whose non-nullable member must not contain nulls.
        let inner_struct = struct_array.column_by_name("d").unwrap().as_struct();
        let d_y = inner_struct.column_by_name("d_y").unwrap();
        assert_eq!(d_y.data_type(), &DataType::Float32);
        assert_eq!(d_y.null_count(), 0);
    }

    #[test]
    fn test_create_list_array_nested_nullability() {
        let nullable_item = Field::new_list_field(DataType::Boolean, true);
        let field = Field::new_list("not_null_list", nullable_item, false);

        let array = create_random_array(&field, 100, 0.95, 0.5).unwrap();
        let list = array.as_list::<i32>();

        assert_eq!(array.null_count(), 0);
        assert!(list.values().null_count() > 0);
    }

    #[test]
    fn test_create_struct_array_nested_nullability() {
        let field = Field::new_struct(
            "not_null_struct",
            vec![
                Field::new("null_int", DataType::Int32, true),
                Field::new("int", DataType::Int32, false),
            ],
            false,
        );

        let array = create_random_array(&field, 100, 0.95, 0.5).unwrap();
        assert_eq!(array.null_count(), 0);

        let columns = array.as_struct();
        let nullable_child = columns.column_by_name("null_int").unwrap();
        let required_child = columns.column_by_name("int").unwrap();
        assert!(nullable_child.null_count() > 0);
        assert_eq!(required_child.null_count(), 0);
    }

    #[test]
    fn test_create_list_array_nested_struct_nullability() {
        let members = vec![
            Field::new("null_int", DataType::Int32, true),
            Field::new("int", DataType::Int32, false),
        ];
        let struct_item = Field::new_list_field(DataType::Struct(members.into()), true);
        let field = Field::new_list("not_null_list", struct_item, false);

        let array = create_random_array(&field, 100, 0.95, 0.5).unwrap();
        assert_eq!(array.null_count(), 0);

        let values = array.as_list::<i32>().values();
        assert!(values.null_count() > 0);

        let columns = values.as_struct();
        let nullable_child = columns.column_by_name("null_int").unwrap();
        let required_child = columns.column_by_name("int").unwrap();
        assert!(nullable_child.null_count() > 0);
        assert_eq!(required_child.null_count(), 0);
    }

    #[test]
    fn test_create_map_array() {
        let key_field = Field::new("key", DataType::Utf8, false);
        let value_field = Field::new("value", DataType::Utf8, true);
        let field = Field::new_map("map", "entries", key_field, value_field, false, false);

        let array = create_random_array(&field, 100, 0.8, 0.5).unwrap();
        assert_eq!(array.len(), 100);
        // The map itself is non-nullable.
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 0);

        let map = array.as_map();
        let keys = map.keys();
        let values = map.values();

        // There are more entries than map slots.
        assert!(keys.len() > array.len());
        assert!(values.len() > array.len());
        // Keys are non-nullable, values are nullable.
        assert_eq!(keys.null_count(), 0);
        assert!(values.null_count() > 0);

        assert_eq!(keys.data_type(), &DataType::Utf8);
        assert_eq!(values.data_type(), &DataType::Utf8);
    }

    #[test]
    fn test_create_decimal_array() {
        let num_rows = 10;
        let batch = checked_random_batch(
            vec![
                Field::new("a", DataType::Decimal128(10, -2), true),
                Field::new("b", DataType::Decimal256(10, -2), true),
            ],
            num_rows,
        );

        batch
            .columns()
            .iter()
            .for_each(|column| assert_eq!(column.len(), num_rows));
    }

    #[test]
    fn create_non_nullable_decimal_array_with_null_density() {
        let num_rows = 10;
        let batch = checked_random_batch(
            vec![
                Field::new("a", DataType::Decimal128(10, -2), false),
                Field::new("b", DataType::Decimal256(10, -2), false),
            ],
            num_rows,
        );

        for column in batch.columns() {
            assert_eq!(column.len(), num_rows);
            assert_eq!(column.null_count(), 0);
        }
    }
}