parquet/arrow/push_decoder/mod.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! [`ParquetPushDecoder`]: decodes Parquet data with data provided by the
//! caller (rather than from an underlying reader).

mod reader_builder;
mod remaining;

use crate::DecodeResult;
use crate::arrow::arrow_reader::{
    ArrowReaderBuilder, ArrowReaderMetadata, ArrowReaderOptions, ParquetRecordBatchReader,
};
use crate::errors::ParquetError;
use crate::file::metadata::ParquetMetaData;
use crate::util::push_buffers::PushBuffers;
use arrow_array::RecordBatch;
use bytes::Bytes;
use reader_builder::RowGroupReaderBuilder;
use remaining::RemainingRowGroups;
use std::ops::Range;
use std::sync::Arc;
/// A builder for [`ParquetPushDecoder`].
///
/// To create a new decoder, use [`ParquetPushDecoderBuilder::try_new_decoder`].
///
/// You can decode the metadata from a Parquet file using either
/// [`ParquetMetaDataReader`] or [`ParquetMetaDataPushDecoder`].
///
/// [`ParquetMetaDataReader`]: crate::file::metadata::ParquetMetaDataReader
/// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
///
/// Note this builder has no "input": the push decoder performs no IO itself.
/// Instead, the caller pushes the requested byte ranges to the decoder, which
/// internally tracks what data has been provided. See [`NoInput`] for details.
///
/// # Example
/// ```
/// # use std::ops::Range;
/// # use std::sync::Arc;
/// # use bytes::Bytes;
/// # use arrow_array::record_batch;
/// # use parquet::DecodeResult;
/// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
/// # use parquet::arrow::ArrowWriter;
/// # use parquet::file::metadata::ParquetMetaDataPushDecoder;
/// # let file_bytes = {
/// #   let mut buffer = vec![];
/// #   let batch = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
/// #   let mut writer = ArrowWriter::try_new(&mut buffer, batch.schema(), None).unwrap();
/// #   writer.write(&batch).unwrap();
/// #   writer.close().unwrap();
/// #   Bytes::from(buffer)
/// # };
/// # // mimic IO by returning a function that returns the bytes for a given range
/// # let get_range = |range: &Range<u64>| -> Bytes {
/// #   let start = range.start as usize;
/// #   let end = range.end as usize;
/// #   file_bytes.slice(start..end)
/// # };
/// # let file_length = file_bytes.len() as u64;
/// # let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(file_length).unwrap();
/// # metadata_decoder.push_ranges(vec![0..file_length], vec![file_bytes.clone()]).unwrap();
/// # let DecodeResult::Data(parquet_metadata) = metadata_decoder.try_decode().unwrap() else { panic!("failed to decode metadata") };
/// # let parquet_metadata = Arc::new(parquet_metadata);
/// // The Parquet metadata is required to create the decoder
/// let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(parquet_metadata)
///     .unwrap()
///     // Optionally configure the decoder, e.g. batch size
///     .with_batch_size(1024)
///     // Build the decoder
///     .build()
///     .unwrap();
///
/// // In a loop, ask the decoder what it needs next, and provide it with the required data
/// loop {
///     match decoder.try_decode().unwrap() {
///         DecodeResult::NeedsData(ranges) => {
///             // The decoder needs more data. Fetch the data for the given ranges
///             let data = ranges.iter().map(|r| get_range(r)).collect::<Vec<_>>();
///             // Push the data to the decoder
///             decoder.push_ranges(ranges, data).unwrap();
///             // After pushing the data, we can try to decode again on the next iteration
///         }
///         DecodeResult::Data(batch) => {
///             // Successfully decoded a batch of data
///             assert!(batch.num_rows() > 0);
///         }
///         DecodeResult::Finished => {
///             // The decoder has finished decoding; exit the loop
///             break;
///         }
///     }
/// }
/// ```
pub type ParquetPushDecoderBuilder = ArrowReaderBuilder<NoInput>;

/// Type that represents "no input" for the [`ParquetPushDecoderBuilder`]
///
/// There is no "input" for the push decoder by design (the idea is that
/// the caller pushes data to the decoder as needed).
///
/// However, [`ArrowReaderBuilder`] is shared with the sync and async readers,
/// which DO have an `input`. To reuse the same builder code for all three
/// types of decoders, `NoInput` denotes in the type system that the push
/// decoder has no input.
#[derive(Debug, Clone, Copy)]
pub struct NoInput;

/// Methods for building a [`ParquetPushDecoder`]. See the base [`ArrowReaderBuilder`] for
/// more options that can be configured.
impl ParquetPushDecoderBuilder {
    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file.
    ///
    /// See [`ParquetMetaDataPushDecoder`] for a decoder that can read the metadata from a Parquet file.
    ///
    /// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// See the example on [`ParquetPushDecoderBuilder`]
    pub fn try_new_decoder(parquet_metadata: Arc<ParquetMetaData>) -> Result<Self, ParquetError> {
        Self::try_new_decoder_with_options(parquet_metadata, ArrowReaderOptions::default())
    }

    /// Create a new `ParquetPushDecoderBuilder` for configuring a Parquet decoder for the given file
    /// with the given reader options.
    ///
    /// This is similar to [`Self::try_new_decoder`] but allows configuring
    /// options such as the Arrow schema.
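    ///
    /// # Example
    ///
    /// A minimal sketch; the `get_metadata` helper below is a placeholder for
    /// however you obtain the file's [`ParquetMetaData`] (for example, via
    /// [`ParquetMetaDataPushDecoder`]).
    ///
    /// [`ParquetMetaDataPushDecoder`]: crate::file::metadata::ParquetMetaDataPushDecoder
    ///
    /// ```no_run
    /// # use std::sync::Arc;
    /// # use parquet::arrow::arrow_reader::ArrowReaderOptions;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoderBuilder;
    /// # use parquet::file::metadata::ParquetMetaData;
    /// # fn get_metadata() -> Arc<ParquetMetaData> { unimplemented!() }
    /// // e.g. also read the page index, if present in the file
    /// let options = ArrowReaderOptions::new().with_page_index(true);
    /// let builder = ParquetPushDecoderBuilder::try_new_decoder_with_options(
    ///     get_metadata(),
    ///     options,
    /// )
    /// .unwrap();
    /// ```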
    pub fn try_new_decoder_with_options(
        parquet_metadata: Arc<ParquetMetaData>,
        arrow_reader_options: ArrowReaderOptions,
    ) -> Result<Self, ParquetError> {
        let arrow_reader_metadata =
            ArrowReaderMetadata::try_new(parquet_metadata, arrow_reader_options)?;
        Ok(Self::new_with_metadata(arrow_reader_metadata))
    }

    /// Create a new `ParquetPushDecoderBuilder` given [`ArrowReaderMetadata`].
    ///
    /// See [`ArrowReaderMetadata::try_new`] for how to create the metadata from
    /// the Parquet metadata and reader options.
    pub fn new_with_metadata(arrow_reader_metadata: ArrowReaderMetadata) -> Self {
        Self::new_builder(NoInput, arrow_reader_metadata)
    }

    /// Create a [`ParquetPushDecoder`] with the configured options
    pub fn build(self) -> Result<ParquetPushDecoder, ParquetError> {
        let Self {
            input: NoInput,
            metadata: parquet_metadata,
            schema: _,
            fields,
            batch_size,
            row_groups,
            projection,
            filter,
            selection,
            limit,
            offset,
            metrics,
            row_selection_policy,
            max_predicate_cache_size,
        } = self;

        // If no row groups were specified, read all of them
        let row_groups =
            row_groups.unwrap_or_else(|| (0..parquet_metadata.num_row_groups()).collect());

        // Prepare to build RowGroup readers
        let file_len = 0; // not used in push decoder
        let buffers = PushBuffers::new(file_len);
        let row_group_reader_builder = RowGroupReaderBuilder::new(
            batch_size,
            projection,
            Arc::clone(&parquet_metadata),
            fields,
            filter,
            limit,
            offset,
            metrics,
            max_predicate_cache_size,
            buffers,
            row_selection_policy,
        );

        // Initialize the decoder with the configured options
        let remaining_row_groups = RemainingRowGroups::new(
            parquet_metadata,
            row_groups,
            selection,
            row_group_reader_builder,
        );

        Ok(ParquetPushDecoder {
            state: ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups: Box::new(remaining_row_groups),
            },
        })
    }
}

/// A push based Parquet Decoder
///
/// See [`ParquetPushDecoderBuilder`] for an example of how to build and use the decoder.
///
/// [`ParquetPushDecoder`] is a low level API for decoding Parquet data without an
/// underlying reader for performing IO, and thus offers fine grained control
/// over how data is fetched and decoded.
///
/// When more data is needed to make progress, instead of reading data directly
/// from a reader, the decoder returns a [`DecodeResult`] indicating what ranges
/// are needed. Once the caller provides the requested ranges via
/// [`Self::push_ranges`], it can attempt to decode again by calling
/// [`Self::try_decode`].
///
/// The decoder's internal state tracks what has already been decoded and what
/// is needed next.
#[derive(Debug)]
pub struct ParquetPushDecoder {
    /// The inner state.
    ///
    /// This state is consumed on every transition and a new state is produced
    /// so the Rust compiler can ensure that the state is always valid and
    /// transitions are not missed.
    state: ParquetDecoderState,
}

impl ParquetPushDecoder {
    /// Attempt to decode the next batch of data, or return what data is needed
    ///
    /// The decoder communicates the next state with a [`DecodeResult`]
    ///
    /// See the full example in [`ParquetPushDecoderBuilder`]
    ///
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///     match decoder.try_decode().unwrap() {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             // The decoder needs more data. Fetch the data for the given ranges,
    ///             // call decoder.push_ranges(ranges, data), and try again
    ///             push_data(&mut decoder, ranges);
    ///         }
    ///         DecodeResult::Data(batch) => {
    ///             // Successfully decoded the next batch of data
    ///             println!("Got batch with {} rows", batch.num_rows());
    ///         }
    ///         DecodeResult::Finished => {
    ///             // The decoder has finished decoding all data
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```
    pub fn try_decode(&mut self) -> Result<DecodeResult<RecordBatch>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_batch()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Return a [`ParquetRecordBatchReader`] that reads the next set of rows, or
    /// return what data is needed to produce it.
    ///
    /// This API can be used to obtain a reader that decodes the current set of
    /// rows (e.g. a row group) while, in parallel, fetching the data for the
    /// next set.
    ///
    /// # Example
    /// ```no_run
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// use parquet::DecodeResult;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn push_data(decoder: &mut ParquetPushDecoder, ranges: Vec<std::ops::Range<u64>>) { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///     match decoder.try_next_reader().unwrap() {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             // The decoder needs more data. Fetch the data for the given ranges,
    ///             // call decoder.push_ranges(ranges, data), and try again
    ///             push_data(&mut decoder, ranges);
    ///         }
    ///         DecodeResult::Data(reader) => {
    ///             // spawn a thread to read the batches in parallel
    ///             // with fetching the next row group / data
    ///             std::thread::spawn(move || {
    ///                 for batch in reader {
    ///                     let batch = batch.unwrap();
    ///                     println!("Got batch with {} rows", batch.num_rows());
    ///                 }
    ///             });
    ///         }
    ///         DecodeResult::Finished => {
    ///             // The decoder has finished decoding all data
    ///             break;
    ///         }
    ///     }
    /// }
    /// ```
    pub fn try_next_reader(
        &mut self,
    ) -> Result<DecodeResult<ParquetRecordBatchReader>, ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        let (new_state, decode_result) = current_state.try_next_reader()?;
        self.state = new_state;
        Ok(decode_result)
    }

    /// Push data into the decoder for processing
    ///
    /// This is a convenience wrapper around [`Self::push_ranges`] for pushing a
    /// single range of data.
    ///
    /// Note the range can cover the entire file or just a part of it. If it
    /// covers only part of the file, it should correspond to a data range
    /// requested by the decoder.
    ///
    /// See the example in [`ParquetPushDecoderBuilder`]
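    ///
    /// # Example
    ///
    /// A minimal sketch of pushing the entire file at once; `get_decoder` and
    /// `read_entire_file` are placeholders for however you build the decoder
    /// and perform IO.
    ///
    /// ```no_run
    /// # use bytes::Bytes;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn read_entire_file() -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// // If all the file's bytes are already in memory, push them up front:
    /// // the decoder will then never return DecodeResult::NeedsData
    /// let file_bytes = read_entire_file();
    /// let file_len = file_bytes.len() as u64;
    /// decoder.push_range(0..file_len, file_bytes).unwrap();
    /// ```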
    pub fn push_range(&mut self, range: Range<u64>, data: Bytes) -> Result<(), ParquetError> {
        self.push_ranges(vec![range], vec![data])
    }

    /// Push data into the decoder for processing
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_ranges(
        &mut self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<(), ParquetError> {
        let current_state = std::mem::replace(&mut self.state, ParquetDecoderState::Finished);
        self.state = current_state.push_data(ranges, data)?;
        Ok(())
    }

    /// Returns the total number of buffered bytes in the decoder
    ///
    /// This is the sum of the sizes of all [`Bytes`] that have been pushed to
    /// the decoder but not yet consumed.
    ///
    /// Note that this does not include the overhead of the decoder's internal
    /// data structures, and because [`Bytes`] is reference counted, it may not
    /// reflect additional memory actually allocated.
    ///
    /// This can be used to monitor the memory usage of the decoder.
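    ///
    /// # Example
    ///
    /// A minimal sketch of monitoring buffered memory while driving the
    /// decoder; `get_decoder` and `fetch` are placeholders for however you
    /// build the decoder and perform IO.
    ///
    /// ```no_run
    /// # use std::ops::Range;
    /// # use bytes::Bytes;
    /// # use parquet::DecodeResult;
    /// # use parquet::arrow::push_decoder::ParquetPushDecoder;
    /// # fn get_decoder() -> ParquetPushDecoder { unimplemented!() }
    /// # fn fetch(range: &Range<u64>) -> Bytes { unimplemented!() }
    /// let mut decoder = get_decoder();
    /// loop {
    ///     match decoder.try_decode().unwrap() {
    ///         DecodeResult::NeedsData(ranges) => {
    ///             let data = ranges.iter().map(fetch).collect();
    ///             decoder.push_ranges(ranges, data).unwrap();
    ///             // data is buffered until the corresponding rows are decoded
    ///             println!("buffering {} bytes", decoder.buffered_bytes());
    ///         }
    ///         DecodeResult::Data(_batch) => {}
    ///         DecodeResult::Finished => break,
    ///     }
    /// }
    /// ```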
    pub fn buffered_bytes(&self) -> u64 {
        self.state.buffered_bytes()
    }
}

/// Internal state machine for the [`ParquetPushDecoder`]
#[derive(Debug)]
enum ParquetDecoderState {
    /// Waiting for data needed to decode the next RowGroup
    ReadingRowGroup {
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder is actively decoding a RowGroup
    DecodingRowGroup {
        /// Current active reader
        record_batch_reader: Box<ParquetRecordBatchReader>,
        remaining_row_groups: Box<RemainingRowGroups>,
    },
    /// The decoder has finished processing all data
    Finished,
}

impl ParquetDecoderState {
    /// If the data for the next RowGroup is ready, return a
    /// [`ParquetRecordBatchReader`] for it and advance to the next group.
    fn try_next_reader(
        self,
    ) -> Result<(Self, DecodeResult<ParquetRecordBatchReader>), ParquetError> {
        let mut current_state = self;
        loop {
            let (next_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((next_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match next_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = next_state,
                // have a reader ready, so return it and set ourself to ReadingRowGroup
                Self::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                } => {
                    let result = DecodeResult::Data(*record_batch_reader);
                    let next_state = Self::ReadingRowGroup {
                        remaining_row_groups,
                    };
                    return Ok((next_state, result));
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Current state --> next state + output
    ///
    /// This function is called to get the next RecordBatch
    ///
    /// This structure is used to reduce the indentation level of the main loop
    /// in [`ParquetPushDecoder::try_decode`]
    fn try_next_batch(self) -> Result<(Self, DecodeResult<RecordBatch>), ParquetError> {
        let mut current_state = self;
        loop {
            let (new_state, decode_result) = current_state.transition()?;
            // if more data is needed to transition, can't proceed further without it
            match decode_result {
                DecodeResult::NeedsData(ranges) => {
                    return Ok((new_state, DecodeResult::NeedsData(ranges)));
                }
                // act next based on state
                DecodeResult::Data(()) | DecodeResult::Finished => {}
            }
            match new_state {
                // not ready to read yet, continue transitioning
                Self::ReadingRowGroup { .. } => current_state = new_state,
                // have a reader ready, so decode the next batch
                Self::DecodingRowGroup {
                    mut record_batch_reader,
                    remaining_row_groups,
                } => {
                    match record_batch_reader.next() {
                        // Successfully decoded a batch, return it
                        Some(Ok(batch)) => {
                            let result = DecodeResult::Data(batch);
                            let next_state = Self::DecodingRowGroup {
                                record_batch_reader,
                                remaining_row_groups,
                            };
                            return Ok((next_state, result));
                        }
                        // No more batches in this row group, move to the next row group
                        None => {
                            current_state = Self::ReadingRowGroup {
                                remaining_row_groups,
                            }
                        }
                        // some error occurred while decoding, so return that
                        Some(Err(e)) => {
                            // TODO: preserve ArrowError in ParquetError (rather than convert to a string)
                            return Err(ParquetError::ArrowError(e.to_string()));
                        }
                    }
                }
                Self::Finished => {
                    return Ok((Self::Finished, DecodeResult::Finished));
                }
            }
        }
    }

    /// Transition to the next state with a reader (data can be produced), if not at end of stream
    ///
    /// This function is called in a loop until the decoder is ready to return
    /// data (has the required pages buffered) or is finished.
    fn transition(self) -> Result<(Self, DecodeResult<()>), ParquetError> {
        // result returned when there is data ready
        let data_ready = DecodeResult::Data(());
        match self {
            Self::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                match remaining_row_groups.try_next_reader()? {
                    // If we have a next reader, we can transition to decoding it
                    DecodeResult::Data(record_batch_reader) => {
                        // Transition to decoding the row group
                        Ok((
                            Self::DecodingRowGroup {
                                record_batch_reader: Box::new(record_batch_reader),
                                remaining_row_groups,
                            },
                            data_ready,
                        ))
                    }
                    DecodeResult::NeedsData(ranges) => {
                        // If we need more data, return the ranges needed and stay in
                        // the ReadingRowGroup state
                        Ok((
                            Self::ReadingRowGroup {
                                remaining_row_groups,
                            },
                            DecodeResult::NeedsData(ranges),
                        ))
                    }
                    // If there are no more readers, we are finished
                    DecodeResult::Finished => {
                        // No more row groups to read, we are finished
                        Ok((Self::Finished, DecodeResult::Finished))
                    }
                }
            }
            // if we are already in DecodingRowGroup, just return data ready
            Self::DecodingRowGroup { .. } => Ok((self, data_ready)),
            // if finished, just return finished
            Self::Finished => Ok((self, DecodeResult::Finished)),
        }
    }

    /// Push data, and transition state if needed
    ///
    /// This should correspond to the data ranges requested by the decoder
    pub fn push_data(
        self,
        ranges: Vec<Range<u64>>,
        data: Vec<Bytes>,
    ) -> Result<Self, ParquetError> {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                mut remaining_row_groups,
            } => {
                // Push data to the RowGroupReaderBuilder
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::ReadingRowGroup {
                    remaining_row_groups,
                })
            }
            // it is ok to get data before we asked for it
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader,
                mut remaining_row_groups,
            } => {
                remaining_row_groups.push_data(ranges, data);
                Ok(ParquetDecoderState::DecodingRowGroup {
                    record_batch_reader,
                    remaining_row_groups,
                })
            }
            ParquetDecoderState::Finished => Err(ParquetError::General(
                "Cannot push data to a finished decoder".to_string(),
            )),
        }
    }

    /// How many bytes are currently buffered in the decoder?
    fn buffered_bytes(&self) -> u64 {
        match self {
            ParquetDecoderState::ReadingRowGroup {
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::DecodingRowGroup {
                record_batch_reader: _,
                remaining_row_groups,
            } => remaining_row_groups.buffered_bytes(),
            ParquetDecoderState::Finished => 0,
        }
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::DecodeResult;
    use crate::arrow::arrow_reader::{ArrowPredicateFn, RowFilter, RowSelection, RowSelector};
    use crate::arrow::push_decoder::{ParquetPushDecoder, ParquetPushDecoderBuilder};
    use crate::arrow::{ArrowWriter, ProjectionMask};
    use crate::errors::ParquetError;
    use crate::file::metadata::ParquetMetaDataPushDecoder;
    use crate::file::properties::WriterProperties;
    use arrow::compute::kernels::cmp::{gt, lt};
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int64Type;
    use arrow_array::{ArrayRef, Int64Array, RecordBatch, StringViewArray};
    use arrow_select::concat::concat_batches;
    use bytes::Bytes;
    use std::fmt::Debug;
    use std::ops::Range;
    use std::sync::{Arc, LazyLock};

    /// Test the decoder state size (as states are copied around on each
    /// transition, they should not grow too large)
    #[test]
    fn test_decoder_size() {
        assert_eq!(std::mem::size_of::<ParquetDecoderState>(), 24);
    }

    /// Decode the entire file at once, simulating a scenario where all data is
    /// available in memory
    #[test]
    fn test_decoder_all_data() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        decoder
            .push_range(test_file_range(), TEST_FILE_DATA.clone())
            .unwrap();

        let results = vec![
            // first row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
            // second row group should be decoded without needing more data
            expect_data(decoder.try_decode()),
        ];
        expect_finished(decoder.try_decode());

        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        // Check that the output matches the input batch
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating a scenario where data is
    /// fetched as needed
    #[test]
    fn test_decoder_incremental() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        let mut results = vec![];

        // First row group, expect a single request
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the first row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the first row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        let num_bytes_requested: u64 = ranges.iter().map(|r| r.end - r.start).sum();
        push_ranges_to_decoder(&mut decoder, ranges);
        // The decoder should currently only store the data it needs to decode the second row group
        assert_eq!(decoder.buffered_bytes(), num_bytes_requested);
        results.push(expect_data(decoder.try_decode()));
        // the decoder should have consumed the data for the second row group and freed it
        assert_eq!(decoder.buffered_bytes(), 0);
        expect_finished(decoder.try_decode());

        // Check that the output matches the input batch
        let all_output = concat_batches(&TEST_BATCH.schema(), &results).unwrap();
        assert_eq!(all_output, *TEST_BATCH);
    }

    /// Decode the entire file incrementally, simulating partial reads
    #[test]
    fn test_decoder_partial() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200);
        assert_eq!(batch1, expected1);

        // Second row group, this time provide the data in two steps
        let ranges = expect_needs_data(decoder.try_decode());
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        // push first half to simulate a partial read
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());

        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        // pushing empty ranges should be a no-op
        push_ranges_to_decoder(&mut decoder, vec![]);
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode multiple columns "a" and "b", expect that the decoder makes
    /// only a single request per row group
    #[test]
    fn test_decoder_selection_does_one_request() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["a", "b"]), // read "a", "b"
            )
            .build()
            .unwrap();

        // First row group, expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, similarly expect a single request for all data needed to read "a" and "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[0, 1]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that requires multiple requests, but only provide part
    /// of the data needed for the filter at a time, simulating partial reads.
    #[test]
    fn test_decoder_single_filter_partial() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            // claim to use both a and b so we get two range requests for the filter pages
            ProjectionMask::columns(&schema_descr, ["a", "b"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, evaluating filters
        let ranges = expect_needs_data(decoder.try_decode());
        // only provide half the ranges
        let (ranges1, ranges2) = ranges.split_at(ranges.len() / 2);
        assert!(!ranges1.is_empty());
        assert!(!ranges2.is_empty());
        push_ranges_to_decoder(&mut decoder, ranges1.to_vec());
        // still expect more data
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        let ranges = expect_needs_data(decoder.try_decode());
        assert_eq!(ranges, ranges2); // should be the remaining ranges
        push_ranges_to_decoder(&mut decoder, ranges2.to_vec());

        // Since no rows in the first row group pass the filter, there are no
        // additional requests to read data pages for "b" here

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter where we also skip one of the RowGroups via a RowSelection
    #[test]
    fn test_decoder_single_filter_and_row_selection() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "b"
                ProjectionMask::columns(&schema_descr, ["b"]), // read "b"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(200),   // skip first row group
                RowSelector::select(100), // first 100 rows of second row group
                RowSelector::skip(100),
            ]))
            .build()
            .unwrap();

        // expect the first row group to be skipped entirely (no filter is evaluated due to row selection)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "b"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 49).project(&[1]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    /// Decode with multiple filters that require multiple requests
    #[test]
    fn test_decoder_multi_filters() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // Values in column "b" range 400..799
        // First filter: "a" > 175 (last data page in Row Group 0)
        // Second filter: "b" < 625 (last data page in Row Group 0 and first DataPage in RowGroup 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 175
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_175 = Int64Array::new_scalar(175);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_175)
            },
        );

        // b < 625
        let row_filter_b = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["b"]),
            |batch: RecordBatch| {
                let scalar_625 = Int64Array::new_scalar(625);
                let column = batch.column(0).as_primitive::<Int64Type>();
                lt(column, &scalar_625)
            },
        );

        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                Box::new(row_filter_a),
                Box::new(row_filter_b),
            ]))
            .build()
            .unwrap();

        // First row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // First row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 176..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(176, 24).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group, first filter (a > 175)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, second filter (b < 625)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // Second row group, data pages for "c"
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..224, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 25).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    /// Decode with a filter that uses a column that is also projected, and expect
    /// that the filter pages are reused (not refetched)
    #[test]
    fn test_decoder_reuses_filter_pages() {
        // Create a decoder for decoding parquet data (note it does not have any IO / readers)
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();

        // Values in column "a" range 0..399
        // First filter: "a" > 250 (nothing in Row Group 0, both data pages in Row Group 1)
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // a > 250
        let row_filter_a = ArrowPredicateFn::new(
            ProjectionMask::columns(&schema_descr, ["a"]),
            |batch: RecordBatch| {
                let scalar_250 = Int64Array::new_scalar(250);
                let column = batch.column(0).as_primitive::<Int64Type>();
                gt(column, &scalar_250)
            },
        );

        let mut decoder = builder
            .with_projection(
                // read only column "a" to test that filter pages are reused
                ProjectionMask::columns(&schema_descr, ["a"]), // read "a"
            )
            .with_row_filter(RowFilter::new(vec![Box::new(row_filter_a)]))
            .build()
            .unwrap();

        // First row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first row group to be filtered out (no rows match)

        // Second row group, first filter (a > 250)
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect that the second row group is decoded: rows 251..399, column "a"
        // Note that the filter pages for "a" should be reused and no additional data
        // should be requested
        let batch = expect_data(decoder.try_decode());
        let expected = TEST_BATCH.slice(251, 149).project(&[0]).unwrap();
        assert_eq!(batch, expected);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_empty_filters() {
        let builder =
            ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata()).unwrap();
        let schema_descr = builder.metadata().file_metadata().schema_descr_ptr();

        // only read column "c", but with empty filters
        let mut decoder = builder
            .with_projection(
                ProjectionMask::columns(&schema_descr, ["c"]), // read "c"
            )
            .with_row_filter(RowFilter::new(vec![
                // empty filters should be ignored
            ]))
            .build()
            .unwrap();

        // First row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first batch to be decoded: rows 0..199, column "c"
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(0, 200).project(&[2]).unwrap();
        assert_eq!(batch1, expected1);

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the second batch to be decoded: rows 200..399, column "c"
        let batch2 = expect_data(decoder.try_decode());
        let expected2 = TEST_BATCH.slice(200, 200).project(&[2]).unwrap();
        assert_eq!(batch2, expected2);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_offset_limit() {
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            // skip the entire first row group (200 rows) and the first 25 rows of the second
            .with_offset(225)
            // and limit to 20 rows
            .with_limit(20)
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_group_selection() {
        // take only the second row group
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_groups(vec![1])
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(200, 200);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    #[test]
    fn test_decoder_row_selection() {
        // skip the first row group and the first 25 rows of the second, then take 20 rows
        let mut decoder = ParquetPushDecoderBuilder::try_new_decoder(test_file_parquet_metadata())
            .unwrap()
            .with_row_selection(RowSelection::from(vec![
                RowSelector::skip(225),  // skip first row group and 25 rows of second
                RowSelector::select(20), // take 20 rows
            ]))
            .build()
            .unwrap();

        // First row group should be skipped

        // Second row group
        let ranges = expect_needs_data(decoder.try_decode());
        push_ranges_to_decoder(&mut decoder, ranges);

        // expect the first and only batch to be decoded
        let batch1 = expect_data(decoder.try_decode());
        let expected1 = TEST_BATCH.slice(225, 20);
        assert_eq!(batch1, expected1);

        expect_finished(decoder.try_decode());
    }

    /// Returns a batch with 400 rows, with 3 columns: "a", "b", "c"
    ///
    /// Note c is a different type (so the data page sizes will be different)
    static TEST_BATCH: LazyLock<RecordBatch> = LazyLock::new(|| {
        let a: ArrayRef = Arc::new(Int64Array::from_iter_values(0..400));
        let b: ArrayRef = Arc::new(Int64Array::from_iter_values(400..800));
        let c: ArrayRef = Arc::new(StringViewArray::from_iter_values((0..400).map(|i| {
            if i % 2 == 0 {
                format!("string_{i}")
            } else {
                format!("A string larger than 12 bytes and thus not inlined {i}")
            }
        })));

        RecordBatch::try_from_iter(vec![("a", a), ("b", b), ("c", c)]).unwrap()
    });

    /// Create a parquet file in memory for testing.
    ///
    /// See [`TEST_BATCH`] for the data in the file.
    ///
    /// Each column is written in 4 data pages, each with 100 rows, across 2
    /// row groups. Each column in each row group has two data pages.
    ///
    /// The data is split across row groups like this
    ///
    /// Column | Values                     | Data Page | Row Group
    /// -------|----------------------------|-----------|----------
    /// a      | 0..99                      | 1         | 0
    /// a      | 100..199                   | 2         | 0
    /// a      | 200..299                   | 1         | 1
    /// a      | 300..399                   | 2         | 1
    ///
    /// b      | 400..499                   | 1         | 0
    /// b      | 500..599                   | 2         | 0
    /// b      | 600..699                   | 1         | 1
    /// b      | 700..799                   | 2         | 1
    ///
    /// c      | "string_0".."string_99"    | 1         | 0
    /// c      | "string_100".."string_199" | 2         | 0
    /// c      | "string_200".."string_299" | 1         | 1
    /// c      | "string_300".."string_399" | 2         | 1
    static TEST_FILE_DATA: LazyLock<Bytes> = LazyLock::new(|| {
        let input_batch = &TEST_BATCH;
        let mut output = Vec::new();

        let writer_options = WriterProperties::builder()
            .set_max_row_group_size(200)
            .set_data_page_row_count_limit(100)
            .build();
        let mut writer =
            ArrowWriter::try_new(&mut output, input_batch.schema(), Some(writer_options)).unwrap();

        // since the limits are only enforced on batch boundaries, write the input
        // batch in chunks of 50
        let mut row_remain = input_batch.num_rows();
        while row_remain > 0 {
            let chunk_size = row_remain.min(50);
            let chunk = input_batch.slice(input_batch.num_rows() - row_remain, chunk_size);
            writer.write(&chunk).unwrap();
            row_remain -= chunk_size;
        }
        writer.close().unwrap();
        Bytes::from(output)
    });

    /// Return the length of [`TEST_FILE_DATA`], in bytes
    fn test_file_len() -> u64 {
        TEST_FILE_DATA.len() as u64
    }

    /// Return a range that covers the entire [`TEST_FILE_DATA`]
    fn test_file_range() -> Range<u64> {
        0..test_file_len()
    }

    /// Return a slice of the test file data from the given range
    pub fn test_file_slice(range: Range<u64>) -> Bytes {
        let start: usize = range.start.try_into().unwrap();
        let end: usize = range.end.try_into().unwrap();
        TEST_FILE_DATA.slice(start..end)
    }

    /// Return the metadata for the test file
    pub fn test_file_parquet_metadata() -> Arc<crate::file::metadata::ParquetMetaData> {
        let mut metadata_decoder = ParquetMetaDataPushDecoder::try_new(test_file_len()).unwrap();
        push_ranges_to_metadata_decoder(&mut metadata_decoder, vec![test_file_range()]);
        let metadata = metadata_decoder.try_decode().unwrap();
        let DecodeResult::Data(metadata) = metadata else {
            panic!("Expected metadata to be decoded successfully");
        };
        Arc::new(metadata)
    }

    /// Push the given ranges to the metadata decoder, simulating reading from a file
    fn push_ranges_to_metadata_decoder(
        metadata_decoder: &mut ParquetMetaDataPushDecoder,
        ranges: Vec<Range<u64>>,
    ) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        metadata_decoder.push_ranges(ranges, data).unwrap();
    }

    /// Push the given ranges of [`TEST_FILE_DATA`] to the decoder, simulating IO
    fn push_ranges_to_decoder(decoder: &mut ParquetPushDecoder, ranges: Vec<Range<u64>>) {
        let data = ranges
            .iter()
            .map(|range| test_file_slice(range.clone()))
            .collect::<Vec<_>>();
        decoder.push_ranges(ranges, data).unwrap();
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::Data`] and return the corresponding element
    fn expect_data<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) -> T {
        match result.expect("Expected Ok(DecodeResult::Data(T))") {
            DecodeResult::Data(data) => data,
            result => panic!("Expected DecodeResult::Data, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is a [`DecodeResult::NeedsData`] and return the corresponding ranges
    fn expect_needs_data<T: Debug>(
        result: Result<DecodeResult<T>, ParquetError>,
    ) -> Vec<Range<u64>> {
        match result.expect("Expected Ok(DecodeResult::NeedsData{ranges})") {
            DecodeResult::NeedsData(ranges) => ranges,
            result => panic!("Expected DecodeResult::NeedsData, got {result:?}"),
        }
    }

    /// Expect that the [`DecodeResult`] is [`DecodeResult::Finished`]
    fn expect_finished<T: Debug>(result: Result<DecodeResult<T>, ParquetError>) {
        match result.expect("Expected Ok(DecodeResult::Finished)") {
            DecodeResult::Finished => {}
            result => panic!("Expected DecodeResult::Finished, got {result:?}"),
        }
    }
}