Tabular Datasets

Warning

The arrow::dataset namespace is experimental, and a stable API is not yet guaranteed.

The Arrow Datasets library provides functionality for working efficiently with tabular datasets that may be larger than memory and may span multiple files. This includes:

  • A unified interface that supports different sources and file formats (currently, Parquet, ORC, Feather / Arrow IPC, and CSV files) and different file systems (local, cloud).

  • Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, …)

  • Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.

The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.

Reading Datasets

For the examples below, let’s create a small dataset consisting of a directory with two Parquet files:

// Generate some data for the rest of this example.
std::shared_ptr<arrow::Table> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&array_a));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&array_b));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Parquet files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}

(See the full example at the bottom: Full Example.)

Dataset discovery

An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we’ll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path:

// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

We’re also passing the filesystem to use and the file format to use for reading. This lets us choose between (for example) reading local files or files in Amazon S3, or between Parquet and CSV.

In addition to searching a base directory, we can list file paths manually.

Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directory to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():

// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}

…and infers the dataset’s schema (by default from the first file):

std::cout << dataset->schema()->ToString() << std::endl;

Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method:

// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

Note

Depending on the size of your dataset, this can require a lot of memory; see Filtering data below on filtering/projecting.

Reading different file formats

The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.

If we save the table as Feather files instead of Parquet files:

// Set up a dataset by writing two Feather files.
std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Feather files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie();
  auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5)));
  ABORT_ON_FAILURE(writer->Close());
  output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie();
  writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5)));
  ABORT_ON_FAILURE(writer->Close());
  return base_path;
}

…then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:

auto format = std::make_shared<ds::IpcFileFormat>();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                   .ValueOrDie();

Customizing file formats

arrow::dataset::FileFormat objects have properties that control how files are read. For example:

auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");

This configures column "a" to be dictionary-encoded when read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change things like whether the data is read as comma-separated or tab-separated.
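To make "dictionary-encoded" concrete, here is a small conceptual sketch in plain C++. It does not use Arrow's reader or its dictionary array types; the DictionaryEncoded struct and the DictionaryEncode function are hypothetical names introduced only for illustration. Each distinct value is stored once, and every row becomes a small integer index into that list of distinct values:

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a dictionary-encoded column: each distinct value
// is stored once, and rows refer to it by position.
struct DictionaryEncoded {
  std::vector<int64_t> dictionary;  // distinct values, in first-seen order
  std::vector<int32_t> indices;     // one index into `dictionary` per row
};

DictionaryEncoded DictionaryEncode(const std::vector<int64_t>& values) {
  DictionaryEncoded out;
  std::unordered_map<int64_t, int32_t> position;  // value -> dictionary slot
  for (int64_t v : values) {
    auto it = position.find(v);
    if (it == position.end()) {
      // First time we see this value: give it the next dictionary slot.
      it = position.emplace(v, static_cast<int32_t>(out.dictionary.size())).first;
      out.dictionary.push_back(v);
    }
    out.indices.push_back(it->second);
  }
  // Every row must map back to a dictionary entry.
  assert(out.indices.size() == values.size());
  return out;
}
```

Columns with few distinct values benefit most. Column "c" of the example table, which alternates between 1 and 2, would need a two-entry dictionary under this model.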

Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() offers fine-grained control over data scanning. For example, for CSV files, we can change what values are converted into Boolean true and false at scan time.

Filtering data

So far, we’ve been reading the entire dataset, but if we need only a subset of the data, this can waste time or memory reading data we don’t need. The arrow::dataset::Scanner offers control over what data to read.

In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read:

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.

A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate will not be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed.

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
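
One way a file format can use a filter to reduce I/O is to consult per-chunk statistics before reading. The following is a minimal sketch of that idea, assuming each chunk of a file records the minimum and maximum of column "b" (Parquet row groups can carry such statistics). The ChunkStats struct and the ChunksToRead function are hypothetical and are not part of Arrow:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical per-chunk statistics for a single column.
struct ChunkStats {
  int64_t min;
  int64_t max;
};

// Return the indices of chunks that may contain rows with value < bound.
// A chunk whose minimum is already >= bound cannot match the filter and is
// skipped without being read.
std::vector<std::size_t> ChunksToRead(const std::vector<ChunkStats>& chunks,
                                      int64_t bound) {
  std::vector<std::size_t> selected;
  for (std::size_t i = 0; i < chunks.size(); ++i) {
    assert(chunks[i].min <= chunks[i].max);
    if (chunks[i].min < bound) selected.push_back(i);
  }
  return selected;
}
```

In the example dataset, data1.parquet holds the values 9 to 5 of column "b" and data2.parquet holds 4 to 0. Under this model, the filter b < 4 only needs to read the second file. Rows inside a selected chunk still have to be filtered individually, since the statistics only rule chunks out.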

Projecting columns

In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns based on evaluating expressions.

In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:

// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
std::shared_ptr<arrow::Table> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read the dataset with a projection built from expressions
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project(
      {
          // Keep the values of column "a" unchanged (it is renamed below).
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:

// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
std::shared_ptr<arrow::Table> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read all original columns plus one derived column
  auto scan_builder = dataset->NewScan().ValueOrDie();
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ABORT_ON_FAILURE(scan_builder->Project(exprs, names));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

Note

When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn’t ultimately selected, Arrow will still read the column to evaluate the filter.
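
The rule in the note amounts to taking a set union. As a rough sketch (the ColumnsToRead function and its parameters are hypothetical, not an Arrow API), the columns that must be read from storage are those referenced by the projection plus those referenced by the filter:

```cpp
#include <cassert>
#include <set>
#include <string>
#include <vector>

// Columns that must be read from storage: everything the projection refers
// to plus everything the filter refers to, without duplicates.
std::set<std::string> ColumnsToRead(const std::vector<std::string>& projected,
                                    const std::vector<std::string>& filter_refs) {
  std::set<std::string> needed(projected.begin(), projected.end());
  needed.insert(filter_refs.begin(), filter_refs.end());
  // The union can never be larger than the two inputs combined.
  assert(needed.size() <= projected.size() + filter_refs.size());
  return needed;
}
```

For example, projecting only "a" while filtering on "b" requires reading both "a" and "b", even though "b" does not appear in the resulting table.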

Reading and writing partitioned data

So far, we’ve been working with datasets consisting of flat directories with files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. Instead of having to read and then filter the data, by organizing the files into a nested directory structure, we can define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. Then, we can more efficiently filter data by using that information to avoid loading files that don’t match the filter.

For example, a dataset partitioned by year and month may have the following layout:

dataset_name/
  year=2007/
    month=01/
       data0.parquet
       data1.parquet
       ...
    month=02/
       data0.parquet
       data1.parquet
       ...
    month=03/
    ...
  year=2008/
    month=01/
    ...
  ...

The partitioning scheme above uses “/key=value/” directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.
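
To illustrate the convention, here is a small sketch in plain C++ that recovers the key/value pairs from such a path. ParseHivePath is a hypothetical helper written for this illustration; it is not how Arrow implements its Hive partitioning, and it keeps every value as a string:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Extract (key, value) pairs from the "key=value" directory segments of a
// path. Only segments followed by '/' are treated as directories, so the
// final segment (the file name) is never inspected. Directories without '='
// (such as the dataset root) are ignored.
std::vector<std::pair<std::string, std::string>> ParseHivePath(
    const std::string& path) {
  std::vector<std::pair<std::string, std::string>> keys;
  std::size_t start = 0;
  std::size_t end = path.find('/');
  while (end != std::string::npos) {
    const std::string segment = path.substr(start, end - start);
    const std::size_t eq = segment.find('=');
    if (eq != std::string::npos && eq > 0) {
      keys.emplace_back(segment.substr(0, eq), segment.substr(eq + 1));
    }
    start = end + 1;
    end = path.find('/', start);
  }
  return keys;
}
```

Applied to dataset_name/year=2007/month=01/data0.parquet, this yields the pairs ("year", "2007") and ("month", "01").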

Let’s create a small partitioned dataset. For this, we’ll use Arrow’s dataset writing functionality.

// Set up a dataset by writing files with partitioning
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

The code above creates a directory with two subdirectories (“part=a” and “part=b”). The Parquet files written to those directories no longer include the “part” column.
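
Conceptually, a partitioned write groups rows by the partition key and drops that key from the stored data, because the directory name already carries it. The sketch below models this with plain structs; Row, StoredRow, and PartitionRows are hypothetical names, and nothing here touches the filesystem or Arrow's writer:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// One row of the example table.
struct Row {
  int64_t a, b, c;
  std::string part;
};

// The columns actually stored in a file: the partition column is omitted
// because the directory name already encodes it.
struct StoredRow {
  int64_t a, b, c;
};

// Group rows by partition value, keyed by the Hive-style directory name.
std::map<std::string, std::vector<StoredRow>> PartitionRows(
    const std::vector<Row>& rows) {
  std::map<std::string, std::vector<StoredRow>> directories;
  for (const Row& row : rows) {
    assert(!row.part.empty());
    directories["part=" + row.part].push_back({row.a, row.b, row.c});
  }
  return directories;
}
```

With the example table, the five rows whose "part" value is "a" land under "part=a" and the other five under "part=b".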

Reading this dataset, we now specify that the dataset should use a Hive-like partitioning scheme:

// Read an entire dataset, but with partitioning information.
std::shared_ptr<arrow::Table> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:

$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: double
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...

We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:

// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
std::shared_ptr<arrow::Table> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ABORT_ON_FAILURE(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
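
The reason whole files can be skipped is that each fragment's partition value is known from its path alone. A minimal sketch of that pruning step follows, assuming a single string partition key; FragmentInfo and FragmentsToScan are hypothetical stand-ins for a fragment and its partition expression, not Arrow types:

```cpp
#include <cassert>
#include <string>
#include <vector>

// A data file together with the partition value taken from its directory.
struct FragmentInfo {
  std::string path;
  std::string part;  // value implied by the "part=<value>" directory
};

// Keep only fragments whose partition value can satisfy `part == wanted`.
// The remaining files are never opened.
std::vector<std::string> FragmentsToScan(const std::vector<FragmentInfo>& fragments,
                                         const std::string& wanted) {
  std::vector<std::string> paths;
  for (const FragmentInfo& fragment : fragments) {
    if (fragment.part == wanted) paths.push_back(fragment.path);
  }
  assert(paths.size() <= fragments.size());
  return paths;
}
```

For the two fragments listed in the output above, filtering on part == "b" leaves only the file under part=b to be read.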

Different partitioning schemes

The above example uses a Hive-like directory scheme, such as “/year=2009/month=11/day=15”. We specified this by passing the Hive partitioning factory. In this case, the types of the partition keys are inferred from the file paths.
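
As an illustration of what inferring a key's type from paths can involve, the sketch below assumes a simple rule: a key is an integer if every value seen for it parses as one, and a string otherwise. This rule and the names InferredType, LooksLikeInteger, and InferKeyType are assumptions made for the example; the exact rules Arrow applies may differ:

```cpp
#include <cassert>
#include <cctype>
#include <cstddef>
#include <string>
#include <vector>

enum class InferredType { kInteger, kString };

// True if `value` is an optional '-' followed by one or more digits.
bool LooksLikeInteger(const std::string& value) {
  std::size_t i = (!value.empty() && value[0] == '-') ? 1 : 0;
  if (i == value.size()) return false;  // empty, or just "-"
  for (; i < value.size(); ++i) {
    if (!std::isdigit(static_cast<unsigned char>(value[i]))) return false;
  }
  return true;
}

// A key is treated as an integer only if every value seen for it looks like
// one; a single non-numeric value makes the whole key a string.
InferredType InferKeyType(const std::vector<std::string>& values) {
  assert(!values.empty());
  for (const std::string& value : values) {
    if (!LooksLikeInteger(value)) return InferredType::kString;
  }
  return InferredType::kInteger;
}
```

Under this model, a value such as "01" counts as an integer, so leading zeros would not survive a round trip. Supplying an explicit schema avoids relying on inference.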

It is also possible to directly construct the partitioning and explicitly define the schema of the partition keys. For example:

auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));

Arrow supports another partitioning scheme, “directory partitioning”, where the segments in the file path represent the values of the partition keys without including the name (the field names are implicit in the segment’s index). For example, given field names “year”, “month”, and “day”, one path might be “/2019/11/15”.

Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:

auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});

Directory partitioning also supports providing a full schema rather than inferring types from file paths.
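
The positional matching used by directory partitioning can be sketched as follows. ParseDirectoryPath is a hypothetical helper for illustration only; it takes the directory portion of a path (without a file name) and keeps every value as a string:

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

// Pair each directory segment of `path` with the field name at the same
// position. Segments beyond the listed field names are ignored.
std::vector<std::pair<std::string, std::string>> ParseDirectoryPath(
    const std::vector<std::string>& field_names, const std::string& path) {
  assert(!field_names.empty());
  std::vector<std::pair<std::string, std::string>> keys;
  // Skip a leading '/' so that it does not produce an empty first segment.
  std::size_t start = (!path.empty() && path[0] == '/') ? 1 : 0;
  while (keys.size() < field_names.size() && start < path.size()) {
    std::size_t end = path.find('/', start);
    if (end == std::string::npos) end = path.size();
    keys.emplace_back(field_names[keys.size()], path.substr(start, end - start));
    start = end + 1;
  }
  return keys;
}
```

Given the field names "year", "month", and "day", the path "/2019/11/15" maps to year = "2019", month = "11", and day = "15".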

Reading from other data sources

Reading in-memory data

If you already have data in memory that you’d like to use with the Datasets API (e.g. to filter/project data, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset:

auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan().ValueOrDie();

In the partitioned-dataset example, we used an InMemoryDataset to write the example table to disk; the resulting files were then read in the later examples:

// Set up a dataset by writing files with partitioning
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

Reading from cloud storage

In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.

See the filesystem docs for more details on the available filesystems.

Full Example

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <arrow/compute/exec/expression.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>

#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;
namespace cp = arrow::compute;

#define ABORT_ON_FAILURE(expr)                     \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      abort();                                     \
    }                                              \
  } while (0)

// (Doc section: Reading Datasets)
// Generate some data for the rest of this example.
std::shared_ptr<arrow::Table> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&array_a));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&array_b));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Parquet files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
// (Doc section: Reading Datasets)

// (Doc section: Reading different file formats)
// Set up a dataset by writing two Feather files.
std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Feather files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie();
  auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5)));
  ABORT_ON_FAILURE(writer->Close());
  output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie();
  writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5)));
  ABORT_ON_FAILURE(writer->Close());
  return base_path;
}
// (Doc section: Reading different file formats)

// (Doc section: Reading and writing partitioned data)
// Set up a dataset by writing files with partitioning
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
// (Doc section: Reading and writing partitioned data)

// (Doc section: Dataset discovery)
// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Dataset discovery)

// (Doc section: Filtering data)
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns or rows from a dataset. Where
// possible, Datasets pushes down the column selection and the row filter so that less
// data is read.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Filtering data)

// (Doc section: Projecting columns)
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
std::shared_ptr<arrow::Table> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Project the columns using expressions, giving each output column a name
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project(
      {
          // Pass column "a" through unchanged; it is output as "a_renamed".
          cp::field_ref("a"),
          // Cast column "b" to float32.
          cp::call("cast", {cp::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          cp::equal(cp::field_ref("c"), cp::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Projecting columns)

// (Doc section: Projecting columns #2)
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This combines the
// previous two examples: selecting columns by name, and deriving new columns with an
// expression.
std::shared_ptr<arrow::Table> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Build a projection from expressions and output column names
  auto scan_builder = dataset->NewScan().ValueOrDie();
  std::vector<std::string> names;
  std::vector<cp::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(cp::field_ref(field->name()));
  }
  // Also derive a new column.
  names.emplace_back("b_large");
  exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
  ABORT_ON_FAILURE(scan_builder->Project(exprs, names));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Projecting columns #2)

// (Doc section: Reading and writing partitioned data #2)
// Read an entire dataset, but with partitioning information.
std::shared_ptr<arrow::Table> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Reading and writing partitioned data #2)

// (Doc section: Reading and writing partitioned data #3)
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
std::shared_ptr<arrow::Table> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Filter based on the partition values. This means that files whose partition
  // expressions don't match the filter are never read.
  ABORT_ON_FAILURE(
      scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
// (Doc section: Reading and writing partitioned data #3)

int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";
  std::string root_path;
  auto fs = fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    base_path = CreateExampleFeatherDataset(fs, root_path);
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetDataset(fs, root_path);
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetHivePartitionedDataset(fs, root_path);
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return EXIT_FAILURE;
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    table = ScanWholeDataset(fs, format, base_path);
  } else if (mode == "filter") {
    table = FilterAndSelectDataset(fs, format, base_path);
  } else if (mode == "project") {
    table = ProjectDataset(fs, format, base_path);
  } else if (mode == "select_project") {
    table = SelectAndProjectDataset(fs, format, base_path);
  } else if (mode == "partitioned") {
    table = ScanPartitionedDataset(fs, format, base_path);
  } else if (mode == "filter_partitioned") {
    table = FilterPartitionedDataset(fs, format, base_path);
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return EXIT_SUCCESS;
}