Streaming, Serialization, and IPC

Writing and Reading Streams

Arrow defines two types of binary formats for serializing record batches:

  • Streaming format: for sending an arbitrary-length sequence of record batches. The format must be processed from start to end and does not support random access.

  • File or Random Access format: for serializing a fixed number of record batches. It supports random access, which makes it very useful in combination with memory maps.

To follow this section, make sure to first read the section on Memory and IO.

Using streams

First, let’s create a small record batch:

In [1]: import pyarrow as pa

In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...: 

In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])

In [4]: batch.num_rows
Out[4]: 4

In [5]: batch.num_columns
Out[5]: 3

Now we can begin writing a stream containing some number of these batches. For this we use RecordBatchStreamWriter, which can write to a writeable NativeFile object or a writeable Python file-like object. For convenience, a writer can be created with new_stream():

In [6]: sink = pa.BufferOutputStream()

In [7]: with pa.ipc.new_stream(sink, batch.schema) as writer:
   ...:    for i in range(5):
   ...:       writer.write_batch(batch)
   ...: 

Here we used an in-memory Arrow buffer stream (sink), but this could have been a socket or some other IO sink.

When creating the RecordBatchStreamWriter, we pass the schema, since the schema (column names and types) must be the same for all of the batches sent in this particular stream. We can then retrieve the written bytes:

In [8]: buf = sink.getvalue()

In [9]: buf.size
Out[9]: 1984

Now buf contains the complete stream as an in-memory byte buffer. We can read such a stream with RecordBatchStreamReader or the convenience function pyarrow.ipc.open_stream:

In [10]: with pa.ipc.open_stream(buf) as reader:
   ....:       schema = reader.schema
   ....:       batches = [b for b in reader]
   ....: 

In [11]: schema
Out[11]: 
f0: int64
f1: string
f2: bool

In [12]: len(batches)
Out[12]: 5

We can check the returned batches are the same as the original input:

In [13]: batches[0].equals(batch)
Out[13]: True

An important point is that if the input source supports zero-copy reads (for example a memory map or pyarrow.BufferReader), then the returned batches are also zero-copy and do not allocate any new memory on read.

Writing and Reading Random Access Files

The RecordBatchFileWriter has the same API as RecordBatchStreamWriter. You can create one with new_file():

In [14]: sink = pa.BufferOutputStream()

In [15]: with pa.ipc.new_file(sink, batch.schema) as writer:
   ....:    for i in range(10):
   ....:       writer.write_batch(batch)
   ....: 

In [16]: buf = sink.getvalue()

In [17]: buf.size
Out[17]: 4226

The difference between RecordBatchFileReader and RecordBatchStreamReader is that the input source for the file reader must have a seek method for random access, whereas the stream reader only requires read operations. We can use the open_file() function to open a file:

In [18]: with pa.ipc.open_file(buf) as reader:
   ....:    num_record_batches = reader.num_record_batches
   ....: 

In [19]: b = reader.get_batch(3)

Because we have access to the entire payload, we know the number of record batches in the file, and can read any at random.

In [20]: num_record_batches
Out[20]: 10

In [21]: b.equals(batch)
Out[21]: True

Reading from Stream and File Format for pandas

The stream and file reader classes have a special read_pandas method to simplify reading multiple record batches and converting them to a single DataFrame output:

In [22]: with pa.ipc.open_file(buf) as reader:
   ....:    df = reader.read_pandas()
   ....: 

In [23]: df[:5]
Out[23]: 
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True

Efficiently Writing and Reading Arrow Data

Because Arrow is optimized for zero-copy and memory-mapped data, it makes it easy to read and write arrays while consuming the minimum amount of resident memory.

When writing and reading raw Arrow data, we can use the Arrow File Format or the Arrow Streaming Format.

To dump an array to a file, you can use new_file(), which provides a new RecordBatchFileWriter instance that can be used to write batches of data to that file.

For example, to write an array of 10M integers, we could write it in 1000 chunks of 10000 entries:

In [24]: BATCH_SIZE = 10000

In [25]: NUM_BATCHES = 1000

In [26]: schema = pa.schema([pa.field('nums', pa.int32())])

In [27]: with pa.OSFile('bigfile.arrow', 'wb') as sink:
   ....:    with pa.ipc.new_file(sink, schema) as writer:
   ....:       for row in range(NUM_BATCHES):
   ....:          batch = pa.record_batch([pa.array(range(BATCH_SIZE), type=pa.int32())], schema)
   ....:          writer.write(batch)
   ....: 

Record batches support multiple columns, so in practice we always write the equivalent of a Table.

Writing in batches is effective because, in theory, we only need to keep the current batch in memory while writing. When reading back, we can be even more efficient by directly mapping the data from disk and avoiding any new memory allocation on read.

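Under normal conditions, reading back our file will consume roughly as much memory as the data itself, about 38 MB here (the figure printed below is the memory allocated by Arrow's default memory pool):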

In [28]: with pa.OSFile('bigfile.arrow', 'rb') as source:
   ....:    loaded_array = pa.ipc.open_file(source).read_all()
   ....: 

In [29]: print("LEN:", len(loaded_array))
LEN: 10000000

In [30]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 38MB

To read large data from disk more efficiently, we can memory map the file, so that Arrow can directly reference the data mapped from disk and avoid having to allocate its own memory. In that case, the operating system can page in the mapped memory lazily and, under memory pressure, page it out without any write-back cost, which makes it easier to read arrays bigger than the total memory.

In [31]: with pa.memory_map('bigfile.arrow', 'rb') as source:
   ....:    loaded_array = pa.ipc.open_file(source).read_all()
   ....: 

In [32]: print("LEN:", len(loaded_array))
LEN: 10000000

In [33]: print("RSS: {}MB".format(pa.total_allocated_bytes() >> 20))
RSS: 0MB

Note

Other high-level APIs like read_table() also provide a memory_map option. In those cases, however, memory mapping can’t help with reducing resident memory consumption. See Reading Parquet and Memory Mapping for details.