Apache Arrow lets you work efficiently with large, multi-file datasets. The arrow R package provides a dplyr interface to Arrow Datasets, as well as other tools for interactive exploration of Arrow data.
This vignette introduces Datasets and shows how to use dplyr to analyze them. It describes both what is possible to do with Arrow now and what is on the immediate development roadmap.
The New York City taxi trip record data is widely used in big data exercises and competitions. For demonstration purposes, we have hosted a Parquet-formatted version of about 10 years of the trip data in a public AWS S3 bucket.
The total file size is around 37 gigabytes, even in the efficient Parquet file format. That's bigger than memory on most people's computers, so we can't just read it all in and stack it into a single data frame.
In Windows and macOS binary packages, S3 support is included. On Linux when installing from source, S3 support is not enabled by default, and it has additional system requirements. See vignette("install", package = "arrow") for details. To see if your arrow installation has S3 support, run:
arrow::arrow_with_s3()
## [1] FALSE
Even with S3 support enabled, network speed will be a bottleneck unless your machine is located in the same AWS region as the data. So, for this vignette, we assume that the NYC taxi dataset has been downloaded locally in a "nyc-taxi" directory.
If your arrow build has S3 support, you can sync the data locally with:
arrow::copy_files("s3://ursa-labs-taxi-data", "nyc-taxi")
If your arrow build doesn't have S3 support, you can download the files with some additional code:
bucket <- "https://ursa-labs-taxi-data.s3.us-east-2.amazonaws.com"
dir.create("nyc-taxi")
for (year in 2009:2019) {
  dir.create(file.path("nyc-taxi", year))
  if (year == 2019) {
    # We only have through June 2019 there
    months <- 1:6
  } else {
    months <- 1:12
  }
  for (month in months) {
    if (month < 10) {
      month <- paste0("0", month)
    }
    dir.create(file.path("nyc-taxi", year, month))
    download.file(
      paste(bucket, year, month, "data.parquet", sep = "/"),
      file.path("nyc-taxi", year, month, "data.parquet"),
      mode = 'wb'
    )
  }
}
Note that these download steps in the vignette are not executed: if you want to run with live data, you'll have to do it yourself separately. Given the size, if you're running this locally and don't have a fast connection, feel free to grab only a year or two of data.
If you don't have the taxi data downloaded, the vignette will still run and will yield previously cached output for reference. To be explicit about which version is running, let's check whether we're running with live data:
dir.exists("nyc-taxi")
## [1] FALSE
Because dplyr is not necessary for many Arrow workflows, it is an optional (Suggests) dependency. So, to work with Datasets, we need to load both arrow and dplyr.
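A minimal sketch of that step, assuming both packages are already installed. The warn.conflicts = FALSE argument only silences the messages about masked function names and is optional:

```r
# Attach both packages; arrow supplies open_dataset() and the Dataset
# classes, dplyr supplies the verbs (filter(), select(), ...) and %>%.
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)
```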
The first step is to create our Dataset object, pointing at the directory of data.
ds <- open_dataset("nyc-taxi", partitioning = c("year", "month"))
The default file format for open_dataset() is Parquet; if we had a directory of Arrow format files, we could include format = "arrow" in the call. Other supported formats include: "feather" (an alias for "arrow", as Feather v2 is the Arrow file format), "csv", "tsv" (for tab-delimited), and "text" for generic text-delimited files. For text files, you can pass any parsing options ("delim", "quote", etc.) to open_dataset() that you would otherwise pass to read_csv_arrow().
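As a small illustration of the format argument, the sketch below opens a directory of CSV files. The temporary directory, file name, and toy columns are made up for this example and are not part of the taxi data:

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical toy data written to a temporary directory as CSV
csv_dir <- tempfile("csv-demo-")
dir.create(csv_dir)
write.csv(
  data.frame(id = 1:3, city = c("NYC", "Boston", "Chicago")),
  file.path(csv_dir, "part1.csv"),
  row.names = FALSE
)

# Tell open_dataset() that the files are CSV rather than the default Parquet
ds_csv <- open_dataset(csv_dir, format = "csv")

# Column names are taken from the CSV header
ds_csv$schema$names
```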
The partitioning argument lets us specify how the file paths provide information about how the dataset is chunked into different files. Our files in this example have file paths like
2009/01/data.parquet
2009/02/data.parquet
...
By providing a character vector to partitioning, we're saying that the first path segment gives the value for "year" and the second segment is "month". Every row in 2009/01/data.parquet has a value of 2009 for "year" and 1 for "month", even though those columns may not actually be present in the file.
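To make the mapping from path segments to partition columns concrete, here is a rough base R sketch of the idea. It is only an illustration of the concept, not how arrow implements it, and the three paths are a made-up subset of the directory layout:

```r
# Illustrative file paths, relative to the dataset root
paths <- c(
  "2009/01/data.parquet",
  "2009/02/data.parquet",
  "2010/01/data.parquet"
)
partition_names <- c("year", "month")

# Split the directory part of each path into its segments
segments <- strsplit(dirname(paths), "/", fixed = TRUE)

# One row per file, one column per path segment
partition_values <- as.data.frame(do.call(rbind, segments), stringsAsFactors = FALSE)
names(partition_values) <- partition_names

# Interpret the segments as integers, so "01" becomes 1
partition_values[] <- lapply(partition_values, as.integer)
partition_values$file <- paths

partition_values
```

Every row read from a given file would carry that file's "year" and "month" values, which is why the columns appear in the dataset even when they are absent from the files themselves.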
Indeed, when we look at the dataset, we see that in addition to the columns present in every file, there are also columns "year" and "month".
ds
## FileSystemDataset with 125 Parquet files
## vendor_id: string
## pickup_at: timestamp[us]
## dropoff_at: timestamp[us]
## passenger_count: int8
## trip_distance: float
## pickup_longitude: float
## pickup_latitude: float
## rate_code_id: string
## store_and_fwd_flag: string
## dropoff_longitude: float
## dropoff_latitude: float
## payment_type: string
## fare_amount: float
## extra: float
## mta_tax: float
## tip_amount: float
## tolls_amount: float
## total_amount: float
## improvement_surcharge: float
## pickup_location_id: int32
## dropoff_location_id: int32
## congestion_surcharge: float
## year: int32
## month: int32
##
## See $metadata for additional Schema metadata
The other form of partitioning currently supported is Hive-style, in which the partition variable names are included in the path segments. If we had saved our files in paths like
year=2009/month=01/data.parquet
year=2009/month=02/data.parquet
...
we would not have had to provide the names in partitioning: we could have just called ds <- open_dataset("nyc-taxi") and the partitions would have been detected automatically.
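The following sketch shows that auto-detection on a tiny, made-up data frame written to a temporary directory. It assumes an arrow build with Parquet support and uses write_dataset(), which writes Hive-style paths by default:

```r
library(arrow, warn.conflicts = FALSE)

# Hypothetical toy data; the real taxi files are not needed for this
toy <- data.frame(
  year = c(2009L, 2009L, 2010L),
  month = c(1L, 2L, 1L),
  fare = c(5.5, 7.25, 9)
)

# Write one file per year/month combination, with key=value directory names
hive_root <- tempfile("hive-demo-")
write_dataset(toy, hive_root, partitioning = c("year", "month"))

# No partitioning argument: the names are read from the path segments
ds_hive <- open_dataset(hive_root)
ds_hive$schema$names
```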
Up to this point, we haven't loaded any data: we have walked directories to find files, we've parsed file paths to identify partitions, and we've read the headers of the Parquet files to inspect their schemas so that we can make sure they all line up.
In the current release, arrow supports methods for selecting a window of data: select(), rename(), and filter(). Aggregation is not yet supported, nor is deriving or projecting new columns, so before you call summarize() or mutate(), you'll need to collect() the data first, which pulls your selected window of data into an in-memory R data frame. While we could have made those methods collect() the data they needed automatically and invisibly to the end user, we thought it best to make it explicit when you're pulling data into memory so that you can construct your queries most efficiently and not be surprised when some query consumes way more resources than expected.
Here's an example. Suppose I were curious about tipping behavior among the longest taxi rides. Let's find the median tip percentage for rides with a total amount greater than $100 in 2015, broken down by the number of passengers:
system.time(ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count) %>%
  collect() %>%
  summarize(
    tip_pct = median(100 * tip_amount / total_amount),
    n = n()
  ) %>%
  print())
## # A tibble: 10 x 3
##    passenger_count tip_pct      n
##              <int>   <dbl>  <int>
##  1               0    9.84    380
##  2               1   16.7  143087
##  3               2   16.6   34418
##  4               3   14.4    8922
##  5               4   11.4    4771
##  6               5   16.7    5806
##  7               6   16.7    3338
##  8               7   16.7      11
##  9               8   16.7      32
## 10               9   16.7      42
##
##    user  system elapsed
##   4.436   1.012   1.402
We just selected a window out of a dataset with around 2 billion rows and aggregated on it in under 2 seconds on my laptop. How does this work?
First, select()/rename(), filter(), and group_by() record their actions but don't evaluate on the data until you run collect().
ds %>%
  filter(total_amount > 100, year == 2015) %>%
  select(tip_amount, total_amount, passenger_count) %>%
  group_by(passenger_count)
## FileSystemDataset (query)
## tip_amount: float
## total_amount: float
## passenger_count: int8
##
## * Filter: ((total_amount > 100:double) and (year == 2015:double))
## * Grouped by passenger_count
## See $.data for the source Arrow object
This returns instantly and shows the window selection you've made, without loading data from the files. Because the evaluation of these queries is deferred, you can build up a query that selects down to a small window without generating intermediate datasets that would potentially be large.
Second, all work is pushed down to the individual data files, and depending on the file format, chunks of data within the files. As a result, we can select a window of data from a much larger dataset by collecting the smaller slices from each file--we don't have to load the whole dataset in memory in order to slice from it.
Third, because of partitioning, we can ignore some files entirely. In this example, by filtering year == 2015, all files corresponding to other years are immediately excluded: we don't have to load them in order to find that no rows match the filter. Relatedly, since Parquet files contain row groups with statistics on the data within, there may be entire chunks of data we can avoid scanning because they have no rows where total_amount > 100.
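The effect of partition pruning can be mimicked in a few lines of base R. This is a conceptual sketch only, not arrow's implementation, and the three-year file listing is invented for the example:

```r
# Hypothetical listing: one file per month for three years
years <- 2014:2016
paths <- sprintf(
  "%d/%02d/data.parquet",
  rep(years, each = 12),
  rep(1:12, times = length(years))
)

# The first path segment is the "year" partition value
path_year <- as.integer(sub("/.*$", "", paths))

# A filter on year == 2015 can be answered from the paths alone,
# so only these files would ever need to be opened
to_scan <- paths[path_year == 2015]

length(paths)   # files in the listing
length(to_scan) # files that survive the filter
```

Only the surviving files would then be examined for rows matching the remaining conditions, such as total_amount > 100.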
There are a few ways you can control the Dataset creation to adapt to special use cases. For one, you can specify a schema argument to declare the columns and their data types. This is useful if you have data files that have different storage schemas (for example, a column could be int32 in one and int8 in another) and you want to ensure that the resulting Dataset has a specific type. To be clear, it's not necessary to specify a schema, even in this example of mixed integer types, because the Dataset constructor will reconcile differences like these. The schema specification just lets you declare what you want the result to be.
Similarly, you can provide a Schema in the partitioning argument of open_dataset() in order to declare the types of the virtual columns that define the partitions. This would be useful, in our taxi dataset example, if you wanted to keep "month" as a string instead of an integer for some reason.
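A sketch of that idea on a small, made-up directory tree that mirrors the year/month layout. The temporary paths and the single fare column are assumptions for illustration:

```r
library(arrow, warn.conflicts = FALSE)

# Build a tiny directory tree shaped like 2009/01/data.parquet
root <- tempfile("partition-schema-demo-")
for (month in c("01", "02")) {
  dir.create(file.path(root, "2009", month), recursive = TRUE)
  write_parquet(
    data.frame(fare = c(5.5, 7.25)),
    file.path(root, "2009", month, "data.parquet")
  )
}

# Declare the partition column types explicitly: month stays a string
ds_typed <- open_dataset(
  root,
  partitioning = schema(year = int32(), month = utf8())
)

# Inspect the declared type of the virtual "month" column
ds_typed$schema$GetFieldByName("month")$type$ToString()
```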
Another feature of Datasets is that they can be composed of multiple data sources. That is, you may have a directory of partitioned Parquet files in one location, and in another directory, files that haven't been partitioned. Or, you could point to an S3 bucket of Parquet data and a directory of CSVs on the local file system and query them together as a single dataset. To create a multi-source dataset, provide a list of datasets to open_dataset() instead of a file path, or simply concatenate them like big_dataset <- c(ds1, ds2).
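Here is a minimal sketch of a multi-source dataset, assuming two hypothetical temporary directories that hold toy data with the same columns:

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Two separate locations, each written as its own small Parquet dataset
dir_a <- tempfile("source-a-")
dir_b <- tempfile("source-b-")
write_dataset(data.frame(id = 1:3, fare = c(5, 6, 7)), dir_a)
write_dataset(data.frame(id = 4:5, fare = c(8, 9)), dir_b)

ds1 <- open_dataset(dir_a)
ds2 <- open_dataset(dir_b)

# Combine them; open_dataset(list(ds1, ds2)) is the other spelling
big_dataset <- c(ds1, ds2)

# Query across both sources as if they were one dataset
combined <- big_dataset %>%
  filter(fare > 5) %>%
  collect()
```

Row order across sources is not something to rely on, so sort the collected result if order matters.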
As you can see, querying a large dataset can be made quite fast by storage in an efficient binary columnar format like Parquet or Feather and partitioning based on columns commonly used for filtering. However, we don't always get our data delivered to us that way. Sometimes we start with one giant CSV. Our first step in analyzing data is cleaning it up and reshaping it into a more usable form.
The write_dataset() function allows you to take a Dataset or other tabular data object (an Arrow Table or RecordBatch, or an R data.frame) and write it to a different file format, partitioned into multiple files.
Assume we have a version of the NYC Taxi data as CSV:
ds <- open_dataset("nyc-taxi/csv/", format = "csv")
We can write it to a new location and translate the files to the Feather format by calling write_dataset() on it:
write_dataset(ds, "nyc-taxi/feather", format = "feather")
Next, let's imagine that the "payment_type" column is something we often filter on, so we want to partition the data by that variable. By doing so we ensure that a filter like payment_type == 3 will touch only a subset of files where payment_type is always 3.
One natural way to express the columns you want to partition on is to use the group_by() method:
ds %>%
  group_by(payment_type) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
This will write files to a directory tree that looks like this:
system("tree nyc-taxi/feather")
# feather
# ├── payment_type=1
# │   └── part-5.feather
# ├── payment_type=2
# │   └── part-0.feather
# ...
# └── payment_type=5
#     └── part-2.feather
#
# 5 directories, 25 files
Note that the directory names are payment_type=1 and similar: this is the Hive-style partitioning described above. This means that when we call open_dataset() on this directory, we don't have to declare what the partitions are because they can be read from the file paths. (To instead write bare values for partition segments, i.e. 1 rather than payment_type=1, call write_dataset() with hive_style = FALSE.)
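The difference between the two naming styles can be seen with a toy data frame. This sketch uses hypothetical temporary directories and the default Parquet format rather than Feather, and inspects only the directory names, since the file names inside them may differ between arrow versions:

```r
library(arrow, warn.conflicts = FALSE)
library(dplyr, warn.conflicts = FALSE)

# Hypothetical toy data with two payment types
trips <- data.frame(
  payment_type = c(1L, 1L, 2L),
  fare = c(5.5, 7.25, 9)
)

hive_dir <- tempfile("hive-style-")
bare_dir <- tempfile("bare-style-")

# Default: Hive-style directory names such as payment_type=1
trips %>%
  group_by(payment_type) %>%
  write_dataset(hive_dir)

# hive_style = FALSE: bare values such as 1
trips %>%
  group_by(payment_type) %>%
  write_dataset(bare_dir, hive_style = FALSE)

# Collect the partition directory names written under each root
hive_names <- sort(unique(dirname(list.files(hive_dir, recursive = TRUE))))
bare_names <- sort(unique(dirname(list.files(bare_dir, recursive = TRUE))))
hive_names
bare_names
```

With bare directory names, the partition column names have to be supplied again through the partitioning argument when the data is reopened.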
Perhaps, though, payment_type == 3 is the only data we ever care about, and we just want to drop the rest and have a smaller working set. For this, we can filter() them out when writing:
ds %>%
  filter(payment_type == 3) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
The other thing we can do when writing datasets is select a subset of and/or reorder columns. Suppose we never care about vendor_id, and being a string column, it can take up a lot of space when we read it in, so let's drop it:
ds %>%
  group_by(payment_type) %>%
  select(-vendor_id) %>%
  write_dataset("nyc-taxi/feather", format = "feather")
Note that while you can select a subset of columns, you cannot currently rename columns when writing.