Tabular Datasets
Warning
The arrow::dataset namespace is experimental, and a stable API is not yet guaranteed.
The Arrow Datasets library provides functionality to efficiently work with tabular, potentially larger than memory, and multi-file datasets. This includes:
- A unified interface that supports different sources and file formats (currently, Parquet, Feather / Arrow IPC, and CSV files) and different file systems (local, cloud).
- Discovery of sources (crawling directories, handling partitioned datasets with various partitioning schemes, basic schema normalization, …)
- Optimized reading with predicate pushdown (filtering rows), projection (selecting and deriving columns), and optionally parallel reading.
The goal is to expand support to other file formats and data sources (e.g. database connections) in the future.
Reading Datasets
For the examples below, let’s create a small dataset consisting of a directory with two Parquet files:
// Generate some data for the rest of this example.
std::shared_ptr<arrow::Table> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&array_a));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&array_b));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Parquet files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}
(See the full example at bottom: Full Example.)
Dataset discovery
An arrow::dataset::Dataset object can be created using the various arrow::dataset::DatasetFactory objects. Here, we’ll use the arrow::dataset::FileSystemDatasetFactory, which can create a dataset given a base directory path:
// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // (the rest of this function is shown below)
We’re also passing in the filesystem and the file format to use for reading. This lets us choose between (for example) reading local files or files in Amazon S3, or between Parquet and CSV.
In addition to searching a base directory, we can list file paths manually.
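For instance, here is a hedged sketch using the overload of ds::FileSystemDatasetFactory::Make that accepts an explicit vector of paths instead of a FileSelector (the file names here are hypothetical):
// List the files explicitly rather than crawling a directory.
std::vector<std::string> paths = {base_dir + "/data1.parquet", base_dir + "/data2.parquet"};
auto factory = ds::FileSystemDatasetFactory::Make(filesystem, paths, format,
                                                  ds::FileSystemFactoryOptions())
                   .ValueOrDie();
auto dataset = factory->Finish().ValueOrDie();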
Creating an arrow::dataset::Dataset does not begin reading the data itself. It only crawls the directory to find all the files (if needed), which can be retrieved with arrow::dataset::FileSystemDataset::files():
// Print out the files crawled (only for FileSystemDataset)
for (const auto& filename : dataset->files()) {
  std::cout << filename << std::endl;
}
…and infers the dataset’s schema (by default from the first file):
std::cout << dataset->schema()->ToString() << std::endl;
Using the arrow::dataset::Dataset::NewScan() method, we can build an arrow::dataset::Scanner and read the dataset (or a portion of it) into an arrow::Table with the arrow::dataset::Scanner::ToTable() method:
// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
Note
Depending on the size of your dataset, this can require a lot of memory; see Filtering data below for how to filter and project the data that gets read.
Reading different file formats
The above examples use Parquet files on local disk, but the Dataset API provides a consistent interface across multiple file formats and filesystems. (See Reading from cloud storage for more information on the latter.) Currently, Parquet, Feather / Arrow IPC, and CSV file formats are supported; more formats are planned in the future.
If we save the table as Feather files instead of Parquet files:
// Set up a dataset by writing two Feather files.
std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Feather files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie();
  auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5)));
  ABORT_ON_FAILURE(writer->Close());
  output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie();
  writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5)));
  ABORT_ON_FAILURE(writer->Close());
  return base_path;
}
…then we can read the Feather files by passing an arrow::dataset::IpcFileFormat:
format = std::make_shared<ds::IpcFileFormat>();
table = ScanWholeDataset(fs, format, base_path);
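Likewise, a CSV dataset could be read with an arrow::dataset::CsvFileFormat. This is a hedged sketch: it assumes the arrow/dataset/file_csv.h header is included and that a CSV dataset exists at a hypothetical csv_base_path:
// Scan a (hypothetical) CSV dataset with the same helper function.
format = std::make_shared<ds::CsvFileFormat>();
table = ScanWholeDataset(fs, format, csv_base_path);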
Customizing file formats
arrow::dataset::FileFormat objects have properties that control how files are read. For example:
auto format = std::make_shared<ds::ParquetFileFormat>();
format->reader_options.dict_columns.insert("a");
This will configure column "a" to be dictionary-encoded when read. Similarly, setting arrow::dataset::CsvFileFormat::parse_options lets us change things like reading comma-separated or tab-separated data.
Additionally, passing an arrow::dataset::FragmentScanOptions to arrow::dataset::ScannerBuilder::FragmentScanOptions() offers fine-grained control over data scanning. For example, for CSV files, we can change what values are converted into Boolean true and false at scan time.
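As a rough sketch of both mechanisms (assuming your Arrow version provides arrow::dataset::CsvFragmentScanOptions, that arrow/dataset/file_csv.h is included, and that a scan_builder is in scope):
// Parse options on the format: read tab-separated instead of comma-separated data.
auto csv_format = std::make_shared<ds::CsvFileFormat>();
csv_format->parse_options.delimiter = '\t';

// Scan options on the scanner: treat "Y"/"N" as boolean true/false at scan time.
auto csv_scan_options = std::make_shared<ds::CsvFragmentScanOptions>();
csv_scan_options->convert_options.true_values = {"Y"};
csv_scan_options->convert_options.false_values = {"N"};
ABORT_ON_FAILURE(scan_builder->FragmentScanOptions(csv_scan_options));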
Filtering data
So far, we’ve been reading the entire dataset, but if we need only a subset of the data, this can waste time or memory reading data we don’t need. The arrow::dataset::Scanner offers control over what data to read.
In this snippet, we use arrow::dataset::ScannerBuilder::Project() to select which columns to read:
// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(ds::less(ds::field_ref("b"), ds::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
Some formats, such as Parquet, can reduce I/O costs here by reading only the specified columns from the filesystem.
A filter can be provided with arrow::dataset::ScannerBuilder::Filter(), so that rows which do not match the filter predicate will not be included in the returned table. Again, some formats, such as Parquet, can use this filter to reduce the amount of I/O needed; the snippet above demonstrates this with its b < 4 filter.
Projecting columns
In addition to selecting columns, arrow::dataset::ScannerBuilder::Project() can also be used for more complex projections, such as renaming columns, casting them to other types, and even deriving new columns by evaluating expressions.
In this case, we pass a vector of expressions used to construct column values and a vector of names for the columns:
// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
std::shared_ptr<arrow::Table> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project(
      {
          // Leave column "a" as-is.
          ds::field_ref("a"),
          // Cast column "b" to float32.
          ds::call("cast", {ds::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          ds::equal(ds::field_ref("c"), ds::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
This also determines the column selection; only the given columns will be present in the resulting table. If you want to include a derived column in addition to the existing columns, you can build up the expressions from the dataset schema:
// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
std::shared_ptr<arrow::Table> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  std::vector<std::string> names;
  std::vector<ds::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(ds::field_ref(field->name()));
  }
  // Also derive a new column.
  names.push_back("b_large");
  exprs.push_back(ds::greater(ds::field_ref("b"), ds::literal(1)));
  ABORT_ON_FAILURE(scan_builder->Project(exprs, names));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
Note
When combining filters and projections, Arrow will determine all necessary columns to read. For instance, if you filter on a column that isn’t ultimately selected, Arrow will still read the column to evaluate the filter.
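For example, the following sketch (modeled on the FilterAndSelectDataset helper above) projects only "a", but Arrow will still read column "b" from the files to evaluate the filter:
auto scan_builder = dataset->NewScan().ValueOrDie();
ABORT_ON_FAILURE(scan_builder->Project({"a"}));
// "b" is not in the projection, but it is still read to evaluate this filter.
ABORT_ON_FAILURE(scan_builder->Filter(ds::less(ds::field_ref("b"), ds::literal(4))));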
Reading and writing partitioned data
So far, we’ve been working with datasets consisting of flat directories of files. Oftentimes, a dataset will have one or more columns that are frequently filtered on. Instead of having to read and then filter the data, we can organize the files into a nested directory structure and define a partitioned dataset, where sub-directory names hold information about which subset of the data is stored in that directory. We can then filter data more efficiently, using that information to avoid loading files that don’t match the filter.
For example, a dataset partitioned by year and month may have the following layout:
dataset_name/
  year=2007/
    month=01/
      data0.parquet
      data1.parquet
      ...
    month=02/
      data0.parquet
      data1.parquet
      ...
    month=03/
      ...
  year=2008/
    month=01/
      ...
  ...
The above partitioning scheme uses "/key=value/" directory names, as found in Apache Hive. Under this convention, the file at dataset_name/year=2007/month=01/data0.parquet contains only data for which year == 2007 and month == 01.
Let’s create a small partitioned dataset. For this, we’ll use Arrow’s dataset writing functionality.
// Set up a dataset by writing files with partitioning
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}
The above created a directory with two subdirectories (“part=a” and “part=b”), and the Parquet files written in those directories no longer include the “part” column.
Reading this dataset, we now specify that the dataset should use a Hive-like partitioning scheme:
// Read an entire dataset, but with partitioning information.
std::shared_ptr<arrow::Table> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
Although the partition fields are not included in the actual Parquet files, they will be added back to the resulting table when scanning this dataset:
$ ./debug/dataset_documentation_example file:///tmp parquet_hive partitioned
Found fragment: /tmp/parquet_dataset/part=a/part0.parquet
Partition expression: (part == "a")
Found fragment: /tmp/parquet_dataset/part=b/part1.parquet
Partition expression: (part == "b")
Read 10 rows
a: int64
  -- field metadata --
  PARQUET:field_id: '1'
b: int64
  -- field metadata --
  PARQUET:field_id: '2'
c: int64
  -- field metadata --
  PARQUET:field_id: '3'
part: string
----
# snip...
We can now filter on the partition keys, which avoids loading files altogether if they do not match the filter:
// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
std::shared_ptr<arrow::Table> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ABORT_ON_FAILURE(
      scan_builder->Filter(ds::equal(ds::field_ref("part"), ds::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
Different partitioning schemes
The above example uses a Hive-like directory scheme, such as “/year=2009/month=11/day=15”. We specified this by passing the Hive partitioning factory. In this case, the types of the partition keys are inferred from the file paths.
It is also possible to directly construct the partitioning and explicitly define the schema of the partition keys. For example:
auto part = std::make_shared<ds::HivePartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));
Arrow supports another partitioning scheme, “directory partitioning”, where the segments in the file path represent the values of the partition keys without including the name (the field names are implicit in the segment’s index). For example, given field names “year”, “month”, and “day”, one path might be “/2019/11/15”.
Since the names are not included in the file paths, these must be specified when constructing a directory partitioning:
auto part = ds::DirectoryPartitioning::MakeFactory({"year", "month", "day"});
Directory partitioning also supports providing a full schema rather than inferring types from file paths.
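For example, a hedged sketch constructing a directory partitioning directly from a schema, mirroring the HivePartitioning example above:
auto part = std::make_shared<ds::DirectoryPartitioning>(arrow::schema({
    arrow::field("year", arrow::int16()),
    arrow::field("month", arrow::int8()),
    arrow::field("day", arrow::int32())
}));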
Reading from other data sources
Reading in-memory data
If you already have data in memory that you’d like to use with the Datasets API (e.g. to filter/project data, or to write it out to a filesystem), you can wrap it in an arrow::dataset::InMemoryDataset:
auto table = arrow::Table::FromRecordBatches(...);
auto dataset = std::make_shared<arrow::dataset::InMemoryDataset>(std::move(table));
// Scan the dataset, filter it, etc.
auto scanner_builder = dataset->NewScan();
In the example above, we used an InMemoryDataset to write our example data to local disk, which was then used in the rest of the example (see CreateExampleParquetHivePartitionedDataset under Reading and writing partitioned data).
Reading from cloud storage
In addition to local files, Arrow Datasets also support reading from cloud storage systems, such as Amazon S3, by passing a different filesystem.
See the filesystem docs for more details on the available filesystems.
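For instance, here is a hedged sketch that reuses the ScanWholeDataset helper from above. The bucket name and path are hypothetical placeholders, and depending on your build you may need to initialize the S3 subsystem first (e.g. via arrow::fs::InitializeS3):
// Create a filesystem from an S3 URI instead of a local file:// URI.
std::string root_path;
auto s3fs = fs::FileSystemFromUri("s3://my-bucket/datasets", &root_path).ValueOrDie();
auto format = std::make_shared<ds::ParquetFileFormat>();
auto table = ScanWholeDataset(s3fs, format, root_path + "/parquet_dataset");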
Full Example
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

// This example showcases various ways to work with Datasets. It's
// intended to be paired with the documentation.

#include <arrow/api.h>
#include <arrow/compute/cast.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/expression.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_ipc.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/ipc/writer.h>
#include <arrow/util/iterator.h>
#include <parquet/arrow/writer.h>

#include <iostream>
#include <vector>

namespace ds = arrow::dataset;
namespace fs = arrow::fs;

#define ABORT_ON_FAILURE(expr)                     \
  do {                                             \
    arrow::Status status_ = (expr);                \
    if (!status_.ok()) {                           \
      std::cerr << status_.message() << std::endl; \
      abort();                                     \
    }                                              \
  } while (0);
// Generate some data for the rest of this example.
std::shared_ptr<arrow::Table> CreateTable() {
  auto schema =
      arrow::schema({arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
                     arrow::field("c", arrow::int64())});
  std::shared_ptr<arrow::Array> array_a;
  std::shared_ptr<arrow::Array> array_b;
  std::shared_ptr<arrow::Array> array_c;
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&array_a));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&array_b));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&array_c));
  return arrow::Table::Make(schema, {array_a, array_b, array_c});
}

// Set up a dataset by writing two Parquet files.
std::string CreateExampleParquetDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Parquet files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(0, 5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  output = filesystem->OpenOutputStream(base_path + "/data2.parquet").ValueOrDie();
  ABORT_ON_FAILURE(parquet::arrow::WriteTable(
      *table->Slice(5), arrow::default_memory_pool(), output, /*chunk_size=*/2048));
  return base_path;
}

// Set up a dataset by writing two Feather files.
std::string CreateExampleFeatherDataset(const std::shared_ptr<fs::FileSystem>& filesystem,
                                        const std::string& root_path) {
  auto base_path = root_path + "/feather_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto table = CreateTable();
  // Write it into two Feather files
  auto output = filesystem->OpenOutputStream(base_path + "/data1.feather").ValueOrDie();
  auto writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(0, 5)));
  ABORT_ON_FAILURE(writer->Close());
  output = filesystem->OpenOutputStream(base_path + "/data2.feather").ValueOrDie();
  writer = arrow::ipc::MakeFileWriter(output.get(), table->schema()).ValueOrDie();
  ABORT_ON_FAILURE(writer->WriteTable(*table->Slice(5)));
  ABORT_ON_FAILURE(writer->Close());
  return base_path;
}

// Set up a dataset by writing files with partitioning
std::string CreateExampleParquetHivePartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem, const std::string& root_path) {
  auto base_path = root_path + "/parquet_dataset";
  ABORT_ON_FAILURE(filesystem->CreateDir(base_path));
  // Create an Arrow Table
  auto schema = arrow::schema(
      {arrow::field("a", arrow::int64()), arrow::field("b", arrow::int64()),
       arrow::field("c", arrow::int64()), arrow::field("part", arrow::utf8())});
  std::vector<std::shared_ptr<arrow::Array>> arrays(4);
  arrow::NumericBuilder<arrow::Int64Type> builder;
  ABORT_ON_FAILURE(builder.AppendValues({0, 1, 2, 3, 4, 5, 6, 7, 8, 9}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[0]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({9, 8, 7, 6, 5, 4, 3, 2, 1, 0}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[1]));
  builder.Reset();
  ABORT_ON_FAILURE(builder.AppendValues({1, 2, 1, 2, 1, 2, 1, 2, 1, 2}));
  ABORT_ON_FAILURE(builder.Finish(&arrays[2]));
  arrow::StringBuilder string_builder;
  ABORT_ON_FAILURE(
      string_builder.AppendValues({"a", "a", "a", "a", "a", "b", "b", "b", "b", "b"}));
  ABORT_ON_FAILURE(string_builder.Finish(&arrays[3]));
  auto table = arrow::Table::Make(schema, arrays);
  // Write it using Datasets
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  auto scanner_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scanner_builder->Finish().ValueOrDie();

  // The partition schema determines which fields are part of the partitioning.
  auto partition_schema = arrow::schema({arrow::field("part", arrow::utf8())});
  // We'll use Hive-style partitioning, which creates directories with "key=value" pairs.
  auto partitioning = std::make_shared<ds::HivePartitioning>(partition_schema);
  // We'll write Parquet files.
  auto format = std::make_shared<ds::ParquetFileFormat>();
  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = format->DefaultWriteOptions();
  write_options.filesystem = filesystem;
  write_options.base_dir = base_path;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  ABORT_ON_FAILURE(ds::FileSystemDataset::Write(write_options, scanner));
  return base_path;
}

// Read the whole dataset with the given format, without partitioning.
std::shared_ptr<arrow::Table> ScanWholeDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  // Create a dataset by scanning the filesystem for files
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
  }
  // Read the entire dataset as a Table
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

// Read a dataset, but select only column "b" and only rows where b < 4.
//
// This is useful when you only want a few columns from a dataset. Where possible,
// Datasets will push down the column selection such that less work is done.
std::shared_ptr<arrow::Table> FilterAndSelectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project({"b"}));
  ABORT_ON_FAILURE(scan_builder->Filter(ds::less(ds::field_ref("b"), ds::literal(4))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

// Read a dataset, but with column projection.
//
// This is useful to derive new columns from existing data. For example, here we
// demonstrate casting a column to a different type, and turning a numeric column into a
// boolean column based on a predicate. You could also rename columns or perform
// computations involving multiple columns.
std::shared_ptr<arrow::Table> ProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  ABORT_ON_FAILURE(scan_builder->Project(
      {
          // Leave column "a" as-is.
          ds::field_ref("a"),
          // Cast column "b" to float32.
          ds::call("cast", {ds::field_ref("b")},
                   arrow::compute::CastOptions::Safe(arrow::float32())),
          // Derive a boolean column from "c".
          ds::equal(ds::field_ref("c"), ds::literal(1)),
      },
      {"a_renamed", "b_as_float32", "c_1"}));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

// Read a dataset, but with column projection.
//
// This time, we read all original columns plus one derived column. This simply combines
// the previous two examples: selecting a subset of columns by name, and deriving new
// columns with an expression.
std::shared_ptr<arrow::Table> SelectAndProjectDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format,
                                                    ds::FileSystemFactoryOptions())
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Read specified columns with a row filter
  auto scan_builder = dataset->NewScan().ValueOrDie();
  std::vector<std::string> names;
  std::vector<ds::Expression> exprs;
  // Read all the original columns.
  for (const auto& field : dataset->schema()->fields()) {
    names.push_back(field->name());
    exprs.push_back(ds::field_ref(field->name()));
  }
  // Also derive a new column.
  names.push_back("b_large");
  exprs.push_back(ds::greater(ds::field_ref("b"), ds::literal(1)));
  ABORT_ON_FAILURE(scan_builder->Project(exprs, names));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

// Read an entire dataset, but with partitioning information.
std::shared_ptr<arrow::Table> ScanPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;  // Make sure to search subdirectories
  ds::FileSystemFactoryOptions options;
  // We'll use Hive-style partitioning. We'll let Arrow Datasets infer the partition
  // schema.
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  // Print out the fragments
  for (const auto& fragment : dataset->GetFragments().ValueOrDie()) {
    std::cout << "Found fragment: " << (*fragment)->ToString() << std::endl;
    std::cout << "Partition expression: "
              << (*fragment)->partition_expression().ToString() << std::endl;
  }
  auto scan_builder = dataset->NewScan().ValueOrDie();
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}

// Read an entire dataset, but with partitioning information. Also, filter the dataset on
// the partition values.
std::shared_ptr<arrow::Table> FilterPartitionedDataset(
    const std::shared_ptr<fs::FileSystem>& filesystem,
    const std::shared_ptr<ds::FileFormat>& format, const std::string& base_dir) {
  fs::FileSelector selector;
  selector.base_dir = base_dir;
  selector.recursive = true;
  ds::FileSystemFactoryOptions options;
  options.partitioning = ds::HivePartitioning::MakeFactory();
  auto factory = ds::FileSystemDatasetFactory::Make(filesystem, selector, format, options)
                     .ValueOrDie();
  auto dataset = factory->Finish().ValueOrDie();
  auto scan_builder = dataset->NewScan().ValueOrDie();
  // Filter based on the partition values. This will mean that we won't even read the
  // files whose partition expressions don't match the filter.
  ABORT_ON_FAILURE(
      scan_builder->Filter(ds::equal(ds::field_ref("part"), ds::literal("b"))));
  auto scanner = scan_builder->Finish().ValueOrDie();
  return scanner->ToTable().ValueOrDie();
}
int main(int argc, char** argv) {
  if (argc < 3) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string uri = argv[1];
  std::string format_name = argv[2];
  std::string mode = argc > 3 ? argv[3] : "no_filter";
  std::string root_path;
  auto fs = fs::FileSystemFromUri(uri, &root_path).ValueOrDie();

  std::string base_path;
  std::shared_ptr<ds::FileFormat> format;
  if (format_name == "feather") {
    format = std::make_shared<ds::IpcFileFormat>();
    base_path = CreateExampleFeatherDataset(fs, root_path);
  } else if (format_name == "parquet") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetDataset(fs, root_path);
  } else if (format_name == "parquet_hive") {
    format = std::make_shared<ds::ParquetFileFormat>();
    base_path = CreateExampleParquetHivePartitionedDataset(fs, root_path);
  } else {
    std::cerr << "Unknown format: " << format_name << std::endl;
    std::cerr << "Supported formats: feather, parquet, parquet_hive" << std::endl;
    return EXIT_FAILURE;
  }

  std::shared_ptr<arrow::Table> table;
  if (mode == "no_filter") {
    table = ScanWholeDataset(fs, format, base_path);
  } else if (mode == "filter") {
    table = FilterAndSelectDataset(fs, format, base_path);
  } else if (mode == "project") {
    table = ProjectDataset(fs, format, base_path);
  } else if (mode == "select_project") {
    table = SelectAndProjectDataset(fs, format, base_path);
  } else if (mode == "partitioned") {
    table = ScanPartitionedDataset(fs, format, base_path);
  } else if (mode == "filter_partitioned") {
    table = FilterPartitionedDataset(fs, format, base_path);
  } else {
    std::cerr << "Unknown mode: " << mode << std::endl;
    std::cerr << "Supported modes: no_filter, filter, project, select_project, "
                 "partitioned, filter_partitioned"
              << std::endl;
    return EXIT_FAILURE;
  }
  std::cout << "Read " << table->num_rows() << " rows" << std::endl;
  std::cout << table->ToString() << std::endl;
  return EXIT_SUCCESS;
}