Module arrow_json::reader

JSON reader

This JSON reader reads JSON records into the Arrow memory model. Records are loaded in batches and then converted from their record-oriented representation to Arrow's columnar data model.
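
To make the record-to-columnar step concrete, here is a minimal std-only sketch. The Row and Columns types and the to_columns function are hypothetical illustrations, not arrow_json types: each field of the incoming records becomes one contiguous vector, and the nullable field keeps a separate validity vector (Arrow itself packs validity into a bitmap).

```rust
/// Hypothetical record type mirroring a schema with fields a, b (non-null) and c (nullable).
#[derive(Debug, Clone, Copy)]
struct Row {
    a: f64,
    b: f64,
    c: Option<bool>,
}

/// Hypothetical column-oriented layout: one vector per field.
#[derive(Debug, Default)]
struct Columns {
    a: Vec<f64>,
    b: Vec<f64>,
    c_values: Vec<bool>,
    // One entry per row; false marks a null. Arrow stores this as a packed bitmap.
    c_valid: Vec<bool>,
}

/// Transposes row-oriented records into columns.
fn to_columns(rows: &[Row]) -> Columns {
    let mut cols = Columns::default();
    for row in rows {
        cols.a.push(row.a);
        cols.b.push(row.b);
        // A null still occupies a slot in the value vector so that every
        // column keeps the same length; the validity entry marks it absent.
        cols.c_values.push(row.c.unwrap_or(false));
        cols.c_valid.push(row.c.is_some());
    }
    cols
}
```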

The reader ignores whitespace between JSON values, including \n and \r. It can therefore parse any sequence of one or more arbitrarily formatted JSON values; newline-delimited JSON is one such format, but records do not have to be separated by newlines.
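
The following std-only sketch illustrates why newlines are not required: a hypothetical split_top_level_objects function finds the boundaries of consecutive top-level JSON objects purely by tracking brace depth, so any whitespace between them (including \n and \r) is insignificant. It is not arrow_json's tokenizer; it assumes well-formed input whose top-level values are all objects and performs no validation.

```rust
use std::ops::Range;

/// Hypothetical helper, not part of arrow_json: returns the byte range of each
/// top-level JSON object in `input`. String literals (and escapes inside them)
/// are tracked so that braces inside strings are not counted.
fn split_top_level_objects(input: &[u8]) -> Vec<Range<usize>> {
    let mut ranges = Vec::new();
    let mut depth = 0usize;
    let mut start = 0;
    let mut in_string = false;
    let mut escaped = false;
    for (i, &b) in input.iter().enumerate() {
        if in_string {
            if escaped {
                escaped = false;
            } else if b == b'\\' {
                escaped = true;
            } else if b == b'"' {
                in_string = false;
            }
            continue;
        }
        match b {
            b'"' => in_string = true,
            b'{' => {
                if depth == 0 {
                    start = i;
                }
                depth += 1;
            }
            b'}' if depth > 0 => {
                depth -= 1;
                if depth == 0 {
                    ranges.push(start..i + 1);
                }
            }
            // Whitespace between objects and ordinary bytes inside an object
            // need no special handling.
            _ => {}
        }
    }
    ranges
}
```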

Basic Usage

Reader can be used directly with synchronous data sources, such as a std::fs::File wrapped in a std::io::BufReader:


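use std::fs::File;
use std::io::BufReader;
use std::sync::Arc;

use arrow_schema::{DataType, Field, Schema};

// The schema must be known up front; column "c" is nullable
let schema = Arc::new(Schema::new(vec![
    Field::new("a", DataType::Float64, false),
    Field::new("b", DataType::Float64, false),
    Field::new("c", DataType::Boolean, true),
]));

let file = File::open("test/data/basic.json").unwrap();

// ReaderBuilder::build expects a BufRead, hence the BufReader wrapper
let mut json = arrow_json::ReaderBuilder::new(schema).build(BufReader::new(file)).unwrap();
// Reader is an iterator of Result<RecordBatch, ArrowError>; take the first batch
let batch = json.next().unwrap().unwrap();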

Async Usage

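The lower-level Decoder can be integrated with many kinds of async data streams and is deliberately agnostic to the different async IO primitives found in the Rust ecosystem. Decoder::decode consumes bytes from a buffer and returns how many it consumed, stopping early once a full batch of rows is buffered; Decoder::flush then returns the buffered rows as a RecordBatch, or None if nothing is buffered.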

For example, it can be used with an arbitrary Stream of Bytes as follows:

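use std::task::Poll;

use arrow_array::RecordBatch;
use arrow_json::reader::Decoder;
use arrow_schema::ArrowError;
use bytes::{Buf, Bytes};
use futures::{ready, Stream, StreamExt};

fn decode_stream<S: Stream<Item = Bytes> + Unpin>(
    mut decoder: Decoder,
    mut input: S,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    // Bytes received from `input` but not yet consumed by the decoder
    let mut buffered = Bytes::new();
    futures::stream::poll_fn(move |cx| {
        loop {
            if buffered.is_empty() {
                buffered = match ready!(input.poll_next_unpin(cx)) {
                    Some(b) => b,
                    // Input exhausted: flush whatever is still buffered
                    None => break,
                };
            }
            let decoded = match decoder.decode(buffered.as_ref()) {
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            let read = buffered.len();
            buffered.advance(decoded);
            // The decoder stops early once a full batch is buffered
            if decoded != read {
                break;
            }
        }

        // Emit the buffered rows; None ends the stream
        Poll::Ready(decoder.flush().transpose())
    })
}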
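
The loop above relies on two properties of Decoder: decode reports how many bytes it consumed and stops early once a full batch is buffered, and flush turns the buffered rows into a batch. The std-only sketch below mimics that contract with a hypothetical ToyDecoder (newline-terminated records kept as raw strings, no JSON parsing) and drives it over a list of byte chunks with the same fill/decode/flush shape as the poll_fn body. ToyDecoder and decode_chunks are illustrative assumptions, not arrow_json APIs.

```rust
/// Hypothetical stand-in for arrow_json's Decoder. Records are assumed to be
/// newline-terminated and are kept as raw strings; no JSON parsing happens.
struct ToyDecoder {
    batch_size: usize,
    // Bytes of a record whose terminating '\n' has not arrived yet
    partial: Vec<u8>,
    // Complete records waiting to be flushed
    rows: Vec<String>,
}

impl ToyDecoder {
    fn new(batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        ToyDecoder { batch_size, partial: Vec::new(), rows: Vec::new() }
    }

    /// Like Decoder::decode: returns how many bytes of `buf` were consumed,
    /// stopping early once `batch_size` records are buffered.
    fn decode(&mut self, buf: &[u8]) -> usize {
        let mut consumed = 0;
        for &byte in buf {
            if self.rows.len() == self.batch_size {
                break;
            }
            consumed += 1;
            if byte == b'\n' {
                let row = String::from_utf8_lossy(&self.partial).into_owned();
                self.partial.clear();
                // Blank lines are whitespace between records, not records
                if !row.trim().is_empty() {
                    self.rows.push(row);
                }
            } else {
                self.partial.push(byte);
            }
        }
        consumed
    }

    /// Like Decoder::flush: emits the buffered records as one batch, or None.
    fn flush(&mut self) -> Option<Vec<String>> {
        if self.rows.is_empty() {
            None
        } else {
            Some(std::mem::take(&mut self.rows))
        }
    }
}

/// Synchronous analogue of the poll_fn loop: `chunks` plays the role of the
/// Stream of Bytes, and each returned Vec<String> plays the role of a RecordBatch.
fn decode_chunks(mut decoder: ToyDecoder, chunks: Vec<&[u8]>) -> Vec<Vec<String>> {
    let mut input = chunks.into_iter();
    let mut buffered: &[u8] = &[];
    let mut batches = Vec::new();
    loop {
        loop {
            if buffered.is_empty() {
                match input.next() {
                    Some(chunk) => buffered = chunk,
                    None => break, // input exhausted
                }
            }
            let read = buffered.len();
            let decoded = decoder.decode(buffered);
            buffered = &buffered[decoded..];
            if decoded != read {
                break; // batch is full; flush before decoding more
            }
        }
        match decoder.flush() {
            Some(batch) => batches.push(batch),
            None => break, // nothing left: the stream would end here
        }
    }
    batches
}
```

Note that a record split across two chunks (here `{"a"` and `:2}`) is reassembled by the decoder, and the bytes it declines to consume stay in `buffered` for the next round.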

Similarly, it can be used with tokio-based IO primitives, such as any type implementing AsyncBufRead:

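use std::pin::Pin;
use std::task::Poll;

use arrow_array::RecordBatch;
use arrow_json::reader::Decoder;
use arrow_schema::ArrowError;
use futures::{ready, Stream};
use tokio::io::AsyncBufRead;

fn decode_stream<R: AsyncBufRead + Unpin>(
    mut decoder: Decoder,
    mut reader: R,
) -> impl Stream<Item = Result<RecordBatch, ArrowError>> {
    futures::stream::poll_fn(move |cx| {
        loop {
            let b = match ready!(Pin::new(&mut reader).poll_fill_buf(cx)) {
                // An empty buffer signals EOF: flush whatever is still buffered
                Ok(b) if b.is_empty() => break,
                Ok(b) => b,
                Err(e) => return Poll::Ready(Some(Err(e.into()))),
            };
            let read = b.len();
            let decoded = match decoder.decode(b) {
                Ok(decoded) => decoded,
                Err(e) => return Poll::Ready(Some(Err(e))),
            };
            // Unconsumed bytes are returned again by the next poll_fill_buf
            Pin::new(&mut reader).consume(decoded);
            // The decoder stops early once a full batch is buffered
            if decoded != read {
                break;
            }
        }

        // Emit the buffered rows; None ends the stream
        Poll::Ready(decoder.flush().transpose())
    })
}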
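
The tokio version depends on the buffered-reader contract that bytes not passed to consume are handed back by the next fill. std::io::BufReader follows the same contract synchronously, as this small sketch shows; read_in_steps and its byte limit are hypothetical stand-ins for a decoder that stops before the end of the buffer.

```rust
use std::io::{self, BufRead, BufReader, Cursor};

/// Hypothetical helper: reads `data` through a BufReader of the given capacity,
/// but handles at most `limit` bytes per fill, the way Decoder::decode may stop
/// partway through a buffer. Returns the bytes handled on each step.
fn read_in_steps(data: &[u8], capacity: usize, limit: usize) -> io::Result<Vec<Vec<u8>>> {
    assert!(limit > 0, "limit must be positive to make progress");
    let mut reader = BufReader::with_capacity(capacity, Cursor::new(data));
    let mut steps = Vec::new();
    loop {
        let buf = reader.fill_buf()?;
        // An empty buffer signals end of input, as in the tokio example
        if buf.is_empty() {
            break;
        }
        let taken = buf.len().min(limit);
        steps.push(buf[..taken].to_vec());
        // Only the handled bytes are consumed; BufReader returns the remainder
        // on the next fill_buf call before reading more from the source.
        reader.consume(taken);
    }
    Ok(steps)
}
```

With a 4-byte buffer and a limit of 3, the input "abcdefg" is handled as "abc", then the leftover "d", then "efg" after a refill.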

Modules

Macros

Structs

  • Decoder: A low-level interface for reading JSON data from a byte stream
  • Reader: Reads JSON data with a known schema directly into an Arrow RecordBatch
  • ReaderBuilder: A builder for Reader and Decoder
  • ValueIter: JSON file reader that produces a serde_json::Value iterator from a type implementing Read

Traits

Functions

  • infer_json_schema: Infer the fields of a JSON file from the records in a buffer, reading at most max_read_records records.
  • infer_json_schema_from_iterator: Infer the fields of a JSON file by reading all items from a JSON Value iterator.
  • infer_json_schema_from_seekable: Infer the fields of a JSON file from the records in a seekable file, reading at most max_read_records records.
  • make_decoder 🔒