Project News and Blog


Fast Python Serialization with Ray and Apache Arrow

Published 15 Oct 2017
By Philipp Moritz and Robert Nishihara

This was originally posted on the Ray blog. Philipp Moritz and Robert Nishihara are graduate students at UC Berkeley.

This post elaborates on the integration between Ray and Apache Arrow. The main problem this addresses is data serialization.

From Wikipedia, serialization is

… the process of translating data structures or object state into a format that can be stored … or transmitted … and reconstructed later (possibly in a different computer environment).

Why is any translation necessary? Well, when you create a Python object, it may have pointers to other Python objects, and these objects are all allocated in different regions of memory, and all of this has to make sense when unpacked by another process on another machine.

Serialization and deserialization are bottlenecks in parallel and distributed computing, especially in machine learning applications with large objects and large quantities of data.

Design Goals

As Ray is optimized for machine learning and AI applications, we have focused a lot on serialization and data handling, with the following design goals:

  1. It should be very efficient with large numerical data (this includes NumPy arrays and Pandas DataFrames, as well as objects that recursively contain NumPy arrays and Pandas DataFrames).
  2. It should be about as fast as Pickle for general Python types.
  3. It should be compatible with shared memory, allowing multiple processes to use the same data without copying it.
  4. Deserialization should be extremely fast (when possible, it should not require reading the entire serialized object).
  5. It should be language independent (eventually we’d like to enable Python workers to use objects created by workers in Java or other languages and vice versa).

Our Approach and Alternatives

The go-to serialization approach in Python is the pickle module. Pickle is very general, especially if you use variants like cloudpickle. However, it does not satisfy requirements 1, 3, 4, or 5. Alternatives like json satisfy 5, but not 1-4.

Our Approach: To satisfy requirements 1-5, we chose to use the Apache Arrow format as our underlying data representation. In collaboration with the Apache Arrow team, we built libraries for mapping general Python objects to and from the Arrow format. Some properties of this approach:

Alternatives to Arrow: We could have built on top of Protocol Buffers, but Protocol Buffers really isn’t designed for numerical data, and that approach wouldn’t satisfy 1, 3, or 4. Building on top of Flatbuffers could be made to work, but it would have required implementing a lot of the facilities that Arrow already has, and we preferred a columnar data layout more optimized for big data.

Speedups

Here we show some performance improvements over Python’s pickle module. The experiments were done using pickle.HIGHEST_PROTOCOL. Code for generating these plots is included at the end of the post.

With NumPy arrays: In machine learning and AI applications, data (e.g., images, neural network weights, text documents) are typically represented as data structures containing NumPy arrays. When using NumPy arrays, the speedups are impressive.

The fact that the Ray bars for deserialization are barely visible is not a mistake. This is a consequence of the support for zero-copy reads (the savings largely come from the lack of memory movement).

Note that the biggest wins are with deserialization. The speedups here are multiple orders of magnitude and get better as the NumPy arrays get larger (thanks to design goals 1, 3, and 4). Making deserialization fast is important for two reasons. First, an object may be serialized once and then deserialized many times (e.g., an object that is broadcast to all workers). Second, a common pattern is for many objects to be serialized in parallel and then aggregated and deserialized one at a time on a single worker, making deserialization the bottleneck.

Without NumPy arrays: When using regular Python objects, for which we cannot take advantage of shared memory, the results are comparable to pickle.

These are just a few examples of interesting Python objects. The most important case is the case where NumPy arrays are nested within other objects. Note that our serialization library works with very general Python types including custom Python classes and deeply nested objects.

The API

The serialization library can be used directly through pyarrow as follows. More documentation is available here.

import numpy as np
import pyarrow

x = [(1, 2), 'hello', 3, 4, np.array([5.0, 6.0])]
serialized_x = pyarrow.serialize(x).to_buffer()
deserialized_x = pyarrow.deserialize(serialized_x)

It can be used directly through the Ray API as follows.

import numpy as np
import ray

ray.init()

x = [(1, 2), 'hello', 3, 4, np.array([5.0, 6.0])]
x_id = ray.put(x)
deserialized_x = ray.get(x_id)

Data Representation

We use Apache Arrow as the underlying language-independent data layout. Objects are stored in two parts: a schema and a data blob. At a high level, the data blob is roughly a flattened concatenation of all of the data values recursively contained in the object, and the schema defines the types and nesting structure of the data blob.

Technical Details: Python sequences (e.g., dictionaries, lists, tuples, sets) are encoded as Arrow UnionArrays of other types (e.g., bools, ints, strings, bytes, floats, doubles, date64s, tensors (i.e., NumPy arrays), lists, tuples, dicts and sets). Nested sequences are encoded using Arrow ListArrays. All tensors are collected and appended to the end of the serialized object, and the UnionArray contains references to these tensors.

To give a concrete example, consider the following object.

[(1, 2), 'hello', 3, 4, np.array([5.0, 6.0])]

It would be represented in Arrow with the following structure.

UnionArray(type_ids=[tuple, string, int, int, ndarray],
           tuples=ListArray(offsets=[0, 2],
                            UnionArray(type_ids=[int, int],
                                       ints=[1, 2])),
           strings=['hello'],
           ints=[3, 4],
           ndarrays=[<offset of numpy array>])

Arrow uses Flatbuffers to encode serialized schemas. Using only the schema, we can compute the offsets of each value in the data blob without scanning through the data blob (unlike Pickle, this is what enables fast deserialization). This means that we can avoid copying or otherwise converting large arrays and other values during deserialization. Tensors are appended at the end of the UnionArray and can be efficiently shared and accessed using shared memory.
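
As a rough illustration of the zero-copy behavior described above (a minimal sketch, not taken from the benchmarks in this post), deserializing a NumPy array maps it onto the serialized buffer rather than copying the data out:

import numpy as np
import pyarrow

# Serialize a NumPy array into an Arrow buffer.
original = np.arange(1000000, dtype=np.float64)
buf = pyarrow.serialize(original).to_buffer()

# Deserialization maps the array onto the buffer; no element-wise copy is
# made, so the resulting array is typically read-only.
restored = pyarrow.deserialize(buf)
assert np.array_equal(original, restored)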

Note that the actual object would be laid out in memory as shown below.

The layout of a Python object in the heap. Each box is allocated in a different memory region, and arrows between boxes represent pointers.


The Arrow serialized representation would be as follows.

The memory layout of the Arrow-serialized object.


Getting Involved

We welcome contributions, especially in the following areas.

Reproducing the Figures Above

For reference, the figures can be reproduced with the following code. Benchmarking ray.put and ray.get instead of pyarrow.serialize and pyarrow.deserialize gives similar figures. The plots were generated at this commit.

import pickle
import pyarrow
import matplotlib.pyplot as plt
import numpy as np
import timeit


def benchmark_object(obj, number=10):
    # Time serialization and deserialization for pickle.
    pickle_serialize = timeit.timeit(
        lambda: pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL),
        number=number)
    serialized_obj = pickle.dumps(obj, pickle.HIGHEST_PROTOCOL)
    pickle_deserialize = timeit.timeit(lambda: pickle.loads(serialized_obj),
                                       number=number)

    # Time serialization and deserialization for Ray.
    ray_serialize = timeit.timeit(
        lambda: pyarrow.serialize(obj).to_buffer(), number=number)
    serialized_obj = pyarrow.serialize(obj).to_buffer()
    ray_deserialize = timeit.timeit(
        lambda: pyarrow.deserialize(serialized_obj), number=number)

    return [[pickle_serialize, pickle_deserialize],
            [ray_serialize, ray_deserialize]]


def plot(pickle_times, ray_times, title, i):
    fig, ax = plt.subplots()
    fig.set_size_inches(3.8, 2.7)

    bar_width = 0.35
    index = np.arange(2)
    opacity = 0.6

    plt.bar(index, pickle_times, bar_width,
            alpha=opacity, color='r', label='Pickle')

    plt.bar(index + bar_width, ray_times, bar_width,
            alpha=opacity, color='c', label='Ray')

    plt.title(title, fontweight='bold')
    plt.ylabel('Time (seconds)', fontsize=10)
    labels = ['serialization', 'deserialization']
    plt.xticks(index + bar_width / 2, labels, fontsize=10)
    plt.legend(fontsize=10, bbox_to_anchor=(1, 1))
    plt.tight_layout()
    plt.yticks(fontsize=10)
    plt.savefig('plot-' + str(i) + '.png', format='png')


test_objects = [
    [np.random.randn(50000) for i in range(100)],
    {'weight-' + str(i): np.random.randn(50000) for i in range(100)},
    {i: set(['string1' + str(i), 'string2' + str(i)]) for i in range(100000)},
    [str(i) for i in range(200000)]
]

titles = [
    'List of large numpy arrays',
    'Dictionary of large numpy arrays',
    'Large dictionary of small sets',
    'Large list of strings'
]

for i in range(len(test_objects)):
    plot(*benchmark_object(test_objects[i]), titles[i], i)

Apache Arrow 0.7.0 Release

Published 19 Sep 2017
By Wes McKinney (wesm)

The Apache Arrow team is pleased to announce the 0.7.0 release. It includes 133 resolved JIRAs, with many new features and bug fixes to the various language implementations. The Arrow memory format remains stable since the 0.3.x release.

See the Install Page to learn how to get the libraries for your platform. The complete changelog is also available.

We include some highlights from the release in this post.

New PMC Member: Kouhei Sutou

Since the last release we have added Kou to the Arrow Project Management Committee. He is also a PMC member of Apache Subversion, and a major contributor to many other open source projects.

As an active member of the Ruby community in Japan, Kou has been developing the GLib-based C bindings for Arrow with associated Ruby wrappers, to enable Ruby users to benefit from the work that’s happening in Apache Arrow.

We are excited to be collaborating with the Ruby community on shared infrastructure for in-memory analytics and data science.

Expanded JavaScript (TypeScript) Implementation

Paul Taylor from the Falcor and ReactiveX projects has worked to expand the JavaScript implementation (which is written in TypeScript), using the latest in modern JavaScript build and packaging technology. We are looking forward to building out the JS implementation and bringing it up to full functionality with the C++ and Java implementations.

We are looking for more JavaScript developers to join the project and work together to make Arrow for JS work well with many kinds of front end use cases, like real time data visualization.

Type casting for C++ and Python

As part of longer-term efforts to build an Arrow-native in-memory analytics library, we implemented a variety of type conversion functions. These functions are essential in ETL tasks when conforming one table schema to another. These are similar to the astype function in NumPy.

In [17]: import pyarrow as pa

In [18]: arr = pa.array([True, False, None, True])

In [19]: arr
Out[19]:
<pyarrow.lib.BooleanArray object at 0x7ff6fb069b88>
[
  True,
  False,
  NA,
  True
]

In [20]: arr.cast(pa.int32())
Out[20]:
<pyarrow.lib.Int32Array object at 0x7ff6fb0383b8>
[
  1,
  0,
  NA,
  1
]

Over time these will expand to support as many input and output type combinations as possible, with optimized conversions.

New Arrow GPU (CUDA) Extension Library for C++

To help with GPU-related projects using Arrow, like the GPU Open Analytics Initiative, we have started a C++ add-on library to simplify Arrow memory management on CUDA-enabled graphics cards. We would like to expand this to include a library of reusable CUDA kernel functions for GPU analytics on Arrow columnar memory.

For example, we could write a record batch from CPU memory to GPU device memory like so (some error checking omitted):

#include <arrow/api.h>
#include <arrow/gpu/cuda_api.h>

using namespace arrow;

gpu::CudaDeviceManager* manager;
std::shared_ptr<gpu::CudaContext> context;

gpu::CudaDeviceManager::GetInstance(&manager);
manager->GetContext(kGpuNumber, &context);

std::shared_ptr<RecordBatch> batch = GetCpuData();

std::shared_ptr<gpu::CudaBuffer> device_serialized;
gpu::SerializeRecordBatch(*batch, context.get(), &device_serialized);

We can then “read” the GPU record batch, but the returned arrow::RecordBatch internally will contain GPU device pointers that you can use for CUDA kernel calls:

std::shared_ptr<RecordBatch> device_batch;
gpu::ReadRecordBatch(batch->schema(), device_serialized,
                     default_memory_pool(), &device_batch);

// Now run some CUDA kernels on device_batch

Decimal Integration Tests

Phillip Cloud has been working on decimal support in C++ to enable Parquet read/write support in C++ and Python, and also end-to-end testing against the Arrow Java libraries.

In the upcoming releases, we hope to complete the remaining data types that need end-to-end testing between Java and C++:

Other Notable Python Changes

Some highlights of Python development outside of bug fixes and general API improvements include:

The Road Ahead

Upcoming Arrow releases will continue to expand the project to cover more use cases. In addition to completing end-to-end testing for all the major data types, some of us will be shifting attention to building Arrow-native in-memory analytics libraries.

We are looking for more JavaScript, R, and other programming language developers to join the project and expand the available implementations and bindings to more languages.

Apache Arrow 0.6.0 Release

Published 16 Aug 2017
By Wes McKinney (wesm)

The Apache Arrow team is pleased to announce the 0.6.0 release. It includes 90 resolved JIRAs with the new Plasma shared memory object store, and improvements and bug fixes to the various language implementations. The Arrow memory format remains stable since the 0.3.x release.

See the Install Page to learn how to get the libraries for your platform. The complete changelog is also available.

Plasma Shared Memory Object Store

This release includes the Plasma Store, which you can read more about in the linked blog post. This system was originally developed as part of the Ray Project at the UC Berkeley RISELab. We recognized that Plasma would be highly valuable to the Arrow community as a tool for shared memory management and zero-copy deserialization. Additionally, we believe we will be able to develop a stronger software stack through sharing of IO and buffer management code.

The Plasma store is a server application which runs as a separate process. A reference C++ client, with Python bindings, is made available in this release. Clients can be developed in Java or other languages in the future to enable simple sharing of complex datasets through shared memory.

Arrow Format Addition: Map type

We added a Map logical type to represent ordered and unordered maps in-memory. This corresponds to the MAP logical type annotation in the Parquet format (where maps are represented as repeated structs).

Map is represented as a list of structs. It is the first example of a logical type whose physical representation is a nested type. We have not yet created Map container implementations in any of the languages, but this can be done in a future release.

As an example, the Python data:

data = [{'a': 1, 'bb': 2, 'cc': 3}, {'dddd': 4}]

Could be represented in an Arrow Map<String, Int32> as:

Map<String, Int32> = List<Struct<keys: String, values: Int32>>
  is_valid: [true, true]
  offsets: [0, 3, 4]
  values: Struct<keys: String, values: Int32>
    children:
      - keys: String
          is_valid: [true, true, true, true]
          offsets: [0, 1, 3, 5, 9]
          data: abbccdddd
      - values: Int32
          is_valid: [true, true, true, true]
          data: [1, 2, 3, 4]

Python Changes

Some highlights of Python development outside of bug fixes and general API improvements include:

Toward Arrow 1.0.0 and Beyond

We are still discussing the roadmap to 1.0.0 release on the developer mailing list. The focus of the 1.0.0 release will likely be memory format stability and hardening integration tests across the remaining data types implemented in Java and C++. Please join the discussion there.

Plasma In-Memory Object Store

Philipp Moritz and Robert Nishihara are graduate students at UC Berkeley.

Plasma: A High-Performance Shared-Memory Object Store

Motivating Plasma

This blog post presents Plasma, an in-memory object store that is being developed as part of Apache Arrow. Plasma holds immutable objects in shared memory so that they can be accessed efficiently by many clients across process boundaries. In light of the trend toward larger and larger multicore machines, Plasma enables critical performance optimizations in the big data regime.

Plasma was initially developed as part of Ray, and has recently been moved to Apache Arrow in the hopes that it will be broadly useful.

One of the goals of Apache Arrow is to serve as a common data layer enabling zero-copy data exchange between multiple frameworks. A key component of this vision is the use of off-heap memory management (via Plasma) for storing and sharing Arrow-serialized objects between applications.

Expensive serialization and deserialization as well as data copying are a common performance bottleneck in distributed computing. For example, a Python-based execution framework that wishes to distribute computation across multiple Python “worker” processes and then aggregate the results in a single “driver” process may choose to serialize data using the built-in pickle library. Assuming one Python process per core, each worker process would have to copy and deserialize the data, resulting in excessive memory usage. The driver process would then have to deserialize results from each of the workers, resulting in a bottleneck.

Using Plasma plus Arrow, the data being operated on would be placed in the Plasma store once, and all of the workers would read the data without copying or deserializing it (the workers would map the relevant region of memory into their own address spaces). The workers would then put the results of their computation back into the Plasma store, which the driver could then read and aggregate without copying or deserializing the data.

The Plasma API

Below we illustrate a subset of the API. The C++ API is documented more fully here, and the Python API is documented here.

Object IDs: Each object is associated with a string of bytes.

Creating an object: Objects are stored in Plasma in two stages. First, the object store creates the object by allocating a buffer for it. At this point, the client can write to the buffer and construct the object within the allocated buffer. When the client is done, the client seals the buffer, making the object immutable and available to other Plasma clients.

import pyarrow.plasma

# Connect to a running Plasma store. The socket path here is an example, and
# the exact connect() arguments depend on the pyarrow version in use.
client = pyarrow.plasma.connect("/tmp/plasma")

# Create an object.
object_id = pyarrow.plasma.ObjectID(20 * b'a')
object_size = 1000
buffer = memoryview(client.create(object_id, object_size))

# Write to the buffer.
for i in range(1000):
    buffer[i] = 0

# Seal the object making it immutable and available to other clients.
client.seal(object_id)

Getting an object: After an object has been sealed, any client who knows the object ID can get the object.

# Get the object from the store. This blocks until the object has been sealed.
object_id = pyarrow.plasma.ObjectID(20 * b'a')
[buff] = client.get([object_id])
buffer = memoryview(buff)

If the object has not been sealed yet, then the call to client.get will block until the object has been sealed.
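
If blocking is undesirable, a client can first check whether the object is already present, as in this minimal sketch (it assumes the client object connected in the examples above):

# Check for the object before issuing a blocking get.
object_id = pyarrow.plasma.ObjectID(20 * b'a')
if client.contains(object_id):
    [buff] = client.get([object_id])
    buffer = memoryview(buff)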

A sorting application

To illustrate the benefits of Plasma, we demonstrate an 11x speedup (on a machine with 20 physical cores) for sorting a large pandas DataFrame (one billion entries). The baseline is the built-in pandas sort function, which sorts the DataFrame in 477 seconds. To leverage multiple cores, we implement the following standard distributed sorting scheme.

Using this scheme, we can sort the DataFrame (the data starts and ends in the Plasma store), in 44 seconds, giving an 11x speedup over the baseline.

Design

The Plasma store runs as a separate process. It is written in C++ and is designed as a single-threaded event loop based on the Redis event loop library. The Plasma client library can be linked into applications. Clients communicate with the Plasma store via messages serialized using Google Flatbuffers.

Call for contributions

Plasma is a work in progress, and the API is currently unstable. Today Plasma is primarily used in Ray as an in-memory cache for Arrow serialized objects. We are looking for a broader set of use cases to help refine Plasma’s API. In addition, we are looking for contributions in a variety of areas including improving performance and building other language bindings. Please let us know if you are interested in getting involved with the project.

Speeding up PySpark with Apache Arrow

Published 26 Jul 2017
By Bryan Cutler (BryanCutler)

Bryan Cutler is a software engineer at IBM’s Spark Technology Center (STC).

Beginning with Apache Spark version 2.3, Apache Arrow will be a supported dependency and begin to offer increased performance with columnar data transfer. If you are a Spark user who prefers to work in Python and Pandas, this is cause for excitement! The initial work is limited to collecting a Spark DataFrame with toPandas(), which I will discuss below; however, there are many additional improvements currently underway.

Optimizing Spark Conversion to Pandas

The previous way of converting a Spark DataFrame to Pandas with DataFrame.toPandas() in PySpark was painfully inefficient. Basically, it worked by first collecting all rows to the Spark driver. Next, each row would get serialized into Python’s pickle format and sent to a Python worker process. This child process unpickles each row into a huge list of tuples. Finally, a Pandas DataFrame is created from the list using pandas.DataFrame.from_records().

This all might seem like standard procedure, but it suffers from two glaring issues: 1) even using cPickle, Python serialization is a slow process, and 2) creating a pandas.DataFrame using from_records must slowly iterate over the list of pure Python data and convert each value to Pandas format. See here for a detailed analysis.

Here is where Arrow really shines to help optimize these steps: 1) Once the data is in Arrow memory format, there is no need to serialize/pickle anymore as Arrow data can be sent directly to the Python process, 2) When the Arrow data is received in Python, then pyarrow can utilize zero-copy methods to create a pandas.DataFrame from entire chunks of data at once instead of processing individual scalar values. Additionally, the conversion to Arrow data can be done on the JVM and pushed back for the Spark executors to perform in parallel, drastically reducing the load on the driver.

As of the merging of SPARK-13534, the use of Arrow when calling toPandas() needs to be enabled by setting the SQLConf “spark.sql.execution.arrow.enable” to “true”. Let’s look at a simple usage example.

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0-SNAPSHOT
      /_/

Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.

In [1]: from pyspark.sql.functions import rand
   ...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
   ...: df.printSchema()
   ...: 
root
 |-- id: long (nullable = false)
 |-- x: double (nullable = false)


In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s

In [3]: spark.conf.set("spark.sql.execution.arrow.enable", "true")

In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms                                 
Wall time: 737 ms

In [5]: pdf.describe()
Out[5]: 
                 id             x
count  4.194304e+06  4.194304e+06
mean   2.097152e+06  4.998996e-01
std    1.210791e+06  2.887247e-01
min    0.000000e+00  8.291929e-07
25%    1.048576e+06  2.498116e-01
50%    2.097152e+06  4.999210e-01
75%    3.145727e+06  7.498380e-01
max    4.194303e+06  9.999996e-01

This example was run locally on my laptop using Spark defaults, so the times shown should not be taken precisely. Even so, it is clear there is a huge performance boost: using Arrow took something that was excruciatingly slow and sped it up to be barely noticeable.

Notes on Usage

Here are some things to keep in mind before making use of this new feature. At the time of writing, pyarrow is not installed automatically with pyspark and needs to be installed manually; see the installation instructions. It is planned to add pyarrow as a pyspark dependency so that pip install pyspark will also install pyarrow.

Currently, the controlling SQLConf is disabled by default. This can be enabled programmatically as in the example above or by adding the line “spark.sql.execution.arrow.enable=true” to SPARK_HOME/conf/spark-defaults.conf.

Also, not all Spark data types are currently supported; support is limited to primitive types. Expanded type support is in the works and is also expected to be in the Spark 2.3 release.

Future Improvements

As mentioned, this was just a first step in using Arrow to make life easier for Spark Python users. A few exciting initiatives in the works are to allow for vectorized UDF evaluation (SPARK-21190, SPARK-21404), and the ability to apply a function on grouped data using a Pandas DataFrame (SPARK-20396). Just as Arrow helped in converting a Spark DataFrame to Pandas, it can also work in the other direction when creating a Spark DataFrame from an existing Pandas DataFrame (SPARK-20791). Stay tuned for more!

Collaborators

Reaching this first milestone was a group effort from both the Apache Arrow and Spark communities. Thanks to the hard work of Wes McKinney, Li Jin, Holden Karau, Reynold Xin, Wenchen Fan, Shane Knapp and many others that helped push this effort forwards.

Apache Arrow 0.5.0 Release

Published 25 Jul 2017
By Wes McKinney (wesm)

The Apache Arrow team is pleased to announce the 0.5.0 release. It includes 130 resolved JIRAs with some new features, expanded integration testing between implementations, and bug fixes. The Arrow memory format remains stable since the 0.3.x and 0.4.x releases.

See the Install Page to learn how to get the libraries for your platform. The complete changelog is also available.

Expanded Integration Testing

In this release, we added compatibility tests for dictionary-encoded data between Java and C++. This enables the distinct values (the dictionary) in a vector to be transmitted as part of an Arrow schema while the record batches contain integers which correspond to the dictionary.

So we might have:

data (string): ['foo', 'bar', 'foo', 'bar']

In dictionary-encoded form, this could be represented as:

indices (int8): [0, 1, 0, 1]
dictionary (string): ['foo', 'bar']
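
For reference, the same dictionary-encoded column can be constructed in Python with pyarrow; this is a hedged sketch, and the exact API has evolved across releases:

import pyarrow as pa

# Indices reference entries in the dictionary ['foo', 'bar'].
indices = pa.array([0, 1, 0, 1], type=pa.int8())
dictionary = pa.array(['foo', 'bar'])
dict_array = pa.DictionaryArray.from_arrays(indices, dictionary)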

In upcoming releases, we plan to complete integration testing for the remaining data types (including some more complicated types like unions and decimals) on the road to a 1.0.0 release in the future.

C++ Activity

We completed a number of significant pieces of work in the C++ part of Apache Arrow.

Using jemalloc as default memory allocator

We decided to use jemalloc as the default memory allocator unless it is explicitly disabled. This memory allocator has significant performance advantages in Arrow workloads over the default malloc implementation. We will publish a blog post going into more detail about this and why you might care.

Sharing more C++ code with Apache Parquet

We imported the compression library interfaces and dictionary encoding algorithms from the Apache Parquet C++ library. The Parquet library now depends on this code in Arrow, and we will be able to use it more easily for data compression in Arrow use cases.

As part of incorporating Parquet’s dictionary encoding utilities, we have developed an arrow::DictionaryBuilder class to enable building dictionary-encoded arrays iteratively. This can help save memory and yield better performance when interacting with databases, Parquet files, or other sources whose columns contain many duplicated values.

Support for LZ4 and ZSTD compressors

We added LZ4 and ZSTD compression library support. In ARROW-300 and other planned work, we intend to add some compression features for data sent via RPC.

Python Activity

We fixed many bugs which were affecting Parquet and Feather users and fixed several other rough edges with normal Arrow use. We also added some additional Arrow type conversions: structs, lists embedded in pandas objects, and Arrow time types (which deserialize to the datetime.time type).

In upcoming releases we plan to continue to improve Dask support and performance for distributed processing of Apache Parquet files with pyarrow.

The Road Ahead

We have much work ahead of us to build out Arrow integrations in other data systems to improve their processing performance and interoperability with other systems.

We are discussing the roadmap to a future 1.0.0 release on the developer mailing list. Please join the discussion there.

Connecting Relational Databases to the Apache Arrow World with turbodbc

Published 16 Jun 2017
By Michael König (MathMagique)

Michael König is the lead developer of the turbodbc project

The Apache Arrow project set out to become the universal data layer for column-oriented data processing systems without incurring serialization costs or compromising performance more generally. While relational databases still lag behind in Apache Arrow adoption, the Python database module turbodbc brings Apache Arrow support to these databases using a much older, more specialized data exchange layer: ODBC.

ODBC is a database interface that offers developers the option to transfer data either in row-wise or column-wise fashion. Previous Python ODBC modules typically use the row-wise approach, and often trade repeated database roundtrips for simplified buffer handling. This makes them less suited for data-intensive applications, particularly when interfacing with modern columnar analytical databases.

In contrast, turbodbc was designed to leverage columnar data processing from day one. Naturally, this implies using the columnar portion of the ODBC API. Equally important, however, is to find new ways of providing columnar data to Python users that exceed the capabilities of the row-wise API mandated by Python’s PEP 249. Turbodbc has adopted Apache Arrow for this very task with the recently released version 2.0.0:

>>> from turbodbc import connect
>>> connection = connect(dsn="My columnar database")
>>> cursor = connection.cursor()
>>> cursor.execute("SELECT some_integers, some_strings FROM my_table")
>>> cursor.fetchallarrow()
pyarrow.Table
some_integers: int64
some_strings: string

With this new addition, the data flow for a result set of a typical SELECT query is like this:

Data flow from relational databases to Python with turbodbc and the Apache Arrow frontend

In practice, it is possible to achieve the following ideal situation: A 64-bit integer column is stored as one contiguous block of memory in a columnar database. A huge chunk of 64-bit integers is transferred over the network and the ODBC driver directly writes it to a turbodbc buffer of 64-bit integers. The Arrow frontend accumulates these values by copying the entire 64-bit buffer into a free portion of an Arrow table’s 64-bit integer column.

Moving data from the database to an Arrow table and, thus, providing it to the Python user can be as simple as copying memory blocks around, megabytes equivalent to hundreds of thousands of rows at a time. The absence of serialization and conversion logic renders the process extremely efficient.

Once the data is stored in an Arrow table, Python users can continue to do some actual work. They can convert it into a Pandas DataFrame for data analysis (using a quick table.to_pandas()), pass it on to other data processing systems such as Apache Spark or Apache Impala (incubating), or store it in the Apache Parquet file format. This way, non-Python systems are efficiently connected with relational databases.
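
Putting the pieces together, the flow from a SQL query to a Pandas DataFrame looks roughly like this sketch (the DSN and table names are placeholders):

from turbodbc import connect

connection = connect(dsn="My columnar database")
cursor = connection.cursor()
cursor.execute("SELECT some_integers, some_strings FROM my_table")

table = cursor.fetchallarrow()   # returns a pyarrow.Table
df = table.to_pandas()           # hand the result to Pandas for analysis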

In the future, turbodbc’s Arrow support will be extended to use more sophisticated features such as dictionary-encoded string fields. We also plan to pick smaller than 64-bit data types where possible. Last but not least, Arrow support will be extended to cover the reverse direction of data flow, so that Python users can quickly insert Arrow tables into relational databases.

If you would like to learn more about turbodbc, check out the GitHub project and the project documentation. If you want to learn more about how turbodbc implements the nitty-gritty details, check out parts one and two of the “Making of turbodbc” series at Blue Yonder’s technology blog.

Apache Arrow 0.4.1 Release

Published 14 Jun 2017
By Wes McKinney (wesm)

The Apache Arrow team is pleased to announce the 0.4.1 release of the project. This is a bug fix release that addresses a regression with Decimal types in the Java implementation introduced in 0.4.0 (see ARROW-1091). There were a total of 31 resolved JIRAs.

See the Install Page to learn how to get the libraries for your platform.

Python Wheel Installers for Windows

Max Risuhin contributed fixes to enable binary wheel installers to be generated for Python 3.5 and 3.6. Thus, 0.4.1 is the first Arrow release for which PyArrow, including bundled Apache Parquet support, can be installed with either conda or pip across the three major platforms: Linux, macOS, and Windows. Use one of:

pip install pyarrow
conda install pyarrow -c conda-forge

Turbodbc 2.0.0 with Apache Arrow Support

Turbodbc, a fast C++ ODBC interface with Python bindings, released version 2.0.0 including reading SQL result sets as Arrow record batches. The team used the PyArrow C++ API introduced in version 0.4.0 to construct pyarrow.Table objects inside the turbodbc library. Learn more in their documentation and install with one of:

pip install turbodbc
conda install turbodbc -c conda-forge

Apache Arrow 0.4.0 Release

Published 23 May 2017
By Wes McKinney (wesm)

The Apache Arrow team is pleased to announce the 0.4.0 release of the project. Coming only 17 days after the previous release, it includes 77 resolved JIRAs with some important new features and bug fixes.

See the Install Page to learn how to get the libraries for your platform.

Expanded JavaScript Implementation

The TypeScript Arrow implementation has undergone some work since 0.3.0 and can now read a substantial portion of the Arrow streaming binary format. As this implementation develops, we will eventually want to include JS in the integration test suite along with Java and C++ to ensure wire cross-compatibility.

Python Support for Apache Parquet on Windows

With the 1.1.0 C++ release of Apache Parquet, we have enabled the pyarrow.parquet extension on Windows for Python 3.5 and 3.6. This should appear in conda-forge packages and PyPI in the near future. Developers can follow the source build instructions.
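
As a quick orientation, reading and writing Parquet files through pyarrow looks like the following minimal sketch using the pyarrow.parquet module:

import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Convert a Pandas DataFrame to an Arrow table and write it as Parquet.
df = pd.DataFrame({'a': [1, 2, 3]})
table = pa.Table.from_pandas(df)
pq.write_table(table, 'example.parquet')

# Read it back as an Arrow table.
table2 = pq.read_table('example.parquet')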

Generalizing Arrow Streams

In the 0.2.0 release, we defined the first version of the Arrow streaming binary format for low-cost messaging with columnar data. These streams presume that the message components are written as a continuous byte stream over a socket or file.

We would like to be able to support other transport protocols, like gRPC, for the message components of Arrow streams. To that end, in C++ we defined an abstract stream reader interface, for which the current contiguous streaming format is one implementation:

class RecordBatchReader {
 public:
  virtual std::shared_ptr<Schema> schema() const = 0;
  virtual Status GetNextRecordBatch(std::shared_ptr<RecordBatch>* batch) = 0;
};

It would also be good to define abstract stream reader and writer interfaces in the Java implementation.

In an upcoming blog post, we will explain in more depth how Arrow streams work, but you can learn more about them by reading the IPC specification.
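
To give a feel for the streaming format itself, here is a minimal Python sketch that writes and reads a stream in memory (the class names are those used in current pyarrow releases):

import pyarrow as pa

batch = pa.RecordBatch.from_arrays([pa.array([1, 2, 3])], ['x'])

# Write a stream: a schema message followed by record batch messages.
sink = pa.BufferOutputStream()
writer = pa.RecordBatchStreamWriter(sink, batch.schema)
writer.write_batch(batch)
writer.close()

# Read the stream back from the in-memory buffer.
reader = pa.RecordBatchStreamReader(sink.getvalue())
table = reader.read_all()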

C++ and Cython API for Python Extensions

As other Python libraries with C or C++ extensions use Apache Arrow, they will need to be able to return Python objects wrapping the underlying C++ objects. In this release, we have implemented a prototype C++ API which enables Python wrapper objects to be constructed from C++ extension code:

#include "arrow/python/pyarrow.h"

if (!arrow::py::import_pyarrow()) {
  // Error
}

std::shared_ptr<arrow::RecordBatch> cpp_batch = GetData(...);
PyObject* py_batch = arrow::py::wrap_batch(cpp_batch);

This API is intended to be usable from Cython code as well:

cimport pyarrow
pyarrow.import_pyarrow()

Python Wheel Installers on macOS

With this release, pip install pyarrow works on macOS (OS X) as well as Linux. We are working on providing binary wheel installers for Windows as well.

Apache Arrow 0.3.0 Release

Published 08 May 2017
By Wes McKinney (wesm)

Translations: 日本語

The Apache Arrow team is pleased to announce the 0.3.0 release of the project. It is the product of an intense 10 weeks of development since the 0.2.0 release from this past February. It includes 306 resolved JIRAs from 23 contributors.

While we have added many new features to the different Arrow implementations, one of the major development focuses in 2017 has been hardening the in-memory format, type metadata, and messaging protocol to provide a stable, production-ready foundation for big data applications. We are excited to be collaborating with the Apache Spark and GeoMesa communities on utilizing Arrow for high performance IO and in-memory data processing.

See the Install Page to learn how to get the libraries for your platform.

We will be publishing more information about the Apache Arrow roadmap as we forge ahead with using Arrow to accelerate big data systems.

We are looking for more contributors from within our existing communities and from other communities (such as Go, R, or Julia) to get involved in Arrow development.

File and Streaming Format Hardening

The 0.2.0 release brought with it the first iterations of the random access and streaming Arrow wire formats. See the IPC specification for implementation details and example blog post with some use cases. These provide low-overhead, zero-copy access to Arrow record batch payloads.

In 0.3.0 we have solidified a number of small details with the binary format and improved our integration and unit testing particularly in the Java, C++, and Python libraries. Using the Google Flatbuffers project has helped with adding new features to our metadata without breaking forward compatibility.

We are not yet ready to make a firm commitment to strong forward compatibility (in case we find something needs to change) in the binary format, but we will make efforts between major releases to not make unnecessary breakages. Contributions to the website and component user and API documentation would also be most welcome.

Dictionary Encoding Support

Emilio Lahr-Vivaz from the GeoMesa project contributed Java support for dictionary-encoded Arrow vectors. We followed up with C++ and Python support (and pandas.Categorical integration). We have not yet implemented full integration tests for dictionaries (for sending this data between C++ and Java), but hope to achieve this in the 0.4.0 Arrow release.

This common data representation technique for categorical data allows multiple record batches to share a common “dictionary”, with the values in the batches being represented as integers referencing the dictionary. This data is called “categorical” or “factor” in statistical languages, while in file formats like Apache Parquet it is strictly used for data compression.
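
For example, with the Python bindings a pandas.Categorical converts to a dictionary-encoded Arrow array; this is a hedged sketch whose exact behavior depends on the pyarrow version:

import pandas as pd
import pyarrow as pa

cat = pd.Categorical(['foo', 'bar', 'foo', 'bar'])
arr = pa.array(cat)   # yields a DictionaryArray backed by the category values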

Expanded Date, Time, and Fixed Size Types

A notable omission from the 0.2.0 release was complete and integration-tested support for the gamut of date and time types that occur in the wild. These are needed for Apache Parquet and Apache Spark integration.

We have additionally added experimental support for exact decimals in C++ using Boost.Multiprecision, though we have not yet hardened the Decimal memory format between the Java and C++ implementations.

C++ and Python Support on Windows

We have made many general improvements to development and packaging for general C++ and Python development. 0.3.0 is the first release to bring full C++ and Python support for Windows on Visual Studio (MSVC) 2015 and 2017. In addition to adding Appveyor continuous integration for MSVC, we have also written guides for building from source on Windows: C++ and Python.

For the first time, you can install the Arrow Python library on Windows from conda-forge:

conda install pyarrow -c conda-forge

C (GLib) Bindings, with support for Ruby, Lua, and more

Kouhei Sutou is a new Apache Arrow contributor and has contributed GLib C bindings (to the C++ libraries) for Linux. Using a C middleware framework called GObject Introspection, it is possible to use these bindings seamlessly in Ruby, Lua, Go, and other programming languages. We will probably need to publish some follow up blogs explaining how these bindings work and how to use them.

Apache Spark Integration for PySpark

We have been collaborating with the Apache Spark community on SPARK-13534 to add support for using Arrow to accelerate DataFrame.toPandas in PySpark. We have observed over 40x speedup from the more efficient data serialization.

Using Arrow in PySpark opens the door to many other performance optimizations, particularly around UDF evaluation (e.g. map and filter operations with Python lambda functions).

New Python Feature: Memory Views, Feather, Apache Parquet support

Arrow’s Python library pyarrow is a Cython binding for the libarrow and libarrow_python C++ libraries, which handle interoperability with NumPy, pandas, and the Python standard library.

At the heart of Arrow’s C++ libraries is the arrow::Buffer object, which is a managed memory view supporting zero-copy reads and slices. Jeff Knupp contributed integration between Arrow buffers and the Python buffer protocol and memoryviews, so now code like this is possible:

In [6]: import pyarrow as pa

In [7]: buf = pa.frombuffer(b'foobarbaz')

In [8]: buf
Out[8]: <pyarrow._io.Buffer at 0x7f6c0a84b538>

In [9]: memoryview(buf)
Out[9]: <memory at 0x7f6c0a8c5e88>

In [10]: buf.to_pybytes()
Out[10]: b'foobarbaz'

We have significantly expanded Apache Parquet support via the C++ Parquet implementation parquet-cpp. This includes support for partitioned datasets on disk or in HDFS. We added initial Arrow-powered Parquet support in the Dask project, and look forward to more collaborations with the Dask developers on distributed processing of pandas data.

With Arrow’s support for pandas maturing, we were able to merge in the Feather format implementation, which is essentially a special case of the Arrow random access format. We’ll be continuing Feather development within the Arrow codebase. For example, Feather can now read and write with Python file objects using Arrow’s Python binding layer.

We also implemented more robust support for pandas-specific data types, like DatetimeTZ and Categorical.

Support for Tensors and beyond in C++ Library

There has been increased interest in using Apache Arrow as a tool for zero-copy shared memory management for machine learning applications. A flagship example is the Ray project from the UC Berkeley RISELab.

Machine learning deals in additional kinds of data structures beyond what the Arrow columnar format supports, like multidimensional arrays aka “tensors”. As such, we implemented the arrow::Tensor C++ type which can utilize the rest of Arrow’s zero-copy shared memory machinery (using arrow::Buffer for managing memory lifetime). In C++ in particular, we will want to provide for additional data structures utilizing common IO and memory management tools.
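
In the Python bindings, the corresponding Tensor type wraps a NumPy ndarray without copying; this is a minimal sketch using today's pyarrow API:

import numpy as np
import pyarrow as pa

arr = np.random.randn(3, 4)
tensor = pa.Tensor.from_numpy(arr)   # wraps the ndarray's memory, no copy
back = tensor.to_numpy()             # zero-copy view onto the same memory
assert np.array_equal(arr, back)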

Start of JavaScript (TypeScript) Implementation

Brian Hulette started developing an Arrow implementation in TypeScript for use in NodeJS and browser-side applications. We are benefitting from Flatbuffers’ first class support for JavaScript.

Improved Website and Developer Documentation

Since 0.2.0 we have implemented a new website stack for publishing documentation and blogs based on Jekyll. Kouhei Sutou developed a Jekyll Jupyter Notebook plugin so that we can use Jupyter to author content for the Arrow website.

On the website, we have now published API documentation for the C, C++, Java, and Python subcomponents. Within these you will find easier-to-follow developer instructions for getting started.

Contributors

Thanks to all who contributed patches to this release.

$ git shortlog -sn apache-arrow-0.2.0..apache-arrow-0.3.0
    119 Wes McKinney
     55 Kouhei Sutou
     18 Uwe L. Korn
     17 Julien Le Dem
      9 Phillip Cloud
      6 Bryan Cutler
      5 Philipp Moritz
      5 Emilio Lahr-Vivaz
      4 Max Risuhin
      4 Johan Mabille
      4 Jeff Knupp
      3 Steven Phillips
      3 Miki Tebeka
      2 Leif Walsh
      2 Jeff Reback
      2 Brian Hulette
      1 Tsuyoshi Ozawa
      1 rvernica
      1 Nong Li
      1 Julien Lafaye
      1 Itai Incze
      1 Holden Karau
      1 Deepak Majeti