IPC: Fast Streaming and Serialization

Arrow defines two types of binary formats for serializing record batches:

  • Streaming format: for sending an arbitrary-length sequence of record batches. The format must be processed from start to end and does not support random access
  • File or Random Access format: for serializing a fixed number of record batches. It supports random access, and is therefore very useful when used with memory maps

To follow this section, make sure to first read the section on Memory and IO.

Writing and Reading Streams

First, let’s create a small record batch:

In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...: 

In [3]: batch = pa.RecordBatch.from_arrays(data, ['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3

Now, we can begin writing a stream containing some number of these batches. For this we use RecordBatchStreamWriter, which can write to a writeable NativeFile object or a writeable Python object:

In [6]: sink = pa.BufferOutputStream()

In [7]: writer = pa.RecordBatchStreamWriter(sink, batch.schema)

Here we used an in-memory Arrow buffer stream, but this could have been a socket or some other IO sink.
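For example, the same writer could target an ordinary file on disk (a minimal sketch; the path 'example.arrows' is hypothetical, and pa.OSFile provides a writeable NativeFile as described in Memory and IO):

# Sketch: stream the record batches to a file on disk instead of a buffer.
# 'example.arrows' is a hypothetical path used only for illustration.
disk_sink = pa.OSFile('example.arrows', 'wb')
disk_writer = pa.RecordBatchStreamWriter(disk_sink, batch.schema)
disk_writer.write_batch(batch)
disk_writer.close()
disk_sink.close()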

When creating the StreamWriter, we pass the schema, since the schema (column names and types) must be the same for all of the batches sent in this particular stream. Now we can do:

In [8]: for i in range(5):
   ...:    writer.write_batch(batch)
   ...: 

In [9]: writer.close()

In [10]: buf = sink.get_result()

In [11]: buf.size
Out[11]: 3820

Now buf contains the complete stream as an in-memory byte buffer. We can read such a stream with RecordBatchStreamReader or the convenience function pyarrow.open_stream:

In [12]: reader = pa.open_stream(buf)

In [13]: reader.schema
Out[13]: 
f0: int64
f1: string
f2: bool
-- metadata --

In [14]: batches = [b for b in reader]

In [15]: len(batches)
Out[15]: 5

We can check that the returned batches are the same as the original input:

In [16]: batches[0].equals(batch)
Out[16]: True
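
If we instead want the contents of the whole stream as a single table, the stream reader's read_all method can gather the remaining batches for us (a minimal sketch; it assumes read_all concatenates everything it has not yet consumed into a pyarrow.Table):

# Sketch: re-open the stream and collect all batches into one Table.
reader = pa.open_stream(buf)
table = reader.read_all()
# 5 batches of 4 rows each should yield 20 rows in total.
table.num_rows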

An important point is that if the input source supports zero-copy reads (for example, a memory map or pyarrow.BufferReader), then the returned batches are also zero-copy and do not allocate any new memory on read.
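For instance, we could wrap the buffer explicitly before opening the stream (a minimal sketch using pa.BufferReader from the Memory and IO section):

# Sketch: a BufferReader gives zero-copy reads over an in-memory buffer,
# so the record batches below reference the original memory in buf.
source = pa.BufferReader(buf)
reader = pa.open_stream(source)
batches = [b for b in reader]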

Writing and Reading Random Access Files

The RecordBatchFileWriter has the same API as RecordBatchStreamWriter:

In [17]: sink = pa.BufferOutputStream()

In [18]: writer = pa.RecordBatchFileWriter(sink, batch.schema)

In [19]: for i in range(10):
   ....:    writer.write_batch(batch)
   ....: 

In [20]: writer.close()

In [21]: buf = sink.get_result()

In [22]: buf.size
Out[22]: 7914

The difference between RecordBatchFileReader and RecordBatchStreamReader is that the input source must have a seek method for random access, while the stream reader only requires read operations. We can also use the pyarrow.open_file convenience function to open a file:

In [23]: reader = pa.open_file(buf)

Because we have access to the entire payload, we know the number of record batches in the file, and can read any of them at random:

In [24]: reader.num_record_batches
Out[24]: 10

In [25]: b = reader.get_batch(3)

In [26]: b.equals(batch)
Out[26]: True
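
Because the file format supports random access, it combines naturally with memory maps. The following sketch persists the buffer to disk and then memory-maps it, so batches can be read lazily without copying (the path 'example.arrow' is hypothetical; pa.OSFile and pa.memory_map are described in Memory and IO):

# Sketch: write the serialized file to disk, then memory-map it for
# zero-copy random access. 'example.arrow' is a hypothetical path.
out = pa.OSFile('example.arrow', 'wb')
out.write(buf)
out.close()

mmap = pa.memory_map('example.arrow', 'r')
reader = pa.open_file(mmap)
last = reader.get_batch(reader.num_record_batches - 1)
last.equals(batch)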