Streaming, Serialization, and IPC¶
Writing and Reading Streams¶
Arrow defines two binary formats for serializing record batches:
Streaming format: for sending an arbitrary-length sequence of record batches. The format must be processed from start to end and does not support random access.
File or Random Access format: for serializing a fixed number of record batches. It supports random access, which makes it especially useful in combination with memory maps.
To follow this section, make sure to first read the section on Memory and IO.
Using streams¶
First, let’s create a small record batch:
In [1]: import pyarrow as pa
In [2]: data = [
   ...:     pa.array([1, 2, 3, 4]),
   ...:     pa.array(['foo', 'bar', 'baz', None]),
   ...:     pa.array([True, None, False, True])
   ...: ]
   ...:
In [3]: batch = pa.record_batch(data, names=['f0', 'f1', 'f2'])
In [4]: batch.num_rows
Out[4]: 4
In [5]: batch.num_columns
Out[5]: 3
Now we can begin writing a stream containing some number of these batches. For this we use RecordBatchStreamWriter, which can write to a writeable NativeFile object or a writeable Python object. For convenience, a writer can be created with new_stream():
In [6]: sink = pa.BufferOutputStream()
In [7]: writer = pa.ipc.new_stream(sink, batch.schema)
Here we used an in-memory Arrow buffer stream, but this could have been a socket or some other IO sink.
When creating the RecordBatchStreamWriter, we pass the schema, since the schema (column names and types) must be the same for all of the batches sent in this particular stream. Now we can do:
In [8]: for i in range(5):
   ...:     writer.write_batch(batch)
   ...:
In [9]: writer.close()
In [10]: buf = sink.getvalue()
In [11]: buf.size
Out[11]: 1984
Now buf contains the complete stream as an in-memory byte buffer. We can read such a stream with RecordBatchStreamReader or the convenience function pyarrow.ipc.open_stream:
In [12]: reader = pa.ipc.open_stream(buf)
In [13]: reader.schema
Out[13]:
f0: int64
f1: string
f2: bool
In [14]: batches = [b for b in reader]
In [15]: len(batches)
Out[15]: 5
We can check the returned batches are the same as the original input:
In [16]: batches[0].equals(batch)
Out[16]: True
An important point is that if the input source supports zero-copy reads (e.g. a memory map or pyarrow.BufferReader), then the returned batches are also zero-copy and do not allocate any new memory on read.
Writing and Reading Random Access Files¶
The RecordBatchFileWriter has the same API as RecordBatchStreamWriter. You can create one with new_file():
In [17]: sink = pa.BufferOutputStream()
In [18]: writer = pa.ipc.new_file(sink, batch.schema)
In [19]: for i in range(10):
   ....:     writer.write_batch(batch)
   ....:
In [20]: writer.close()
In [21]: buf = sink.getvalue()
In [22]: buf.size
Out[22]: 4226
The difference between RecordBatchFileReader and RecordBatchStreamReader is that the input source must have a seek method for random access. The stream reader only requires read operations. We can use the open_file() function to open a file:
In [23]: reader = pa.ipc.open_file(buf)
Because we have access to the entire payload, we know the number of record batches in the file, and can read any at random:
In [24]: reader.num_record_batches
Out[24]: 10
In [25]: b = reader.get_batch(3)
In [26]: b.equals(batch)
Out[26]: True
Reading from Stream and File Format for pandas¶
The stream and file reader classes have a special read_pandas method to simplify reading multiple record batches and converting them to a single DataFrame output:
In [27]: df = pa.ipc.open_file(buf).read_pandas()
In [28]: df[:5]
Out[28]:
   f0    f1     f2
0   1   foo   True
1   2   bar   None
2   3   baz  False
3   4  None   True
4   1   foo   True
Arbitrary Object Serialization¶
Warning
The custom serialization functionality is deprecated in pyarrow 2.0, and will be removed in a future version.
While the serialization functions in this section utilize the Arrow stream protocol internally, they do not produce data that is compatible with the above ipc.open_file and ipc.open_stream functions.
For arbitrary objects, you can use the standard library pickle functionality instead. For pyarrow objects, you can use the IPC serialization format through the pyarrow.ipc module, as explained above.
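As a sketch of the pickle route mentioned in the warning, the example below uses pickle protocol 5 (Python 3.8+) with out-of-band buffers, so that the payload holds only metadata while the array memory is handed over separately. This is one possible approach rather than a drop-in replacement for the functions described in this section; the small arrays are placeholders.

```python
import pickle

import numpy as np

# A small dictionary of NumPy arrays (placeholder data).
data = {i: np.arange(6, dtype='float64').reshape(2, 3) + i for i in range(3)}

# With protocol 5, large buffers can be collected out-of-band
# instead of being copied into the pickle payload.
buffers = []
payload = pickle.dumps(data, protocol=5, buffer_callback=buffers.append)

# The same buffers must be supplied again when loading.
restored = pickle.loads(payload, buffers=buffers)

print(len(buffers), all(np.array_equal(data[k], restored[k]) for k in data))
```

How the out-of-band buffers are transported (shared memory, a socket, and so on) is left to the application.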
In pyarrow we are able to serialize and deserialize many kinds of Python objects. While not a complete replacement for the pickle module, these functions can be significantly faster, particularly when dealing with collections of NumPy arrays.
As an example, consider a dictionary containing NumPy arrays:
In [29]: import numpy as np
In [30]: data = {
   ....:     i: np.random.randn(500, 500)
   ....:     for i in range(100)
   ....: }
   ....:
We use the pyarrow.serialize function to convert this data to a byte buffer:
In [31]: buf = pa.serialize(data).to_buffer()
In [32]: type(buf)
Out[32]: pyarrow.lib.Buffer
In [33]: buf.size
Out[33]: 200028928
pyarrow.serialize creates an intermediate object which can be converted to a buffer (the to_buffer method) or written directly to an output stream.
pyarrow.deserialize converts a buffer-like object back to the original Python object:
In [34]: restored_data = pa.deserialize(buf)
In [35]: restored_data[0]
Out[35]:
array([[ 0.88051977, 0.55272191, 0.10212234, ..., -2.28750868,
0.87506778, 0.41497569],
[-0.06914791, 1.2891059 , -0.20746828, ..., -0.05183324,
0.02896555, 0.11629995],
[-0.13707509, -1.03307268, -1.73108249, ..., -0.29438554,
-1.31643833, 0.46146448],
...,
[ 0.35610601, 0.91661909, -0.44280949, ..., 0.53764271,
0.86447821, -0.4745175 ],
[-0.08632221, -0.4628601 , 0.13151095, ..., -1.88586565,
0.08840339, -0.86300602],
[ 0.09983648, -0.32873005, -0.03006915, ..., -1.23231303,
0.70042352, 0.52386661]])
When dealing with NumPy arrays, pyarrow.deserialize can be significantly faster than pickle because the resulting arrays are zero-copy references into the input buffer. The larger the arrays, the larger the performance savings.
For example, timing pyarrow.deserialize gives:
In [36]: %timeit restored_data = pa.deserialize(buf)
8.01 ms +- 12.6 us per loop (mean +- std. dev. of 7 runs, 100 loops each)
And for pickle:
In [37]: import pickle
In [38]: pickled = pickle.dumps(data)
In [39]: %timeit unpickled_data = pickle.loads(pickled)
77.7 ms +- 111 us per loop (mean +- std. dev. of 7 runs, 10 loops each)
We aspire to make these functions a high-speed alternative to pickle for transient serialization in Python big data applications.
Serializing Custom Data Types¶
If an unrecognized data type is encountered when serializing an object, pyarrow will fall back on using pickle for converting that type to a byte string. There may be a more efficient way, though.
Consider a class with two members, one of which is a NumPy array:
class MyData:
    def __init__(self, name, data):
        self.name = name
        self.data = data
We write functions to convert this to and from a dictionary with simpler types:
def _serialize_MyData(val):
    return {'name': val.name, 'data': val.data}

def _deserialize_MyData(data):
    return MyData(data['name'], data['data'])
Then, we must register these functions in a SerializationContext so that MyData can be recognized:
context = pa.SerializationContext()
context.register_type(MyData, 'MyData',
                      custom_serializer=_serialize_MyData,
                      custom_deserializer=_deserialize_MyData)
Lastly, we use this context as an additional argument to pyarrow.serialize:
val = MyData('example', np.arange(5))  # hypothetical instance, for illustration only
buf = pa.serialize(val, context=context).to_buffer()
restored_val = pa.deserialize(buf, context=context)
The SerializationContext also has convenience methods serialize and deserialize, so these are equivalent statements:
buf = context.serialize(val).to_buffer()
restored_val = context.deserialize(buf)
Component-based Serialization¶
For serializing Python objects containing some number of NumPy arrays, Arrow buffers, or other data types, it may be desirable to transport their serialized representation without having to produce an intermediate copy using the to_buffer method. To motivate this, suppose we have a list of NumPy arrays:
In [40]: import numpy as np
In [41]: data = [np.random.randn(10, 10) for i in range(5)]
The call pa.serialize(data) does not copy the memory inside each of these NumPy arrays. The serialized representation can then be decomposed into a dictionary containing a sequence of pyarrow.Buffer objects that hold metadata for each array and references to the memory inside the arrays. To do this, use the to_components method:
In [42]: serialized = pa.serialize(data)
In [43]: components = serialized.to_components()
The particular details of the output of to_components are not too important. The objects in the 'data' field are pyarrow.Buffer objects, which are zero-copy convertible to Python memoryview objects:
In [44]: memoryview(components['data'][0])
Out[44]: <memory at 0x7fea7e36f588>
A memoryview can be converted back to an Arrow Buffer with pyarrow.py_buffer:
In [45]: mv = memoryview(components['data'][0])
In [46]: buf = pa.py_buffer(mv)
An object can be reconstructed from its component-based representation using deserialize_components:
In [47]: restored_data = pa.deserialize_components(components)
In [48]: restored_data[0]
Out[48]:
array([[-9.14759119e-01, 8.22531216e-01, 6.19196497e-01,
3.36062879e-01, -8.63270232e-01, 1.87095556e-04,
1.33073320e+00, -2.55078762e-01, 4.26005337e-01,
9.79837876e-01],
[-3.72272916e-01, 9.44288936e-01, 1.44571184e+00,
-6.81179373e-01, -1.13526919e+00, -1.71378417e+00,
8.77443673e-02, -6.63330214e-01, 4.34648788e-01,
-5.33136112e-03],
[ 4.93206954e-01, 1.57363757e+00, 4.80429045e-01,
7.99543226e-01, -1.34740056e+00, 1.73131392e-01,
4.08318436e-01, -9.83264701e-01, -5.15108592e-01,
-2.51421849e-01],
[-2.27694036e-01, -1.31966996e-01, -1.97169052e+00,
1.28018067e+00, -2.17894070e-01, 3.68075751e-01,
2.47569038e-01, -7.47686355e-01, 1.56570559e+00,
-9.30169292e-01],
[-1.16160547e+00, -1.60687452e-01, -1.19222093e+00,
1.03886084e+00, -6.47160975e-01, 4.44812911e-01,
1.83531467e-01, -1.56010330e+00, -1.78361302e+00,
1.56711887e+00],
[ 8.29678638e-01, -2.76625345e-01, -1.39475599e+00,
-1.11105425e+00, 5.36469600e-01, -3.58585224e-01,
6.24194328e-01, 2.84876128e+00, -1.25507586e+00,
1.08762231e+00],
[-4.08995903e-01, 3.15652298e-01, 6.87224280e-01,
9.14969610e-01, 9.28179123e-01, 4.70771248e-01,
-2.30528020e+00, -8.13865317e-01, -9.37075336e-01,
1.48284172e-01],
[-4.24746822e-01, -4.84351539e-01, -1.48274864e+00,
-7.33787574e-01, -1.07289210e+00, 8.26679927e-01,
-1.35221475e+00, -2.16847620e-01, 1.01496159e+00,
1.53277643e+00],
[ 2.72949922e-01, -2.35721987e+00, 9.45241513e-01,
-3.44553200e-01, -1.21805043e+00, 3.43965292e+00,
-1.23515448e+00, 6.00958345e-02, 1.00742869e+00,
1.21356132e+00],
[ 5.71791844e-01, 1.41221910e+00, -1.81831150e+00,
5.67694490e-01, -4.91102046e-01, 9.75450988e-01,
1.54468233e+00, -9.00342339e-01, 1.72797085e-01,
9.22262251e-01]])
deserialize_components is also available as a method on SerializationContext objects.
Serializing pandas Objects¶
The default serialization context has optimized handling of pandas objects like DataFrame and Series. Combined with component-based serialization above, this enables zero-copy transport of pandas DataFrame objects not containing any Python objects:
In [49]: import pandas as pd
In [50]: df = pd.DataFrame({'a': [1, 2, 3, 4, 5]})
In [51]: context = pa.default_serialization_context()
In [52]: serialized_df = context.serialize(df)
In [53]: df_components = serialized_df.to_components()
In [54]: original_df = context.deserialize_components(df_components)
In [55]: original_df
Out[55]:
   a
0  1
1  2
2  3
3  4
4  5