Skip to main content

arrow_json/reader/
value_iter.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::BufRead;
19
20use arrow_schema::ArrowError;
21use serde_json::Value;
22
/// JSON file reader that produces a `serde_json::Value` iterator from a `BufRead`
///
/// The input is read one line at a time. Lines that are empty after trimming
/// whitespace are skipped; every other line is parsed as a single JSON value
/// and yielded as `Result<Value, ArrowError>`.
///
/// # Example
///
/// ```
/// use std::fs::File;
/// use std::io::BufReader;
/// use arrow_json::reader::ValueIter;
///
/// let mut reader =
///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let mut value_reader = ValueIter::new(&mut reader, None);
/// for value in value_reader {
///     println!("JSON value: {}", value.unwrap());
/// }
/// ```
#[derive(Debug)]
pub struct ValueIter<R: BufRead> {
    // buffered source the JSON lines are read from
    reader: R,
    // optional upper bound on `record_count`; iteration stops once it is reached
    max_read_records: Option<usize>,
    // number of non-empty lines handed to the JSON parser so far
    // (incremented before parsing, so lines that fail to parse are counted too)
    record_count: usize,
    // reuse line buffer to avoid allocation on each record
    line_buf: String,
}
47
48impl<R: BufRead> ValueIter<R> {
49    /// Creates a new `ValueIter`
50    pub fn new(reader: R, max_read_records: Option<usize>) -> Self {
51        Self {
52            reader,
53            max_read_records,
54            record_count: 0,
55            line_buf: String::new(),
56        }
57    }
58
59    /// Returns the number of records this iterator has consumed
60    pub fn record_count(&self) -> usize {
61        self.record_count
62    }
63}
64
65impl<R: BufRead> Iterator for ValueIter<R> {
66    type Item = Result<Value, ArrowError>;
67
68    fn next(&mut self) -> Option<Self::Item> {
69        if let Some(max) = self.max_read_records {
70            if self.record_count >= max {
71                return None;
72            }
73        }
74
75        loop {
76            self.line_buf.truncate(0);
77            match self.reader.read_line(&mut self.line_buf) {
78                Ok(0) => {
79                    // read_line returns 0 when stream reached EOF
80                    return None;
81                }
82                Err(e) => {
83                    return Some(Err(ArrowError::JsonError(format!(
84                        "Failed to read JSON record: {e}"
85                    ))));
86                }
87                _ => {
88                    let trimmed_s = self.line_buf.trim();
89                    if trimmed_s.is_empty() {
90                        // ignore empty lines
91                        continue;
92                    }
93
94                    self.record_count += 1;
95                    return Some(
96                        serde_json::from_str(trimmed_s)
97                            .map_err(|e| ArrowError::JsonError(format!("Not valid JSON: {e}"))),
98                    );
99                }
100            }
101        }
102    }
103}