1use crate::utils::{
18 array_from_slice, overflow_error, slice_from_slice_at_offset, string_from_slice,
19};
20use crate::ShortString;
21
22use arrow_schema::ArrowError;
23use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime, Utc};
24use uuid::Uuid;
25
/// Basic type of a Variant value, carried in the two low-order bits of the value
/// header byte (see `get_basic_type`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum VariantBasicType {
    /// A primitive value; its concrete type is described by `VariantPrimitiveType`.
    Primitive = 0,
    /// A short string whose byte length is stored in the header's upper six bits.
    ShortString = 1,
    /// An object value.
    Object = 2,
    /// An array value.
    Array = 3,
}
40
/// Concrete type of a primitive Variant value. The discriminant is the type id
/// stored in the upper six bits of the value header (see `get_primitive_type`).
#[derive(Clone, Copy, Debug, PartialEq)]
pub enum VariantPrimitiveType {
    /// The null value.
    Null = 0,
    /// Boolean `true`.
    BooleanTrue = 1,
    /// Boolean `false`.
    BooleanFalse = 2,
    /// 1-byte signed integer.
    Int8 = 3,
    /// 2-byte little-endian signed integer.
    Int16 = 4,
    /// 4-byte little-endian signed integer.
    Int32 = 5,
    /// 8-byte little-endian signed integer.
    Int64 = 6,
    /// 8-byte little-endian IEEE-754 double.
    Double = 7,
    /// 1 scale byte followed by a 4-byte unscaled integer.
    Decimal4 = 8,
    /// 1 scale byte followed by an 8-byte unscaled integer.
    Decimal8 = 9,
    /// 1 scale byte followed by a 16-byte unscaled integer.
    Decimal16 = 10,
    /// 4-byte count of days since the Unix epoch.
    Date = 11,
    /// 8-byte microseconds since the Unix epoch, decoded as UTC.
    TimestampMicros = 12,
    /// 8-byte microseconds since the Unix epoch, decoded without a time zone.
    TimestampNtzMicros = 13,
    /// 4-byte little-endian IEEE-754 float.
    Float = 14,
    /// 4-byte length prefix followed by raw bytes.
    Binary = 15,
    /// 4-byte length prefix followed by UTF-8 text.
    String = 16,
    /// 8-byte microseconds since midnight.
    Time = 17,
    /// 8-byte nanoseconds since the Unix epoch, decoded as UTC.
    TimestampNanos = 18,
    /// 8-byte nanoseconds since the Unix epoch, decoded without a time zone.
    TimestampNtzNanos = 19,
    /// 16-byte UUID.
    Uuid = 20,
}
72
73pub(crate) fn get_basic_type(header: u8) -> VariantBasicType {
75 let basic_type = header & 0x03; match basic_type {
78 0 => VariantBasicType::Primitive,
79 1 => VariantBasicType::ShortString,
80 2 => VariantBasicType::Object,
81 3 => VariantBasicType::Array,
82 _ => {
83 unreachable!();
86 }
87 }
88}
89
90impl TryFrom<u8> for VariantPrimitiveType {
91 type Error = ArrowError;
92
93 fn try_from(value: u8) -> Result<Self, Self::Error> {
94 match value {
95 0 => Ok(VariantPrimitiveType::Null),
96 1 => Ok(VariantPrimitiveType::BooleanTrue),
97 2 => Ok(VariantPrimitiveType::BooleanFalse),
98 3 => Ok(VariantPrimitiveType::Int8),
99 4 => Ok(VariantPrimitiveType::Int16),
100 5 => Ok(VariantPrimitiveType::Int32),
101 6 => Ok(VariantPrimitiveType::Int64),
102 7 => Ok(VariantPrimitiveType::Double),
103 8 => Ok(VariantPrimitiveType::Decimal4),
104 9 => Ok(VariantPrimitiveType::Decimal8),
105 10 => Ok(VariantPrimitiveType::Decimal16),
106 11 => Ok(VariantPrimitiveType::Date),
107 12 => Ok(VariantPrimitiveType::TimestampMicros),
108 13 => Ok(VariantPrimitiveType::TimestampNtzMicros),
109 14 => Ok(VariantPrimitiveType::Float),
110 15 => Ok(VariantPrimitiveType::Binary),
111 16 => Ok(VariantPrimitiveType::String),
112 17 => Ok(VariantPrimitiveType::Time),
113 18 => Ok(VariantPrimitiveType::TimestampNanos),
114 19 => Ok(VariantPrimitiveType::TimestampNtzNanos),
115 20 => Ok(VariantPrimitiveType::Uuid),
116 _ => Err(ArrowError::InvalidArgumentError(format!(
117 "unknown primitive type: {value}",
118 ))),
119 }
120 }
121}
122
/// Byte width of a little-endian offset/size field. Each discriminant equals the
/// width it names, so `size as usize` yields the stride in bytes.
#[derive(Clone, Copy, Debug, PartialEq)]
pub(crate) enum OffsetSizeBytes {
    /// 1-byte fields.
    One = 1,
    /// 2-byte fields.
    Two = 2,
    /// 3-byte fields.
    Three = 3,
    /// 4-byte fields.
    Four = 4,
}
133
134impl OffsetSizeBytes {
135 pub(crate) fn try_new(offset_size_minus_one: u8) -> Result<Self, ArrowError> {
137 use OffsetSizeBytes::*;
138 let result = match offset_size_minus_one {
139 0 => One,
140 1 => Two,
141 2 => Three,
142 3 => Four,
143 _ => {
144 return Err(ArrowError::InvalidArgumentError(
145 "offset_size_minus_one must be 0–3".to_string(),
146 ))
147 }
148 };
149 Ok(result)
150 }
151
152 pub(crate) fn unpack_u32(&self, bytes: &[u8], index: usize) -> Result<u32, ArrowError> {
159 self.unpack_u32_at_offset(bytes, 0, index)
160 }
161
162 pub(crate) fn unpack_u32_at_offset(
172 &self,
173 bytes: &[u8],
174 byte_offset: usize, offset_index: usize, ) -> Result<u32, ArrowError> {
177 use OffsetSizeBytes::*;
178
179 let offset = offset_index
182 .checked_mul(*self as usize)
183 .and_then(|n| n.checked_add(byte_offset))
184 .ok_or_else(|| overflow_error("unpacking offset array value"))?;
185 let value = match self {
186 One => u8::from_le_bytes(array_from_slice(bytes, offset)?).into(),
187 Two => u16::from_le_bytes(array_from_slice(bytes, offset)?).into(),
188 Three => {
189 let b3_chunks: [u8; 3] = array_from_slice(bytes, offset)?;
191 let mut buf = [0u8; 4];
193 buf[..3].copy_from_slice(&b3_chunks);
194 u32::from_le_bytes(buf)
195 }
196 Four => u32::from_le_bytes(array_from_slice(bytes, offset)?),
197 };
198 Ok(value)
199 }
200}
201
202pub(crate) fn map_bytes_to_offsets(
204 buffer: &[u8],
205 offset_size: OffsetSizeBytes,
206) -> impl Iterator<Item = usize> + use<'_> {
207 buffer
208 .chunks_exact(offset_size as usize)
209 .map(move |chunk| match offset_size {
210 OffsetSizeBytes::One => chunk[0] as usize,
211 OffsetSizeBytes::Two => u16::from_le_bytes([chunk[0], chunk[1]]) as usize,
212 OffsetSizeBytes::Three => {
213 u32::from_le_bytes([chunk[0], chunk[1], chunk[2], 0]) as usize
214 }
215 OffsetSizeBytes::Four => {
216 u32::from_le_bytes([chunk[0], chunk[1], chunk[2], chunk[3]]) as usize
217 }
218 })
219}
220
221pub(crate) fn get_primitive_type(metadata: u8) -> Result<VariantPrimitiveType, ArrowError> {
223 VariantPrimitiveType::try_from(metadata >> 2)
225}
226
227pub(crate) fn decode_int8(data: &[u8]) -> Result<i8, ArrowError> {
229 Ok(i8::from_le_bytes(array_from_slice(data, 0)?))
230}
231
232pub(crate) fn decode_int16(data: &[u8]) -> Result<i16, ArrowError> {
234 Ok(i16::from_le_bytes(array_from_slice(data, 0)?))
235}
236
237pub(crate) fn decode_int32(data: &[u8]) -> Result<i32, ArrowError> {
239 Ok(i32::from_le_bytes(array_from_slice(data, 0)?))
240}
241
242pub(crate) fn decode_int64(data: &[u8]) -> Result<i64, ArrowError> {
244 Ok(i64::from_le_bytes(array_from_slice(data, 0)?))
245}
246
247pub(crate) fn decode_decimal4(data: &[u8]) -> Result<(i32, u8), ArrowError> {
249 let scale = u8::from_le_bytes(array_from_slice(data, 0)?);
250 let integer = i32::from_le_bytes(array_from_slice(data, 1)?);
251 Ok((integer, scale))
252}
253
254pub(crate) fn decode_decimal8(data: &[u8]) -> Result<(i64, u8), ArrowError> {
256 let scale = u8::from_le_bytes(array_from_slice(data, 0)?);
257 let integer = i64::from_le_bytes(array_from_slice(data, 1)?);
258 Ok((integer, scale))
259}
260
261pub(crate) fn decode_decimal16(data: &[u8]) -> Result<(i128, u8), ArrowError> {
263 let scale = u8::from_le_bytes(array_from_slice(data, 0)?);
264 let integer = i128::from_le_bytes(array_from_slice(data, 1)?);
265 Ok((integer, scale))
266}
267
268pub(crate) fn decode_float(data: &[u8]) -> Result<f32, ArrowError> {
270 Ok(f32::from_le_bytes(array_from_slice(data, 0)?))
271}
272
273pub(crate) fn decode_double(data: &[u8]) -> Result<f64, ArrowError> {
275 Ok(f64::from_le_bytes(array_from_slice(data, 0)?))
276}
277
278pub(crate) fn decode_date(data: &[u8]) -> Result<NaiveDate, ArrowError> {
280 let days_since_epoch = i32::from_le_bytes(array_from_slice(data, 0)?);
281 let value = DateTime::UNIX_EPOCH + Duration::days(i64::from(days_since_epoch));
282 Ok(value.date_naive())
283}
284
285pub(crate) fn decode_timestamp_micros(data: &[u8]) -> Result<DateTime<Utc>, ArrowError> {
287 let micros_since_epoch = i64::from_le_bytes(array_from_slice(data, 0)?);
288 DateTime::from_timestamp_micros(micros_since_epoch).ok_or_else(|| {
289 ArrowError::CastError(format!(
290 "Could not cast `{micros_since_epoch}` microseconds into a DateTime<Utc>"
291 ))
292 })
293}
294
295pub(crate) fn decode_timestampntz_micros(data: &[u8]) -> Result<NaiveDateTime, ArrowError> {
297 let micros_since_epoch = i64::from_le_bytes(array_from_slice(data, 0)?);
298 DateTime::from_timestamp_micros(micros_since_epoch)
299 .ok_or_else(|| {
300 ArrowError::CastError(format!(
301 "Could not cast `{micros_since_epoch}` microseconds into a NaiveDateTime"
302 ))
303 })
304 .map(|v| v.naive_utc())
305}
306
307pub(crate) fn decode_time_ntz(data: &[u8]) -> Result<NaiveTime, ArrowError> {
308 let micros_since_epoch = u64::from_le_bytes(array_from_slice(data, 0)?);
309
310 let case_error = ArrowError::CastError(format!(
311 "Could not cast {micros_since_epoch} microseconds into a NaiveTime"
312 ));
313
314 if micros_since_epoch >= 86_400_000_000 {
315 return Err(case_error);
316 }
317
318 let nanos_since_midnight = micros_since_epoch * 1_000;
319 NaiveTime::from_num_seconds_from_midnight_opt(
320 (nanos_since_midnight / 1_000_000_000) as u32,
321 (nanos_since_midnight % 1_000_000_000) as u32,
322 )
323 .ok_or(case_error)
324}
325
326pub(crate) fn decode_timestamp_nanos(data: &[u8]) -> Result<DateTime<Utc>, ArrowError> {
328 let nanos_since_epoch = i64::from_le_bytes(array_from_slice(data, 0)?);
329
330 Ok(DateTime::from_timestamp_nanos(nanos_since_epoch))
332}
333
334pub(crate) fn decode_timestampntz_nanos(data: &[u8]) -> Result<NaiveDateTime, ArrowError> {
336 decode_timestamp_nanos(data).map(|v| v.naive_utc())
337}
338
339pub(crate) fn decode_uuid(data: &[u8]) -> Result<Uuid, ArrowError> {
341 Uuid::from_slice(&data[0..16])
342 .map_err(|_| ArrowError::CastError(format!("Cant decode uuid from {:?}", &data[0..16])))
343}
344
345pub(crate) fn decode_binary(data: &[u8]) -> Result<&[u8], ArrowError> {
347 let len = u32::from_le_bytes(array_from_slice(data, 0)?) as usize;
348 slice_from_slice_at_offset(data, 4, 0..len)
349}
350
351pub(crate) fn decode_long_string(data: &[u8]) -> Result<&str, ArrowError> {
353 let len = u32::from_le_bytes(array_from_slice(data, 0)?) as usize;
354 string_from_slice(data, 4, 0..len)
355}
356
357pub(crate) fn decode_short_string(
359 metadata: u8,
360 data: &[u8],
361) -> Result<ShortString<'_>, ArrowError> {
362 let len = (metadata >> 2) as usize;
363 let string = string_from_slice(data, 0, 0..len)?;
364 ShortString::try_new(string)
365}
366
#[cfg(test)]
mod tests {
    use super::*;
    use paste::paste;

    // Generates a pair of tests for a decoder that reads a fixed-size prefix of its input:
    // - `<test_name>_exact_length`: decoding `$data` succeeds and yields `$expected`.
    // - `<test_name>_truncated_length`: decoding `$data` with its last byte removed fails
    //   with `ArrowError::InvalidArgumentError`.
    macro_rules! test_decoder_bounds {
        ($test_name:ident, $data:expr, $decode_fn:ident, $expected:expr) => {
            paste! {
                #[test]
                fn [<$test_name _exact_length>]() {
                    let result = $decode_fn(&$data).unwrap();
                    assert_eq!(result, $expected);
                }

                #[test]
                fn [<$test_name _truncated_length>]() {
                    // One byte short of the full input.
                    let truncated_data = &$data[.. $data.len() - 1];
                    let result = $decode_fn(truncated_data);
                    assert!(matches!(result, Err(ArrowError::InvalidArgumentError(_))));
                }
            }
        };
    }

    // Integer decoders: inputs are little-endian two's-complement bytes.
    mod integer {
        use super::*;

        // 0x2a = 42
        test_decoder_bounds!(test_i8, [0x2a], decode_int8, 42);
        // 0x04d2 = 1234
        test_decoder_bounds!(test_i16, [0xd2, 0x04], decode_int16, 1234);
        // 0x0001e240 = 123456
        test_decoder_bounds!(test_i32, [0x40, 0xe2, 0x01, 0x00], decode_int32, 123456);
        // 0x112210f47de98115 = 1234567890123456789
        test_decoder_bounds!(
            test_i64,
            [0x15, 0x81, 0xe9, 0x7d, 0xf4, 0x10, 0x22, 0x11],
            decode_int64,
            1234567890123456789
        );
    }

    // Decimal decoders: one scale byte followed by the little-endian unscaled integer;
    // the expected value is `(unscaled, scale)`.
    mod decimal {
        use super::*;

        test_decoder_bounds!(
            test_decimal4,
            [
                0x02, // scale = 2
                0xd2, 0x04, 0x00, 0x00, // unscaled = 1234
            ],
            decode_decimal4,
            (1234, 2)
        );

        test_decoder_bounds!(
            test_decimal8,
            [
                0x02, // scale = 2
                0xd2, 0x02, 0x96, 0x49, 0x00, 0x00, 0x00, 0x00, // unscaled = 1234567890
            ],
            decode_decimal8,
            (1234567890, 2)
        );

        test_decoder_bounds!(
            test_decimal16,
            [
                // scale = 2, then 16 bytes of unscaled = 1234567891234567890
                0x02, 0xd2, 0xb6, 0x23, 0xc0, 0xf4, 0x10, 0x22, 0x11, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
                0x00, 0x00,
            ],
            decode_decimal16,
            (1234567891234567890, 2)
        );
    }

    // Floating-point decoders: little-endian IEEE-754 bytes.
    mod float {
        use super::*;

        // `$expected` is compared against an f32 result, so the literal is rounded
        // to the same f32 value the bytes encode.
        test_decoder_bounds!(
            test_float,
            [0x06, 0x2c, 0x93, 0x4e],
            decode_float,
            1234567890.1234
        );

        test_decoder_bounds!(
            test_double,
            [0xc9, 0xe5, 0x87, 0xb4, 0x80, 0x65, 0xd2, 0x41],
            decode_double,
            1234567890.1234
        );
    }

    // Date and timestamp decoders.
    mod datetime {
        use super::*;

        // 0x4ee2 = 20194 days after 1970-01-01.
        test_decoder_bounds!(
            test_date,
            [0xe2, 0x4e, 0x0, 0x0],
            decode_date,
            NaiveDate::from_ymd_opt(2025, 4, 16).unwrap()
        );

        test_decoder_bounds!(
            test_timestamp_micros,
            [0xe0, 0x52, 0x97, 0xdd, 0xe7, 0x32, 0x06, 0x00],
            decode_timestamp_micros,
            NaiveDate::from_ymd_opt(2025, 4, 16)
                .unwrap()
                .and_hms_milli_opt(16, 34, 56, 780)
                .unwrap()
                .and_utc()
        );

        // Same bytes as the UTC case, decoded without a time zone.
        test_decoder_bounds!(
            test_timestampntz_micros,
            [0xe0, 0x52, 0x97, 0xdd, 0xe7, 0x32, 0x06, 0x00],
            decode_timestampntz_micros,
            NaiveDate::from_ymd_opt(2025, 4, 16)
                .unwrap()
                .and_hms_milli_opt(16, 34, 56, 780)
                .unwrap()
        );

        test_decoder_bounds!(
            test_timestamp_nanos,
            [0x15, 0x41, 0xa2, 0x5a, 0x36, 0xa2, 0x5b, 0x18],
            decode_timestamp_nanos,
            NaiveDate::from_ymd_opt(2025, 8, 14)
                .unwrap()
                .and_hms_nano_opt(12, 33, 54, 123456789)
                .unwrap()
                .and_utc()
        );

        // The top byte 0xfa sets the sign bit: a negative offset, i.e. before the epoch.
        test_decoder_bounds!(
            test_timestamp_nanos_before_epoch,
            [0x15, 0x41, 0x52, 0xd4, 0x94, 0xe5, 0xad, 0xfa],
            decode_timestamp_nanos,
            NaiveDate::from_ymd_opt(1957, 11, 7)
                .unwrap()
                .and_hms_nano_opt(12, 33, 54, 123456789)
                .unwrap()
                .and_utc()
        );

        test_decoder_bounds!(
            test_timestampntz_nanos,
            [0x15, 0x41, 0xa2, 0x5a, 0x36, 0xa2, 0x5b, 0x18],
            decode_timestampntz_nanos,
            NaiveDate::from_ymd_opt(2025, 8, 14)
                .unwrap()
                .and_hms_nano_opt(12, 33, 54, 123456789)
                .unwrap()
        );

        test_decoder_bounds!(
            test_timestampntz_nanos_before_epoch,
            [0x15, 0x41, 0x52, 0xd4, 0x94, 0xe5, 0xad, 0xfa],
            decode_timestampntz_nanos,
            NaiveDate::from_ymd_opt(1957, 11, 7)
                .unwrap()
                .and_hms_nano_opt(12, 33, 54, 123456789)
                .unwrap()
        );
    }

    // The 16 input bytes are the UUID's bytes in textual order.
    #[test]
    fn test_uuid() {
        let data = [
            0xf2, 0x4f, 0x9b, 0x64, 0x81, 0xfa, 0x49, 0xd1, 0xb7, 0x4e, 0x8c, 0x09, 0xa6, 0xe3,
            0x1c, 0x56,
        ];
        let result = decode_uuid(&data).unwrap();
        assert_eq!(
            Uuid::parse_str("f24f9b64-81fa-49d1-b74e-8c09a6e31c56").unwrap(),
            result
        );
    }

    mod time {
        use super::*;

        // 0x02df8e1f53 = 12_340_567_891 µs = 12340 s + 567_891 µs after midnight.
        test_decoder_bounds!(
            test_timentz,
            [0x53, 0x1f, 0x8e, 0xdf, 0x2, 0, 0, 0],
            decode_time_ntz,
            NaiveTime::from_num_seconds_from_midnight_opt(12340, 567_891_000).unwrap()
        );

        // A microsecond count far beyond one day must be rejected with a CastError.
        #[test]
        fn test_decode_time_ntz_invalid() {
            let invalid_second = u64::MAX;
            let data = invalid_second.to_le_bytes();
            let result = decode_time_ntz(&data);
            assert!(matches!(result, Err(ArrowError::CastError(_))));
        }
    }

    #[test]
    fn test_binary_exact_length() {
        let data = [
            0x09, 0, 0, 0, // length prefix = 9
            0x03, 0x13, 0x37, 0xde, 0xad, 0xbe, 0xef, 0xca, 0xfe, // 9-byte payload
        ];
        let result = decode_binary(&data).unwrap();
        assert_eq!(
            result,
            [0x03, 0x13, 0x37, 0xde, 0xad, 0xbe, 0xef, 0xca, 0xfe]
        );
    }

    #[test]
    fn test_binary_truncated_length() {
        let data = [
            0x09, 0, 0, 0, // length prefix = 9
            0x03, 0x13, 0x37, 0xde, 0xad, 0xbe, 0xef, 0xca, // only 8 payload bytes
        ];
        let result = decode_binary(&data);
        assert!(matches!(result, Err(ArrowError::InvalidArgumentError(_))));
    }

    // Header `1 | 5 << 2`: basic type 1 (short string) in the low bits, length 5 above.
    // The data has one extra trailing byte, which must not be included.
    #[test]
    fn test_short_string_exact_length() {
        let data = [b'H', b'e', b'l', b'l', b'o', b'o'];
        let result = decode_short_string(1 | 5 << 2, &data).unwrap();
        assert_eq!(result.0, "Hello");
    }

    // Header declares 5 bytes but only 3 are present.
    #[test]
    fn test_short_string_truncated_length() {
        let data = [b'H', b'e', b'l'];
        let result = decode_short_string(1 | 5 << 2, &data);
        assert!(matches!(result, Err(ArrowError::InvalidArgumentError(_))));
    }

    #[test]
    fn test_string_exact_length() {
        let data = [
            0x05, 0, 0, 0, // length prefix = 5
            b'H', b'e', b'l', b'l', b'o', b'o', // one extra trailing byte is ignored
        ];
        let result = decode_long_string(&data).unwrap();
        assert_eq!(result, "Hello");
    }

    #[test]
    fn test_string_truncated_length() {
        let data = [
            0x05, 0, 0, 0, // length prefix = 5
            b'H', b'e', b'l', // only 3 bytes present
        ];
        let result = decode_long_string(&data);
        assert!(matches!(result, Err(ArrowError::InvalidArgumentError(_))));
    }

    // `try_new` takes the width minus one; only 0..=3 are valid.
    #[test]
    fn test_offset() {
        assert_eq!(OffsetSizeBytes::try_new(0).unwrap(), OffsetSizeBytes::One);
        assert_eq!(OffsetSizeBytes::try_new(1).unwrap(), OffsetSizeBytes::Two);
        assert_eq!(OffsetSizeBytes::try_new(2).unwrap(), OffsetSizeBytes::Three);
        assert_eq!(OffsetSizeBytes::try_new(3).unwrap(), OffsetSizeBytes::Four);

        assert!(OffsetSizeBytes::try_new(4).is_err());
        assert!(OffsetSizeBytes::try_new(255).is_err());
    }

    // `unpack_u32(bytes, i)` reads the i-th little-endian value of the given width.
    #[test]
    fn unpack_u32_all_widths() {
        // Width 1: index i is byte i.
        let buf_one = [0x01u8, 0xAB, 0xCD];
        assert_eq!(OffsetSizeBytes::One.unpack_u32(&buf_one, 0).unwrap(), 0x01);
        assert_eq!(OffsetSizeBytes::One.unpack_u32(&buf_one, 2).unwrap(), 0xCD);

        // Width 2: two values, 0x1234 and 0x5678.
        let buf_two = [0x34, 0x12, 0x78, 0x56];
        assert_eq!(
            OffsetSizeBytes::Two.unpack_u32(&buf_two, 0).unwrap(),
            0x1234
        );
        assert_eq!(
            OffsetSizeBytes::Two.unpack_u32(&buf_two, 1).unwrap(),
            0x5678
        );

        // Width 3: two 24-bit values, 0x030201 and 0x0000FF.
        let buf_three = [0x01, 0x02, 0x03, 0xFF, 0x00, 0x00];
        assert_eq!(
            OffsetSizeBytes::Three.unpack_u32(&buf_three, 0).unwrap(),
            0x030201
        );
        assert_eq!(
            OffsetSizeBytes::Three.unpack_u32(&buf_three, 1).unwrap(),
            0x0000FF
        );

        // Width 4: two values, 0x12345678 and 0x90ABCDEF.
        let buf_four = [0x78, 0x56, 0x34, 0x12, 0xEF, 0xCD, 0xAB, 0x90];
        assert_eq!(
            OffsetSizeBytes::Four.unpack_u32(&buf_four, 0).unwrap(),
            0x1234_5678
        );
        assert_eq!(
            OffsetSizeBytes::Four.unpack_u32(&buf_four, 1).unwrap(),
            0x90AB_CDEF
        );
    }

    #[test]
    fn unpack_u32_out_of_bounds() {
        // A single byte is too short for any width larger than one.
        let tiny = [0x00u8];
        assert!(OffsetSizeBytes::Two.unpack_u32(&tiny, 0).is_err());
        assert!(OffsetSizeBytes::Three.unpack_u32(&tiny, 0).is_err());
    }

    // Reads 2-byte values that start after a one-byte header (byte_offset = 1).
    #[test]
    fn unpack_simple() {
        let buf = [
            0x41, // one-byte header, skipped via byte_offset = 1
            0x02, 0x00, // value 0 = 2
            0x00, 0x00, // value 1 = 0
            0x05, 0x00, // value 2 = 5
            0x09, 0x00, // value 3 = 9
        ];

        let width = OffsetSizeBytes::Two;

        let dict_size = width.unpack_u32_at_offset(&buf, 1, 0).unwrap();
        assert_eq!(dict_size, 2);

        let first = width.unpack_u32_at_offset(&buf, 1, 1).unwrap();
        assert_eq!(first, 0);

        let second = width.unpack_u32_at_offset(&buf, 1, 2).unwrap();
        assert_eq!(second, 5);

        let third = width.unpack_u32_at_offset(&buf, 1, 3).unwrap();
        assert_eq!(third, 9);

        // Index 4 would start at byte 9, past the end of the 9-byte buffer.
        let err = width.unpack_u32_at_offset(&buf, 1, 4);
        assert!(err.is_err())
    }
}