Tabular Datasets#
See also
The Arrow Datasets library provides functionality to efficiently work with tabular, potentially larger than memory, and multi-file datasets. This includes:
A unified interface that supports different sources and file formats and different file systems (local, cloud).
Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, …)
Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.
The supported file formats currently are Parquet, Feather / Arrow IPC, CSV and ORC (note that ORC datasets can currently only be read and not yet written). The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.
Reading Datasets#
For the examples below, let’s create a small dataset consisting of a directory with two parquet files:
50// Generate some data for the rest of this example.
51arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
52 auto schema =
53 arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
54 arrow::field("c", arrow::int64())});
55 std::shared_ptr<arrow::Array> array_a;
56 std::shared_ptr<arrow::Array> array_b;
57 std::shared_ptr<arrow::Array> array_c;
58 arrow::NumericBuilder<arrow::Int64Type> builder;
59 ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
60 ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
61 builder.Reset();
62 ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
63 ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
64 builder.Reset();
65 ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
66 ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
67 return arrow::Table::Make(schema, {array_a, array_b, array_c});
68}
69
70// Set up a dataset by writing two Parquet files.
71arrow::Result<std::string> CreateExampleParquetDataset(
72 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
73 auto base_path = root_path + "/parquet_dataset";
74 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
75 // Create an Arrow Table
76 ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
77 // Write it into two Parquet files
78 ARROW_ASSIGN_OR_RAISE(auto output,
79 filesystem->OpenOutputStream(base_path + "/data1.parquet"));
80 ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
81 *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
82 ARROW_ASSIGN_OR_RAISE(output,
83 filesystem->OpenOutputStream(base_path + "/data2.parquet"));
84 ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
85 *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
86 return base_path;
87}
(See the full example at bottom: Full Example.)
Dataset discovery#
A arrow::dataset::Dataset
object can be created using the various
arrow::dataset::DatasetFactory
objects. Here, we’ll use the
arrow::dataset::FileSystemDatasetFactory
, which can create a dataset
given a base directory path:
163// Read the whole dataset with the given format, without partitioning.
164arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
165 const std::shared_ptr<fs::FileSystem>& filesystem,
166 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
167 // Create a dataset by scanning the filesystem for files
168 fs::FileSelector selector;
169 selector.base_dir = base_dir;
170 ARROW_ASSIGN_OR_RAISE(
171 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
172 ds::FileSystemFactoryOptions()));
173 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
174 // Print out the fragments
175 ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
176 for (const auto& fragment : fragments) {
177 std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
178 }
179 // Read the entire dataset as a Table
180 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
181 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
182 return scanner->ToTable();
183}
We’re also passing the filesystem to use and the file format to use for reading. This lets us choose between (for example) reading local files or files in Amazon S3, or between Parquet and CSV.
In addition to searching a base directory, we can list file paths manually.
Creating a arrow::dataset::Dataset
does not begin reading the data
itself. It only crawls the directory to find all the files (if needed), which can
be retrieved with arrow::dataset::FileSystemDataset::files()
:
// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
std::cout << filename << std::endl;
}
…and infers the dataset’s schema (by default from the first file):
std::cout << dataset->schema()->ToString() << std::endl;
Using the arrow::dataset::Dataset::NewScan()
method, we can build a
arrow::dataset::Scanner
and read the dataset (or a portion of it) into
a arrow::Table
with the arrow::dataset::Scanner::ToTable()
method:
163// Read the whole dataset with the given format, without partitioning.
164arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
165 const std::shared_ptr<fs::FileSystem>& filesystem,
166 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
167 // Create a dataset by scanning the filesystem for files
168 fs::FileSelector selector;
169 selector.base_dir = base_dir;
170 ARROW_ASSIGN_OR_RAISE(
171 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
172 ds::FileSystemFactoryOptions()));
173 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
174 // Print out the fragments
175 ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
176 for (const auto& fragment : fragments) {
177 std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
178 }
179 // Read the entire dataset as a Table
180 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
181 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
182 return scanner->ToTable();
183}
Note
Depending on the size of your dataset, this can require a lot of memory; see Filtering data below on filtering/projecting.
Reading different file formats#
The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, Parquet, ORC, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.
If we save the table as Feather files instead of Parquet files:
91// Set up a dataset by writing two Feather files.
92arrow::Result<std::string> CreateExampleFeatherDataset(
93 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
94 auto base_path = root_path + "/feather_dataset";
95 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
96 // Create an Arrow Table
97 ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
98 // Write it into two Feather files
99 ARROW_ASSIGN_OR_RAISE(auto output,
100 filesystem->OpenOutputStream(base_path + "/data1.feather"));
101 ARROW_ASSIGN_OR_RAISE(auto writer,
102 arrow::ipc::MakeFileWriter(output.get(), table->schema()));
103 ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
104 ARROW_RETURN_NOT_OK(writer->Close());
105 ARROW_ASSIGN_OR_RAISE(output,
106 filesystem->OpenOutputStream(base_path + "/data2.feather"));
107 ARROW_ASSIGN_OR_RAISE(writer,
108 arrow::ipc::MakeFileWriter(output.get(), table->schema()));
109 ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
110 ARROW_RETURN_NOT_OK(writer->Close());
111 return base_path;
112}
…then we can read the Feather file by passing an arrow::dataset::IpcFileFormat
:
auto format = std::make_shared<ds::ParquetFileFormat>();
// ...
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
.ValueOrDie();
Customizing file formats#
arrow::dataset::FileFormat
objects have properties that control how
files are read. For example:
auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");
Will configure column "a"
to be dictionary-encoded when read. Similarly,
setting arrow::dataset::CsvFileFormat::parse_options
lets us change
things like reading comma-separated or tab-separated data.
Additionally, passing an arrow::dataset::FragmentScanOptions
to
arrow::dataset::ScannerBuilder::FragmentScanOptions()
offers fine-grained
control over data scanning. For example, for CSV files, we can change what values
are converted into Boolean true and false at scan time.
Filtering data#
So far, we’ve been reading the entire dataset, but if we need only a subset of the
data, this can waste time or memory reading data we don’t need. The
arrow::dataset::Scanner
offers control over what data to read.
In this snippet, we use arrow::dataset::ScannerBuilder::Project()
to select
which columns to read:
187// Read a dataset, but select only column "b" and only rows where b < 4.
188//
189// This is useful when you only want a few columns from a dataset. Where possible,
190// Datasets will push down the column selection such that less work is done.
191arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
192 const std::shared_ptr<fs::FileSystem>& filesystem,
193 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
194 fs::FileSelector selector;
195 selector.base_dir = base_dir;
196 ARROW_ASSIGN_OR_RAISE(
197 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
198 ds::FileSystemFactoryOptions()));
199 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
200 // Read specified columns with a row filter
201 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
202 ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
203 ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
204 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
205 return scanner->ToTable();
206}
Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.
A filter can be provided with arrow::dataset::ScannerBuilder::Filter()
, so
that rows which do not match the filter predicate will not be included in the
returned table. Again, some formats, such as Parquet, can use this filter to
reduce the amount of I/O needed.
187// Read a dataset, but select only column "b" and only rows where b < 4.
188//
189// This is useful when you only want a few columns from a dataset. Where possible,
190// Datasets will push down the column selection such that less work is done.
191arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
192 const std::shared_ptr<fs::FileSystem>& filesystem,
193 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
194 fs::FileSelector selector;
195 selector.base_dir = base_dir;
196 ARROW_ASSIGN_OR_RAISE(
197 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
198 ds::FileSystemFactoryOptions()));
199 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
200 // Read specified columns with a row filter
201 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
202 ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
203 ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
204 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
205 return scanner->ToTable();
206}
Projecting columns#
In addition to selecting columns, arrow::dataset::ScannerBuilder::Project()
can also be used for more complex projections, such as renaming columns, casting
them to other types, and even deriving new columns based on evaluating
expressions.
In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:
210// Read a dataset, but with column projection.
211//
212// This is useful to derive new columns from existing data. For example, here we
213// demonstrate casting a column to a different type, and turning a numeric column into a
214// boolean column based on a predicate. You could also rename columns or perform
215// computations involving multiple columns.
216arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
217 const std::shared_ptr<fs::FileSystem>& filesystem,
218 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
219 fs::FileSelector selector;
220 selector.base_dir = base_dir;
221 ARROW_ASSIGN_OR_RAISE(
222 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
223 ds::FileSystemFactoryOptions()));
224 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
225 // Read specified columns with a row filter
226 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
227 ARROW_RETURN_NOT_OK(scan_builder->Project(
228 {
229 // Leave column "a" as-is.
230 cp::field_ref("a"),
231 // Cast column "b" to float32.
232 cp::call("cast", {cp::field_ref("b")},
233 arrow::compute::CastOptions::Safe(arrow::float32())),
234 // Derive a boolean column from "c".
235 cp::equal(cp::field_ref("c"), cp::literal(1)),
236 },
237 {"a_renamed", "b_as_float32", "c_1"}));
238 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
239 return scanner->ToTable();
240}
This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:
244// Read a dataset, but with column projection.
245//
246// This time, we read all original columns plus one derived column. This simply combines
247// the previous two examples: selecting a subset of columns by name, and deriving new
248// columns with an expression.
249arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
250 const std::shared_ptr<fs::FileSystem>& filesystem,
251 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
252 fs::FileSelector selector;
253 selector.base_dir = base_dir;
254 ARROW_ASSIGN_OR_RAISE(
255 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
256 ds::FileSystemFactoryOptions()));
257 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
258 // Read specified columns with a row filter
259 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
260 std::vector<std::string> names;
261 std::vector<cp::Expression> exprs;
262 // Read all the original columns.
263 for (const auto& field : dataset->schema()->fields()) {
264 names.push_back(field->name());
265 exprs.push_back(cp::field_ref(field->name()));
266 }
267 // Also derive a new column.
268 names.emplace_back("b_large");
269 exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
270 ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
271 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
272 return scanner->ToTable();
273}
Note
When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn’t ultimately selected, Arrow will still read the column to evaluate the filter.
Reading and writing partitioned data#
So far, we’ve been working with datasets consisting of flat directories with files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. Instead of having to read and then filter the data, by organizing the files into a nested directory structure, we can define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. Then, we can more efficiently filter data by using that information to avoid loading files that don’t match the filter.
For example, a dataset partitioned by year and month may have the following layout:
dataset_name/
year=2007/
month=01/
data0.parquet
data1.parquet
...
month=02/
data0.parquet
data1.parquet
...
month=03/
...
year=2008/
month=01/
...
...
The above partitioning scheme is using “/key=value/” directory names, as found in
Apache Hive. Under this convention, the file at
dataset_name/year=2007/month=01/data0.parquet
contains only data for which
year == 2007
and month == 01
.
Let’s create a small partitioned dataset. For this, we’ll use Arrow’s dataset writing functionality.
116// Set up a dataset by writing files with partitioning
117arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
118 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
119 auto base_path = root_path + "/parquet_dataset";
120 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
121 // Create an Arrow Table
122 auto schema = arrow::schema(
123 {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
124 arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
125 std::vector<std::shared_ptr<arrow::Array>> arrays(4);
126 arrow::NumericBuilder<arrow::Int64Type> builder;
127 ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
128 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
129 builder.Reset();
130 ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
131 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
132 builder.Reset();
133 ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
134 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
135 arrow::StringBuilder string_builder;
136 ARROW_RETURN_NOT_OK(
137 string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
138 ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
139 auto table = arrow::Table::Make(schema, arrays);
140 // Write it using Datasets
141 auto dataset = std::make_shared<ds::InMemoryDataset>(table);
142 ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
143 ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
144
145 // The partition schema determines which fields are part of the partitioning.
146 auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
147 // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
148 auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
149 // We'll write Parquet files.
150 auto format = std::make_shared<ds::ParquetFileFormat>();
151 ds::FileSystemDatasetWriteOptions write_options;
152 write_options.file_write_options = format->DefaultWriteOptions();
153 write_options.filesystem = filesystem;
154 write_options.base_dir = base_path;
155 write_options.partitioning = partitioning;
156 write_options.basename_template = "part{i}.parquet";
157 ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
158 return base_path;
159}
The above created a directory with two subdirectories (“part=a” and “part=b”), and the Parquet files written in those directories no longer include the “part” column.
Reading this dataset, we now specify that the dataset should use a Hive-like partitioning scheme:
277// Read an entire dataset, but with partitioning information.
278arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
279 const std::shared_ptr<fs::FileSystem>& filesystem,
280 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
281 fs::FileSelector selector;
282 selector.base_dir = base_dir;
283 selector.recursive = true; // Make sure to search subdirectories
284 ds::FileSystemFactoryOptions options;
285 // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
286 // schema.
287 options.partitioning = ds::HivePartitioning::MakeFactory();
288 ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
289 filesystem, selector, format, options));
290 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
291 // Print out the fragments
292 ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
293 for (const auto& fragment : fragments) {
294 std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
295 std::cout << "Partition expression: "
296 << (*fragment)->partition_expression().ToString() << std::endl;
297 }
298 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
299 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
300 return scanner->ToTable();
301}
Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:
$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 20 rows
a: int64
-- field metadata --
PARQUET:field_id: '1'
b: double
-- field metadata --
PARQUET:field_id: '2'
c: int64
-- field metadata --
PARQUET:field_id: '3'
part: string
----
# snip...
We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:
305// Read an entire dataset, but with partitioning information. Also, filter the dataset on
306// the partition values.
307arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
308 const std::shared_ptr<fs::FileSystem>& filesystem,
309 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
310 fs::FileSelector selector;
311 selector.base_dir = base_dir;
312 selector.recursive = true;
313 ds::FileSystemFactoryOptions options;
314 options.partitioning = ds::HivePartitioning::MakeFactory();
315 ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
316 filesystem, selector, format, options));
317 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
318 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
319 // Filter based on the partition values. This will mean that we won't even read the
320 // files whose partition expressions don't match the filter.
321 ARROW_RETURN_NOT_OK(
322 scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
323 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
324 return scanner->ToTable();
325}
Different partitioning schemes#
The above example uses a Hive-like directory scheme, such as “/year=2009/month=11/day=15”. We specified this by passing the Hive partitioning factory. In this case, the types of the partition keys are inferred from the file paths.
It is also possible to directly construct the partitioning and explicitly define the schema of the partition keys. For example:
auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
arrow::field("year", arrow::int16()),
arrow::field("month", arrow::int8()),
arrow::field("day", arrow::int32())
}));
Arrow supports another partitioning scheme, “directory partitioning”, where the segments in the file path represent the values of the partition keys without including the name (the field names are implicit in the segment’s index). For example, given field names “year”, “month”, and “day”, one path might be “/2019/11/15”.
Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:
auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});
Directory partitioning also supports providing a full schema rather than inferring types from file paths.
Partitioning performance considerations#
Partitioning datasets has two aspects that affect performance: it increases the number of files and it creates a directory structure around the files. Both of these have benefits as well as costs. Depending on the configuration and the size of your dataset, the costs can outweigh the benefits.
Because partitions split up the dataset into multiple files, partitioned datasets can be read and written with parallelism. However, each additional file adds a little overhead in processing for filesystem interaction. It also increases the overall dataset size since each file has some shared metadata. For example, each parquet file contains the schema and group-level statistics. The number of partitions is a floor for the number of files. If you partition a dataset by date with a year of data, you will have at least 365 files. If you further partition by another dimension with 1,000 unique values, you will have up to 365,000 files. This fine of partitioning often leads to small files that mostly consist of metadata.
Partitioned datasets create nested folder structures, and those allow us to prune which files are loaded in a scan. However, this adds overhead to discovering files in the dataset, as we’ll need to recursively “list directory” to find the data files. Too fine partitions can cause problems here: Partitioning a dataset by date for a years worth of data will require 365 list calls to find all the files; adding another column with cardinality 1,000 will make that 365,365 calls.
The most optimal partitioning layout will depend on your data, access patterns, and which systems will be reading the data. Most systems, including Arrow, should work across a range of file sizes and partitioning layouts, but there are extremes you should avoid. These guidelines can help avoid some known worst cases:
Avoid files smaller than 20MB and larger than 2GB.
Avoid partitioning layouts with more than 10,000 distinct partitions.
For file formats that have a notion of groups within a file, such as Parquet, similar guidelines apply. Row groups can provide parallelism when reading and allow data skipping based on statistics, but very small groups can cause metadata to be a significant portion of file size. Arrow’s file writer provides sensible defaults for group sizing in most cases.
Reading from other data sources#
Reading in-memory data#
If you already have data in memory that you’d like to use with the Datasets API
(e.g. to filter/project data, or to write it out to a filesystem), you can wrap it
in an arrow::dataset::InMemoryDataset
:
auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter, it, etc.
auto scanner_builder = dataset->NewScan();
In the example, we used the InMemoryDataset to write our example data to local disk which was used in the rest of the example:
116// Set up a dataset by writing files with partitioning
117arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
118 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
119 auto base_path = root_path + "/parquet_dataset";
120 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
121 // Create an Arrow Table
122 auto schema = arrow::schema(
123 {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
124 arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
125 std::vector<std::shared_ptr<arrow::Array>> arrays(4);
126 arrow::NumericBuilder<arrow::Int64Type> builder;
127 ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
128 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
129 builder.Reset();
130 ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
131 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
132 builder.Reset();
133 ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
134 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
135 arrow::StringBuilder string_builder;
136 ARROW_RETURN_NOT_OK(
137 string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
138 ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
139 auto table = arrow::Table::Make(schema, arrays);
140 // Write it using Datasets
141 auto dataset = std::make_shared<ds::InMemoryDataset>(table);
142 ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
143 ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
144
145 // The partition schema determines which fields are part of the partitioning.
146 auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
147 // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
148 auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
149 // We'll write Parquet files.
150 auto format = std::make_shared<ds::ParquetFileFormat>();
151 ds::FileSystemDatasetWriteOptions write_options;
152 write_options.file_write_options = format->DefaultWriteOptions();
153 write_options.filesystem = filesystem;
154 write_options.base_dir = base_path;
155 write_options.partitioning = partitioning;
156 write_options.basename_template = "part{i}.parquet";
157 ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
158 return base_path;
159}
Reading from cloud storage#
In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.
See the filesystem docs for more details on the available filesystems.
A note on transactions & ACID guarantees#
The dataset API offers no transaction support or any ACID guarantees. This affects both reading and writing. Concurrent reads are fine. Concurrent writes or writes concurring with reads may have unexpected behavior. Various approaches can be used to avoid operating on the same files such as using a unique basename template for each writer, a temporary directory for new files, or separate storage of the file list instead of relying on directory discovery.
Unexpectedly killing the process while a write is in progress can leave the system in an inconsistent state. Write calls generally return as soon as the bytes to be written have been completely delivered to the OS page cache. Even though a write operation has been completed it is possible for part of the file to be lost if there is a sudden power loss immediately after the write call.
Most file formats have magic numbers which are written at the end. This means a partial file write can safely be detected and discarded. The CSV file format does not have any such concept and a partially written CSV file may be detected as valid.
Full Example#
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18// This example showcases various ways to work with Datasets. It's
19// intended to be paired with the documentation.
20
21#include <arrow/api.h>
22#include <arrow/compute/api.h>
23#include <arrow/compute/cast.h>
24#include <arrow/dataset/dataset.h>
25#include <arrow/dataset/discovery.h>
26#include <arrow/dataset/file_base.h>
27#include <arrow/dataset/file_ipc.h>
28#include <arrow/dataset/file_parquet.h>
29#include <arrow/dataset/scanner.h>
30#include <arrow/filesystem/filesystem.h>
31#include <arrow/ipc/writer.h>
32#include <arrow/util/iterator.h>
33#include <parquet/arrow/writer.h>
34#include "arrow/compute/expression.h"
35
36#include <iostream>
37#include <vector>
38
39namespace ds = arrow::dataset;
40namespace fs = arrow::fs;
41namespace cp = arrow::compute;
42
43/**
44 * \brief Run Example
45 *
46 * ./debug/dataset-documentation-example file:///<some_path>/<some_directory> parquet
47 */
48
49// (Doc section: Reading Datasets)
50// Generate some data for the rest of this example.
51arrow::Result<std::shared_ptr<arrow::Table>> CreateTable() {
52 auto schema =
53 arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
54 arrow::field("c", arrow::int64())});
55 std::shared_ptr<arrow::Array> array_a;
56 std::shared_ptr<arrow::Array> array_b;
57 std::shared_ptr<arrow::Array> array_c;
58 arrow::NumericBuilder<arrow::Int64Type> builder;
59 ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
60 ARROW_RETURN_NOT_OK(builder.Finish(&array_a));
61 builder.Reset();
62 ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
63 ARROW_RETURN_NOT_OK(builder.Finish(&array_b));
64 builder.Reset();
65 ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
66 ARROW_RETURN_NOT_OK(builder.Finish(&array_c));
67 return arrow::Table::Make(schema, {array_a, array_b, array_c});
68}
69
70// Set up a dataset by writing two Parquet files.
71arrow::Result<std::string> CreateExampleParquetDataset(
72 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
73 auto base_path = root_path + "/parquet_dataset";
74 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
75 // Create an Arrow Table
76 ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
77 // Write it into two Parquet files
78 ARROW_ASSIGN_OR_RAISE(auto output,
79 filesystem->OpenOutputStream(base_path + "/data1.parquet"));
80 ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
81 *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
82 ARROW_ASSIGN_OR_RAISE(output,
83 filesystem->OpenOutputStream(base_path + "/data2.parquet"));
84 ARROW_RETURN_NOT_OK(parquet::arrow::WriteTable(
85 *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
86 return base_path;
87}
88// (Doc section: Reading Datasets)
89
90// (Doc section: Reading different file formats)
91// Set up a dataset by writing two Feather files.
92arrow::Result<std::string> CreateExampleFeatherDataset(
93 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
94 auto base_path = root_path + "/feather_dataset";
95 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
96 // Create an Arrow Table
97 ARROW_ASSIGN_OR_RAISE(auto table, CreateTable());
98 // Write it into two Feather files
99 ARROW_ASSIGN_OR_RAISE(auto output,
100 filesystem->OpenOutputStream(base_path + "/data1.feather"));
101 ARROW_ASSIGN_OR_RAISE(auto writer,
102 arrow::ipc::MakeFileWriter(output.get(), table->schema()));
103 ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(0, 5)));
104 ARROW_RETURN_NOT_OK(writer->Close());
105 ARROW_ASSIGN_OR_RAISE(output,
106 filesystem->OpenOutputStream(base_path + "/data2.feather"));
107 ARROW_ASSIGN_OR_RAISE(writer,
108 arrow::ipc::MakeFileWriter(output.get(), table->schema()));
109 ARROW_RETURN_NOT_OK(writer->WriteTable(*table->Slice(5)));
110 ARROW_RETURN_NOT_OK(writer->Close());
111 return base_path;
112}
113// (Doc section: Reading different file formats)
114
115// (Doc section: Reading and writing partitioned data)
116// Set up a dataset by writing files with partitioning
117arrow::Result<std::string> CreateExampleParquetHivePartitionedDataset(
118 const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
119 auto base_path = root_path + "/parquet_dataset";
120 ARROW_RETURN_NOT_OK(filesystem->CreateDir(base_path));
121 // Create an Arrow Table
122 auto schema = arrow::schema(
123 {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
124 arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
125 std::vector<std::shared_ptr<arrow::Array>> arrays(4);
126 arrow::NumericBuilder<arrow::Int64Type> builder;
127 ARROW_RETURN_NOT_OK(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
128 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[0]));
129 builder.Reset();
130 ARROW_RETURN_NOT_OK(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
131 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[1]));
132 builder.Reset();
133 ARROW_RETURN_NOT_OK(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
134 ARROW_RETURN_NOT_OK(builder.Finish(&arrays[2]));
135 arrow::StringBuilder string_builder;
136 ARROW_RETURN_NOT_OK(
137 string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
138 ARROW_RETURN_NOT_OK(string_builder.Finish(&arrays[3]));
139 auto table = arrow::Table::Make(schema, arrays);
140 // Write it using Datasets
141 auto dataset = std::make_shared<ds::InMemoryDataset>(table);
142 ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
143 ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
144
145 // The partition schema determines which fields are part of the partitioning.
146 auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
147 // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
148 auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
149 // We'll write Parquet files.
150 auto format = std::make_shared<ds::ParquetFileFormat>();
151 ds::FileSystemDatasetWriteOptions write_options;
152 write_options.file_write_options = format->DefaultWriteOptions();
153 write_options.filesystem = filesystem;
154 write_options.base_dir = base_path;
155 write_options.partitioning = partitioning;
156 write_options.basename_template = "part{i}.parquet";
157 ARROW_RETURN_NOT_OK(ds::FileSystemDataset::Write(write_options, scanner));
158 return base_path;
159}
160// (Doc section: Reading and writing partitioned data)
161
162// (Doc section: Dataset discovery)
163// Read the whole dataset with the given format, without partitioning.
164arrow::Result<std::shared_ptr<arrow::Table>> ScanWholeDataset(
165 const std::shared_ptr<fs::FileSystem>& filesystem,
166 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
167 // Create a dataset by scanning the filesystem for files
168 fs::FileSelector selector;
169 selector.base_dir = base_dir;
170 ARROW_ASSIGN_OR_RAISE(
171 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
172 ds::FileSystemFactoryOptions()));
173 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
174 // Print out the fragments
175 ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments())
176 for (const auto& fragment : fragments) {
177 std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
178 }
179 // Read the entire dataset as a Table
180 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
181 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
182 return scanner->ToTable();
183}
184// (Doc section: Dataset discovery)
185
186// (Doc section: Filtering data)
187// Read a dataset, but select only column "b" and only rows where b < 4.
188//
189// This is useful when you only want a few columns from a dataset. Where possible,
190// Datasets will push down the column selection such that less work is done.
191arrow::Result<std::shared_ptr<arrow::Table>> FilterAndSelectDataset(
192 const std::shared_ptr<fs::FileSystem>& filesystem,
193 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
194 fs::FileSelector selector;
195 selector.base_dir = base_dir;
196 ARROW_ASSIGN_OR_RAISE(
197 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
198 ds::FileSystemFactoryOptions()));
199 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
200 // Read specified columns with a row filter
201 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
202 ARROW_RETURN_NOT_OK(scan_builder->Project({"b"}));
203 ARROW_RETURN_NOT_OK(scan_builder->Filter(cp::less(cp::field_ref("b"), cp::literal(4))));
204 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
205 return scanner->ToTable();
206}
207// (Doc section: Filtering data)
208
209// (Doc section: Projecting columns)
210// Read a dataset, but with column projection.
211//
212// This is useful to derive new columns from existing data. For example, here we
213// demonstrate casting a column to a different type, and turning a numeric column into a
214// boolean column based on a predicate. You could also rename columns or perform
215// computations involving multiple columns.
216arrow::Result<std::shared_ptr<arrow::Table>> ProjectDataset(
217 const std::shared_ptr<fs::FileSystem>& filesystem,
218 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
219 fs::FileSelector selector;
220 selector.base_dir = base_dir;
221 ARROW_ASSIGN_OR_RAISE(
222 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
223 ds::FileSystemFactoryOptions()));
224 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
225 // Read specified columns with a row filter
226 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
227 ARROW_RETURN_NOT_OK(scan_builder->Project(
228 {
229 // Leave column "a" as-is.
230 cp::field_ref("a"),
231 // Cast column "b" to float32.
232 cp::call("cast", {cp::field_ref("b")},
233 arrow::compute::CastOptions::Safe(arrow::float32())),
234 // Derive a boolean column from "c".
235 cp::equal(cp::field_ref("c"), cp::literal(1)),
236 },
237 {"a_renamed", "b_as_float32", "c_1"}));
238 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
239 return scanner->ToTable();
240}
241// (Doc section: Projecting columns)
242
243// (Doc section: Projecting columns #2)
244// Read a dataset, but with column projection.
245//
246// This time, we read all original columns plus one derived column. This simply combines
247// the previous two examples: selecting a subset of columns by name, and deriving new
248// columns with an expression.
249arrow::Result<std::shared_ptr<arrow::Table>> SelectAndProjectDataset(
250 const std::shared_ptr<fs::FileSystem>& filesystem,
251 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
252 fs::FileSelector selector;
253 selector.base_dir = base_dir;
254 ARROW_ASSIGN_OR_RAISE(
255 auto factory, ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
256 ds::FileSystemFactoryOptions()));
257 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
258 // Read specified columns with a row filter
259 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
260 std::vector<std::string> names;
261 std::vector<cp::Expression> exprs;
262 // Read all the original columns.
263 for (const auto& field : dataset->schema()->fields()) {
264 names.push_back(field->name());
265 exprs.push_back(cp::field_ref(field->name()));
266 }
267 // Also derive a new column.
268 names.emplace_back("b_large");
269 exprs.push_back(cp::greater(cp::field_ref("b"), cp::literal(1)));
270 ARROW_RETURN_NOT_OK(scan_builder->Project(exprs, names));
271 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
272 return scanner->ToTable();
273}
274// (Doc section: Projecting columns #2)
275
276// (Doc section: Reading and writing partitioned data #2)
277// Read an entire dataset, but with partitioning information.
278arrow::Result<std::shared_ptr<arrow::Table>> ScanPartitionedDataset(
279 const std::shared_ptr<fs::FileSystem>& filesystem,
280 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
281 fs::FileSelector selector;
282 selector.base_dir = base_dir;
283 selector.recursive = true; // Make sure to search subdirectories
284 ds::FileSystemFactoryOptions options;
285 // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
286 // schema.
287 options.partitioning = ds::HivePartitioning::MakeFactory();
288 ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
289 filesystem, selector, format, options));
290 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
291 // Print out the fragments
292 ARROW_ASSIGN_OR_RAISE(auto fragments, dataset->GetFragments());
293 for (const auto& fragment : fragments) {
294 std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
295 std::cout << "Partition expression: "
296 << (*fragment)->partition_expression().ToString() << std::endl;
297 }
298 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
299 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
300 return scanner->ToTable();
301}
302// (Doc section: Reading and writing partitioned data #2)
303
304// (Doc section: Reading and writing partitioned data #3)
305// Read an entire dataset, but with partitioning information. Also, filter the dataset on
306// the partition values.
307arrow::Result<std::shared_ptr<arrow::Table>> FilterPartitionedDataset(
308 const std::shared_ptr<fs::FileSystem>& filesystem,
309 const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
310 fs::FileSelector selector;
311 selector.base_dir = base_dir;
312 selector.recursive = true;
313 ds::FileSystemFactoryOptions options;
314 options.partitioning = ds::HivePartitioning::MakeFactory();
315 ARROW_ASSIGN_OR_RAISE(auto factory, ds::FileSystemDatasetFactory::Make(
316 filesystem, selector, format, options));
317 ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
318 ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
319 // Filter based on the partition values. This will mean that we won't even read the
320 // files whose partition expressions don't match the filter.
321 ARROW_RETURN_NOT_OK(
322 scan_builder->Filter(cp::equal(cp::field_ref("part"), cp::literal("b"))));
323 ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
324 return scanner->ToTable();
325}
326// (Doc section: Reading and writing partitioned data #3)
327
328arrow::Status RunDatasetDocumentation(const std::string& format_name,
329 const std::string& uri, const std::string& mode) {
330 ARROW_RETURN_NOT_OK(arrow::compute::Initialize());
331
332 std::string base_path;
333 std::shared_ptr<ds::FileFormat> format;
334 std::string root_path;
335 ARROW_ASSIGN_OR_RAISE(auto fs, fs::FileSystemFromUri(uri, &root_path));
336
337 if (format_name == "feather") {
338 format = std::make_shared<ds::IpcFileFormat>();
339 ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleFeatherDataset(fs, root_path));
340 } else if (format_name == "parquet") {
341 format = std::make_shared<ds::ParquetFileFormat>();
342 ARROW_ASSIGN_OR_RAISE(base_path, CreateExampleParquetDataset(fs, root_path));
343 } else if (format_name == "parquet_hive") {
344 format = std::make_shared<ds::ParquetFileFormat>();
345 ARROW_ASSIGN_OR_RAISE(base_path,
346 CreateExampleParquetHivePartitionedDataset(fs, root_path));
347 } else {
348 std::cerr << "Unknown format: " << format_name << std::endl;
349 std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
350 return arrow::Status::ExecutionError("Dataset creating failed.");
351 }
352
353 std::shared_ptr<arrow::Table> table;
354 if (mode == "no_filter") {
355 ARROW_ASSIGN_OR_RAISE(table, ScanWholeDataset(fs, format, base_path));
356 } else if (mode == "filter") {
357 ARROW_ASSIGN_OR_RAISE(table, FilterAndSelectDataset(fs, format, base_path));
358 } else if (mode == "project") {
359 ARROW_ASSIGN_OR_RAISE(table, ProjectDataset(fs, format, base_path));
360 } else if (mode == "select_project") {
361 ARROW_ASSIGN_OR_RAISE(table, SelectAndProjectDataset(fs, format, base_path));
362 } else if (mode == "partitioned") {
363 ARROW_ASSIGN_OR_RAISE(table, ScanPartitionedDataset(fs, format, base_path));
364 } else if (mode == "filter_partitioned") {
365 ARROW_ASSIGN_OR_RAISE(table, FilterPartitionedDataset(fs, format, base_path));
366 } else {
367 std::cerr << "Unknown mode: " << mode << std::endl;
368 std::cerr
369 << "Supported modes: no_filter, filter, project, select_project, partitioned"
370 << std::endl;
371 return arrow::Status::ExecutionError("Dataset reading failed.");
372 }
373 std::cout << "Read " << table->num_rows() << " rows" << std::endl;
374 std::cout << table->ToString() << std::endl;
375 return arrow::Status::OK();
376}
377
378int main(int argc, char** argv) {
379 if (argc < 3) {
380 // Fake success for CI purposes.
381 return EXIT_SUCCESS;
382 }
383
384 std::string uri = argv[1];
385 std::string format_name = argv[2];
386 std::string mode = argc > 3 ? argv[3] : "no_filter";
387
388 auto status = RunDatasetDocumentation(format_name, uri, mode);
389 if (!status.ok()) {
390 std::cerr << status.ToString() << std::endl;
391 return EXIT_FAILURE;
392 }
393 return EXIT_SUCCESS;
394}