Apache Arrow R 6.0.0 Release


Published 08 Nov 2021
By Nic Crane, Jonathan Keane, Neal Richardson

We are excited to announce the recent release of version 6.0.0 of the Arrow R package on CRAN. While we usually don’t write a dedicated release blog post for the R package, this one is special. There are a number of major new features in this version, some of which we’ve been building up to for several years.

More dplyr support

In version 0.16.0 (February 2020), we released the first version of the Dataset feature, which allowed you to query multi-file datasets using dplyr::select() and filter(). These tools allowed you to find a slice of data in a large dataset that may not fit into memory and pull it into R for further analysis. In version 4.0.0 earlier this year, we added support for mutate() and a number of other dplyr verbs, and all year we’ve been adding hundreds of functions you can use to transform and filter data in Datasets. However, to aggregate, you’d still need to pull the data into R.

Grouped aggregation

With arrow 6.0.0, you can now summarise() on Arrow data, with or without group_by(). This works both on in-memory Arrow Tables and across partitioned, multi-file Datasets. Most common aggregation functions are supported: n(), n_distinct(), min(), max(), sum(), mean(), var(), sd(), any(), and all(). median() and quantile() (with a single probability) are also supported and currently return approximate results computed with the t-digest algorithm.

As usual, Arrow reads and processes data in chunks, and in parallel where possible, producing results much faster than loading everything into memory first and then processing. It also enables operations on data that wouldn't fit into memory on a single machine. For example, using the 1.5-billion-row NYC Taxi dataset featured in the package vignette, we can aggregate over the whole dataset even on a laptop:

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
ds %>%
  filter(
    passenger_count > 0,
    passenger_count < 6,
    grepl("csh", payment_type, ignore.case = TRUE)
  ) %>%
  group_by(passenger_count) %>%
  summarize(
    avg = mean(total_amount, na.rm = TRUE),
    count = n()
  ) %>%
  arrange(desc(count)) %>%
  collect()

#> # A tibble: 5 × 3
#>   passenger_count   avg     count
#>             <int> <dbl>     <int>
#> 1               1  11.1 257738064
#> 2               2  12.1  58824482
#> 3               5  11.4  26056438
#> 4               3  12.0  18852606
#> 5               4  12.3  10081632
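
median() and quantile() work in the same pipelines; keep in mind that their results are approximate. Here is a rough sketch against the same Dataset (the grouping and probability are illustrative, not from the original post):

# A sketch using the nyc-taxi Dataset from above: median() and quantile()
# return approximate results computed with the t-digest algorithm
ds %>%
  group_by(year) %>%
  summarise(
    median_fare = median(total_amount, na.rm = TRUE),
    fare_p99 = quantile(total_amount, probs = 0.99, na.rm = TRUE)
  ) %>%
  collect()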

Joins

In addition to aggregation, Arrow also supports all of dplyr’s mutating joins (inner, left, right, and full) and filtering joins (semi and anti).

Suppose I want a table of all the flights from JFK to Las Vegas (LAS) on 9 October 2013, with the full name of the airline included.

arrow_table(nycflights13::flights) %>%
  filter(
    year == 2013,
    month == 10,
    day == 9,
    origin == "JFK",
    dest == "LAS"
  ) %>%
  select(dep_time, arr_time, carrier) %>%
  left_join(
    arrow_table(nycflights13::airlines)
  ) %>%
  collect()

#> # A tibble: 12 × 4
#>    dep_time arr_time carrier name
#>       <int>    <int> <chr>   <chr>
#>  1      637      853 B6      JetBlue Airways
#>  2      648      912 AA      American Airlines Inc.
#>  3      812     1029 DL      Delta Air Lines Inc.
#>  4      945     1206 VX      Virgin America
#>  5      955     1219 B6      JetBlue Airways
#>  6     1018     1231 DL      Delta Air Lines Inc.
#>  7     1120     1338 B6      JetBlue Airways
#>  8     1451     1705 DL      Delta Air Lines Inc.
#>  9     1656     1915 AA      American Airlines Inc.
#> 10     1755     2001 DL      Delta Air Lines Inc.
#> 11     1827     2049 B6      JetBlue Airways
#> 12     1917     2126 DL      Delta Air Lines Inc.

In this example, we're working on an in-memory table, so you wouldn't need arrow to do this, but the same code would work on a larger-than-memory dataset backed by thousands of Parquet files.
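
Filtering joins follow the same pattern. As a rough sketch (not from the original post), a semi_join() keeps only the rows of the left table that have a match in the right table, without adding any columns from it:

# Keep only flights whose carrier appears in the airlines table; no columns
# from airlines are added to the result
arrow_table(nycflights13::flights) %>%
  semi_join(arrow_table(nycflights13::airlines), by = "carrier") %>%
  collect()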

Under the hood

To support these features, we've made some internal changes to how queries are built up and, importantly, when they are evaluated. As a result, there are some changes in behavior compared to past versions of arrow.

First, calls to summarise(), head(), and tail() are no longer eagerly evaluated: to see results, you need to call either compute() (to evaluate the query and produce an Arrow Table) or collect() (to evaluate the query and pull the result into an R data.frame).
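
For example, summarise() now returns a lazy query object rather than a result. A minimal sketch, reusing the nycflights13 data from above:

# summarise() builds up a query; nothing is computed yet
query <- arrow_table(nycflights13::flights) %>%
  group_by(carrier) %>%
  summarise(mean_delay = mean(arr_delay, na.rm = TRUE))

compute(query)  # evaluates the query and returns an Arrow Table
collect(query)  # evaluates the query and pulls the result into a data.frame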

Second, the order of rows in a dataset query is no longer deterministic, due to the way work is parallelized in the C++ library. This means that you can't assume the results of a query will be in the same order as the rows of data in the files on disk. If you need a stable sort order, call arrange() to specify the ordering.
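
As a small sketch against the taxi Dataset from above (the partition values here are only illustrative): sort explicitly before collecting whenever row order matters.

ds %>%
  filter(year == 2019, month == 1) %>%
  arrange(desc(total_amount)) %>%  # make the ordering of the result explicit
  collect()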

While these changes are a break from past arrow behavior, they are consistent with many dbplyr backends and are needed to allow queries to scale beyond data-frame workflows that can fit into memory.

Integration with DuckDB

The Arrow engine is not the only new way to query Arrow Datasets in this release. If you have the duckdb package installed, you can hand off an Arrow Dataset or query object to DuckDB for further querying using the to_duckdb() function. This lets you use duckdb's dbplyr methods, as well as its SQL interface, to aggregate data. DuckDB supports filter pushdown, so you can take advantage of Arrow Datasets and Arrow-based optimizations even within a DuckDB SQL query with a WHERE clause. Filtering and column projection specified before the to_duckdb() call in a pipeline are evaluated in Arrow, which can be helpful in some circumstances, such as complicated dbplyr pipelines. You can also hand DuckDB data (or the result of a query) back to arrow with the to_arrow() call.

In the example below, we are looking at flights between NYC and Chicago, and we want to avoid the worst-of-the-worst delays. To do this, we can use percent_rank(); however, that requires a window function, which isn't yet available in Arrow, so we send the data to DuckDB for that step and then pull it back into Arrow:

library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

flights_filtered <- arrow_table(nycflights13::flights) %>%
  select(carrier, origin, dest, arr_delay) %>%
  # arriving early doesn't matter, so call negative delays 0
  mutate(arr_delay = pmax(arr_delay, 0)) %>%
  to_duckdb() %>%
  # for each carrier-origin-dest, take the worst 5% of delays
  group_by(carrier, origin, dest) %>%
  mutate(arr_delay_rank = percent_rank(arr_delay)) %>%
  filter(arr_delay_rank > 0.95)

head(flights_filtered)
#> # Source:   lazy query [?? x 5]
#> # Database: duckdb_connection
#> # Groups:   carrier, origin, dest
#>   carrier origin dest  arr_delay arr_delay_rank
#>   <chr>   <chr>  <chr>     <dbl>          <dbl>
#> 1 9E      JFK    RIC         119          0.952
#> 2 9E      JFK    RIC         125          0.956
#> 3 9E      JFK    RIC         137          0.960
#> 4 9E      JFK    RIC         137          0.960
#> 5 9E      JFK    RIC         158          0.968
#> 6 9E      JFK    RIC         163          0.972

Now we have all of the flights filtered down to the worst-of-the-worst delays, stored as a lazy dbplyr tbl backed by our DuckDB connection. This is the Arrow -> DuckDB direction.

But we can do more: we can then bring that data back into Arrow just as easily. For the rest of our analysis, we pick up where we left off with the tbl referring to the DuckDB query:

# pull data back into arrow to complete analysis
flights_filtered %>%
  to_arrow() %>%
  # now summarise to get mean/min
  group_by(carrier, origin, dest) %>%
  summarise(
    arr_delay_mean = mean(arr_delay),
    arr_delay_min = min(arr_delay),
    num_flights = n()
  ) %>%
  filter(dest %in% c("ORD", "MDW")) %>%
  arrange(desc(arr_delay_mean)) %>%
  collect()
#> # A tibble: 10 × 6
#> # Groups:   carrier, origin [10]
#>    carrier origin dest  arr_delay_mean arr_delay_min num_flights
#>    <chr>   <chr>  <chr>          <dbl>         <dbl>       <int>
#>  1 MQ      EWR    ORD             190.           103         113
#>  2 9E      JFK    ORD             185.           134          52
#>  3 UA      LGA    ORD             179.           101         157
#>  4 WN      LGA    MDW             178.           107         103
#>  5 AA      JFK    ORD             178.           133          19
#>  6 B6      JFK    ORD             174.           129          46
#>  7 WN      EWR    MDW             167.           107         103
#>  8 UA      EWR    ORD             149.            87         189
#>  9 AA      LGA    ORD             135.            78         280
#> 10 EV      EWR    ORD              35             35           1

And just like that, we’ve passed data back and forth between Arrow and DuckDB without having to write a single file to disk!

Expanded use of ALTREP

We are continuing to use R's ALTREP framework where possible. In 5.0.0, only a limited set of circumstances took advantage of ALTREP; in 6.0.0, we have expanded the supported types to include strings, as well as vectors containing NAs.

library(microbenchmark)
library(arrow)

# An Arrow Table with 10 million doubles and 10 million strings (with NAs)
tbl <-
  arrow_table(data.frame(
    x = rnorm(10000000),
    y = sample(c(letters, NA), 10000000, replace = TRUE)
  ))

# Convert the Table to a data.frame with ALTREP enabled
with_altrep <- function(data) {
  options(arrow.use_altrep = TRUE)
  as.data.frame(data)
}

# Convert the Table to a data.frame with ALTREP disabled
without_altrep <- function(data) {
  options(arrow.use_altrep = FALSE)
  as.data.frame(data)
}

microbenchmark(
  without_altrep(tbl),
  with_altrep(tbl)
)

#> Unit: milliseconds
#>                 expr      min        lq      mean    median        uq      max neval
#>  without_altrep(tbl) 191.0788 213.82235 249.65076 225.52120 244.26977 512.1652   100
#>     with_altrep(tbl)  48.7152  50.97269  65.56832  52.93795  55.24505 338.4602   100

Airgapped installation on Linux

With every release, we continue to improve the installation experience on Linux. Unlike macOS and Windows, CRAN does not host binary packages for Linux, so unless you're using a service like RStudio Package Manager that hosts binaries, you have to build arrow from source. Because arrow wraps a large C++ project, this can be slow and sensitive to differences in build environments. To ensure a reliable installation experience, we test on a wide range of platforms and configurations and keep working to simplify the process so that install.packages("arrow") just works and you don't have to think about it.

A big improvement in 6.0.0 is that arrow can now be installed in a fully offline mode. The R package now includes the Arrow C++ source, so it does not need to be downloaded at build time. This does not cover optional dependencies, such as compression libraries and the AWS SDK for accessing data in S3, which are still fetched during the build. For those who need to install arrow on an airgapped server with all of those features, we have included a helper function to download and assemble a "fat" package that contains everything that would otherwise be downloaded lazily at build time. The function create_package_with_all_dependencies() can be run from a computer that does have internet access, and it creates a fat source package that can then be transferred to and installed on a server without connectivity. This helper is also available on GitHub without installing the arrow package. For more on installation, see the docs.
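
As a rough sketch of that workflow (the file name here is just an example):

# On a machine with internet access, bundle arrow's source together with the
# sources of all of its third-party dependencies into one "fat" source package
arrow::create_package_with_all_dependencies("arrow_with_deps.tar.gz")

# Transfer the file to the airgapped server, then install from the local file:
# install.packages("arrow_with_deps.tar.gz", repos = NULL)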

Another installation change is that the source build now fails cleanly if the C++ library is not found or cannot be built. Previously, if the C++ library failed to build, you would get a successful R package installation, but the package wouldn't do anything useful; it would just tell you to reinstall. This was helpful back in the early days of the package, when we weren't confident it would build everywhere that CRAN checked, but we now have much more experience (and extensive testing). In recent months this failure mode caused more confusion than it was worth, and it led many people to think that after installing arrow you always have to run install_arrow() again.

Thanks

This is a significant milestone for Arrow, and for the R package specifically, and there is much gratitude to go around. In the 6.0.0 release, 77 individuals contributed to Arrow, many of whom did the heavy lifting in the C++ library to make the new dataset query features a reality. Specifically in the R package, we want to acknowledge Phillip Cloud, Dewey Dunnington, Dragoș Moldovan-Grünfeld, Matt Peterson, and Percy Camilo Triveño Aucahuasi for their first contributions to the R package. And a special thanks goes to Karl Dunkle Werner for the hard work on the offline package build!

We also want to thank you in advance for your help. For this release of the Arrow query engine, we've focused our effort on getting the core functionality implemented. (In fact, this first release is something of an R-exclusive: bindings for these features haven't yet been added to pyarrow, the Python Arrow library!) Because we focused on the essentials, there are a number of performance optimizations we plan to do but didn't have time for in this release, and there are surely more issues to improve that we don't yet know about. We are eager for your feedback: please let us know of any issues you encounter so that we can address them in our next release.