Speeding up PySpark with Apache Arrow
Published
26 Jul 2017
By
BryanCutler
Bryan Cutler is a software engineer at IBM’s Spark Technology Center STC
Beginning with Apache Spark version 2.3, Apache Arrow will be a supported
dependency and begin to offer increased performance with columnar data transfer.
If you are a Spark user that prefers to work in Python and Pandas, this is a cause
to be excited over! The initial work is limited to collecting a Spark DataFrame
with toPandas()
, which I will discuss below, however there are many additional
improvements that are currently underway.
Optimizing Spark Conversion to Pandas
The previous way of converting a Spark DataFrame to Pandas with DataFrame.toPandas()
in PySpark was painfully inefficient. Basically, it worked by first collecting all
rows to the Spark driver. Next, each row would get serialized into Python’s pickle
format and sent to a Python worker process. This child process unpickles each row into
a huge list of tuples. Finally, a Pandas DataFrame is created from the list using
pandas.DataFrame.from_records()
.
This all might seem like standard procedure, but suffers from 2 glaring issues: 1)
even using CPickle, Python serialization is a slow process and 2) creating
a pandas.DataFrame
using from_records
must slowly iterate over the list of pure
Python data and convert each value to Pandas format. See here for a detailed
analysis.
Here is where Arrow really shines to help optimize these steps: 1) Once the data is
in Arrow memory format, there is no need to serialize/pickle anymore as Arrow data can
be sent directly to the Python process, 2) When the Arrow data is received in Python,
then pyarrow can utilize zero-copy methods to create a pandas.DataFrame
from entire
chunks of data at once instead of processing individual scalar values. Additionally,
the conversion to Arrow data can be done on the JVM and pushed back for the Spark
executors to perform in parallel, drastically reducing the load on the driver.
As of the merging of SPARK-13534, the use of Arrow when calling toPandas()
needs to be enabled by setting the SQLConf “spark.sql.execution.arrow.enabled” to
“true”. Let’s look at a simple usage example.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.3.0-SNAPSHOT
/_/
Using Python version 2.7.13 (default, Dec 20 2016 23:09:15)
SparkSession available as 'spark'.
In [1]: from pyspark.sql.functions import rand
...: df = spark.range(1 << 22).toDF("id").withColumn("x", rand())
...: df.printSchema()
...:
root
|-- id: long (nullable = false)
|-- x: double (nullable = false)
In [2]: %time pdf = df.toPandas()
CPU times: user 17.4 s, sys: 792 ms, total: 18.1 s
Wall time: 20.7 s
In [3]: spark.conf.set("spark.sql.execution.arrow.enabled", "true")
In [4]: %time pdf = df.toPandas()
CPU times: user 40 ms, sys: 32 ms, total: 72 ms
Wall time: 737 ms
In [5]: pdf.describe()
Out[5]:
id x
count 4.194304e+06 4.194304e+06
mean 2.097152e+06 4.998996e-01
std 1.210791e+06 2.887247e-01
min 0.000000e+00 8.291929e-07
25% 1.048576e+06 2.498116e-01
50% 2.097152e+06 4.999210e-01
75% 3.145727e+06 7.498380e-01
max 4.194303e+06 9.999996e-01
This example was run locally on my laptop using Spark defaults so the times shown should not be taken precisely. Even though, it is clear there is a huge performance boost and using Arrow took something that was excruciatingly slow and speeds it up to be barely noticeable.
Notes on Usage
Here are some things to keep in mind before making use of this new feature. At
the time of writing this, pyarrow will not be installed automatically with
pyspark and needs to be manually installed, see installation instructions.
It is planned to add pyarrow as a pyspark dependency so that
> pip install pyspark
will also install pyarrow.
Currently, the controlling SQLConf is disabled by default. This can be enabled
programmatically as in the example above or by adding the line
“spark.sql.execution.arrow.enabled=true” to SPARK_HOME/conf/spark-defaults.conf
.
Also, not all Spark data types are currently supported and limited to primitive types. Expanded type support is in the works and expected to also be in the Spark 2.3 release.
Future Improvements
As mentioned, this was just a first step in using Arrow to make life easier for Spark Python users. A few exciting initiatives in the works are to allow for vectorized UDF evaluation (SPARK-21190, SPARK-21404), and the ability to apply a function on grouped data using a Pandas DataFrame (SPARK-20396). Just as Arrow helped in converting a Spark to Pandas, it can also work in the other direction when creating a Spark DataFrame from an existing Pandas DataFrame (SPARK-20791). Stay tuned for more!
Collaborators
Reaching this first milestone was a group effort from both the Apache Arrow and Spark communities. Thanks to the hard work of Wes McKinney, Li Jin, Holden Karau, Reynold Xin, Wenchen Fan, Shane Knapp and many others that helped push this effort forwards.