arrow_json/reader/value_iter.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::io::BufRead;
19
20use arrow_schema::ArrowError;
21use serde_json::Value;
22
/// JSON reader that yields one [`serde_json::Value`] per non-blank line of a
/// [`BufRead`] source (i.e. newline-delimited JSON).
///
/// Each line is trimmed of surrounding whitespace before parsing, and
/// whitespace-only lines are skipped. If `max_read_records` is `Some(n)`,
/// iteration stops after `n` records have been yielded.
///
/// # Example
///
/// ```
/// use std::fs::File;
/// use std::io::BufReader;
/// use arrow_json::reader::ValueIter;
///
/// let mut reader =
///     BufReader::new(File::open("test/data/mixed_arrays.json").unwrap());
/// let value_reader = ValueIter::new(&mut reader, None);
/// for value in value_reader {
///     println!("JSON value: {}", value.unwrap());
/// }
/// ```
#[derive(Debug)]
pub struct ValueIter<R: BufRead> {
    /// Line-oriented source of JSON text.
    reader: R,
    /// Optional cap on the number of records to yield; `None` reads until EOF.
    max_read_records: Option<usize>,
    /// Number of non-blank lines yielded so far (including ones that failed to parse).
    record_count: usize,
    /// Line buffer reused across records to avoid an allocation per record.
    line_buf: String,
}
47
48impl<R: BufRead> ValueIter<R> {
49 /// Creates a new `ValueIter`
50 pub fn new(reader: R, max_read_records: Option<usize>) -> Self {
51 Self {
52 reader,
53 max_read_records,
54 record_count: 0,
55 line_buf: String::new(),
56 }
57 }
58
59 /// Returns the number of records this iterator has consumed
60 pub fn record_count(&self) -> usize {
61 self.record_count
62 }
63}
64
65impl<R: BufRead> Iterator for ValueIter<R> {
66 type Item = Result<Value, ArrowError>;
67
68 fn next(&mut self) -> Option<Self::Item> {
69 if let Some(max) = self.max_read_records {
70 if self.record_count >= max {
71 return None;
72 }
73 }
74
75 loop {
76 self.line_buf.truncate(0);
77 match self.reader.read_line(&mut self.line_buf) {
78 Ok(0) => {
79 // read_line returns 0 when stream reached EOF
80 return None;
81 }
82 Err(e) => {
83 return Some(Err(ArrowError::JsonError(format!(
84 "Failed to read JSON record: {e}"
85 ))));
86 }
87 _ => {
88 let trimmed_s = self.line_buf.trim();
89 if trimmed_s.is_empty() {
90 // ignore empty lines
91 continue;
92 }
93
94 self.record_count += 1;
95 return Some(
96 serde_json::from_str(trimmed_s)
97 .map_err(|e| ArrowError::JsonError(format!("Not valid JSON: {e}"))),
98 );
99 }
100 }
101 }
102 }
103}