Dataset

Warning

Experimental: The Java dataset module is under early development. Its API may change in each Apache Arrow release until it matures.

Dataset is a universal layer in Apache Arrow for querying data in different formats and with different partitioning strategies. The data to be queried usually resides in a traditional file system; however, Arrow Dataset is not limited to querying files and can be extended to serve other data sources, such as inter-process communication or other network locations.

Getting Started

The following is a minimal example of using Dataset to query a Parquet file in Java:

// read data from file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
Dataset dataset = factory.finish();
Scanner scanner = dataset.newScan(new ScanOptions(100));
List<ArrowRecordBatch> batches = StreamSupport.stream(
    scanner.scan().spliterator(), false)
        .flatMap(t -> stream(t.execute()))
        .collect(Collectors.toList());

// do something with read record batches, for example:
analyzeArrowData(batches);

// finished the analysis of the data, close all resources:
AutoCloseables.close(batches);
AutoCloseables.close(factory, dataset, scanner);
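
The snippet above calls a stream(...) helper that it does not define. A minimal sketch of such a helper follows, assuming the value it receives is an Iterable. The class name IterableStreams and its flatten method are hypothetical, and only JDK classes are used so the flattening pattern can be tried without Arrow on the classpath.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical helper class; not part of Apache Arrow.
final class IterableStreams {
  private IterableStreams() {}

  // Adapts any Iterable to a sequential Stream.
  static <T> Stream<T> stream(Iterable<T> iterable) {
    return StreamSupport.stream(iterable.spliterator(), false);
  }

  // Flattens an Iterable of Iterables into one list, preserving
  // encounter order (outer first, then inner).
  static <T> List<T> flatten(Iterable<? extends Iterable<T>> nested) {
    return StreamSupport.stream(nested.spliterator(), false)
        .flatMap(inner -> stream(inner))
        .collect(Collectors.toList());
  }
}
```

The flatten method has the same shape as the scan pipeline above: an outer sequence (the scan tasks) whose elements each yield an inner sequence (the record batches).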

Note

ArrowRecordBatch is a low-level composite Arrow data exchange format that doesn’t provide an API for reading typed data directly. It’s recommended to use the VectorLoader utility to load it into a schema-aware VectorSchemaRoot container, through which users can conveniently access the decoded data in Java.

See also

Load record batches with VectorSchemaRoot.

Schema

The schema of the data to be queried can be inspected via the method DatasetFactory#inspect() before actually reading it. For example:

// read data from local file /opt/example.parquet
String uri = "file:/opt/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

// inspect schema
Schema schema = factory.inspect();

For data formats that are compatible with a user-defined schema, users can call the method DatasetFactory#finish(Schema schema) to create the dataset:

Schema schema = createUserSchema();
Dataset dataset = factory.finish(schema);

Otherwise, when the parameterless method DatasetFactory#finish() is called, the schema is inferred automatically from the data source and is the same as the result of DatasetFactory#inspect().

Also, if a projection is specified during scanning (see the next section, Projection), the actual schema of the output data can be obtained with the method Scanner#schema():

Scanner scanner = dataset.newScan(
    new ScanOptions(100, Optional.of(new String[] {"id", "name"})));
Schema projectedSchema = scanner.schema();

Projection

Users can specify projections in ScanOptions. For FileSystemDataset, only column projection is currently supported, meaning that the projection list may contain only column names. For example:

String[] projection = new String[] {"id", "name"};
ScanOptions options = new ScanOptions(100, Optional.of(projection));

If no projection is needed, leave the optional projection argument absent in ScanOptions:

ScanOptions options = new ScanOptions(100, Optional.empty());

Or use the shortcut constructor:

ScanOptions options = new ScanOptions(100);

Then all columns will be emitted during scanning.
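
The difference between an absent and a present projection can be modeled with plain JDK types. The sketch below is an illustration under assumptions, not Arrow's implementation: ProjectionSketch and outputColumns are hypothetical names, a schema is reduced to a list of column names, and both the output ordering (the order requested) and the rejection of unknown names are choices made for this sketch only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical illustration class; not part of Apache Arrow.
final class ProjectionSketch {
  private ProjectionSketch() {}

  // Returns the column names a scan would emit in this simplified model:
  // every column when the projection is absent, otherwise only the
  // requested ones. Ordering and error handling here are assumptions.
  static List<String> outputColumns(List<String> schemaColumns,
                                    Optional<String[]> projection) {
    if (!projection.isPresent()) {
      return new ArrayList<>(schemaColumns);
    }
    List<String> selected = new ArrayList<>();
    for (String name : projection.get()) {
      if (!schemaColumns.contains(name)) {
        throw new IllegalArgumentException("Unknown column: " + name);
      }
      selected.add(name);
    }
    return selected;
  }
}
```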

Read Data from HDFS

FileSystemDataset supports reading data from non-local file systems. HDFS support is included in the official Apache Arrow Java package releases and can be used directly without re-building the source code.

To access HDFS data using the Dataset API, pass a general HDFS URI to FileSystemDatasetFactory:

String uri = "hdfs://{hdfs_host}:{port}/data/example.parquet";
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);

Native Memory Management

To gain better performance and reduce code complexity, the Java FileSystemDataset internally relies on the C++ arrow::dataset::FileSystemDataset via JNI. As a result, all Arrow data read from FileSystemDataset is allocated off the JVM heap. To manage this memory, a utility class NativeMemoryPool is provided to users.

As a basic example, by using a listenable NativeMemoryPool, users can pass a listener that hooks into C++ buffer allocation and deallocation:

AtomicLong reserved = new AtomicLong(0L);
ReservationListener listener = new ReservationListener() {
  @Override
  public void reserve(long size) {
    reserved.getAndAdd(size);
  }

  @Override
  public void unreserve(long size) {
    reserved.getAndAdd(-size);
  }
};
NativeMemoryPool pool = NativeMemoryPool.createListenable(listener);
FileSystemDatasetFactory factory = new FileSystemDatasetFactory(allocator,
    pool, FileFormat.PARQUET, uri);

It’s also very common to reserve the same amount of JVM direct memory for the data read from datasets. For this purpose, a built-in utility class DirectReservationListener is provided:

NativeMemoryPool pool = NativeMemoryPool.createListenable(
    DirectReservationListener.instance());

This way, once the allocated byte count of Arrow buffers reaches the limit of JVM direct memory, OutOfMemoryError: Direct buffer memory will be thrown during scanning.
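
To make the reserve/unreserve contract concrete, the sketch below enforces a fixed byte budget and rejects any reservation that would exceed it. It is an illustration under assumptions, not the implementation of DirectReservationListener: the ReservationListener interface declared here is a local stand-in that copies only the two method signatures shown above, and LimitedReservationListener, its limit parameter, and the choice of IllegalStateException are hypothetical.

```java
import java.util.concurrent.atomic.AtomicLong;

// Local stand-in mirroring the two callbacks used in this section, so the
// sketch compiles without Arrow on the classpath.
interface ReservationListener {
  void reserve(long size);
  void unreserve(long size);
}

// Hypothetical listener that tracks reserved bytes against a fixed budget.
final class LimitedReservationListener implements ReservationListener {
  private final long limit;
  private final AtomicLong reserved = new AtomicLong(0L);

  LimitedReservationListener(long limit) {
    this.limit = limit;
  }

  @Override
  public void reserve(long size) {
    // Add first, then roll back if the budget is exceeded, so a rejected
    // request leaves the counter unchanged.
    long updated = reserved.addAndGet(size);
    if (updated > limit) {
      reserved.addAndGet(-size);
      throw new IllegalStateException(
          "Reservation of " + size + " bytes exceeds limit of " + limit);
    }
  }

  @Override
  public void unreserve(long size) {
    reserved.addAndGet(-size);
  }

  long reservedBytes() {
    return reserved.get();
  }
}
```

With a real Arrow dependency, a listener of this shape would be handed to NativeMemoryPool.createListenable(...) in the same way as the anonymous listener shown earlier.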

Note

The default instance NativeMemoryPool.getDefault() does nothing on buffer allocation/deallocation. It’s fine for proofs of concept or testing, but for production use in complex environments, it’s recommended to manage memory with a listenable memory pool.

Note

The BufferAllocator instance passed to FileSystemDatasetFactory’s constructor is also aware of the overall memory usage of the produced dataset instances. Once the Java buffers are created, the passed allocator becomes their parent allocator.

Native Object Resource Management

As another consequence of relying on JNI, all components related to FileSystemDataset must be closed manually after use to release the corresponding native objects. For example:

DatasetFactory factory = new FileSystemDatasetFactory(allocator,
    NativeMemoryPool.getDefault(), FileFormat.PARQUET, uri);
Dataset dataset = factory.finish();
Scanner scanner = dataset.newScan(new ScanOptions(100));

// do something

AutoCloseables.close(factory, dataset, scanner);

If users forget to close them, the native objects may leak.
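
One way to make the close calls hard to forget is try-with-resources, assuming the objects involved implement AutoCloseable (which their use with AutoCloseables.close above suggests). The sketch below uses a hypothetical TrackedResource stand-in rather than real Arrow classes, to show that resources are closed in reverse order of declaration even when the body throws.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a native-backed object such as a factory,
// dataset, or scanner; it only records when it is closed.
final class TrackedResource implements AutoCloseable {
  private final String name;
  private final List<String> closeLog;

  TrackedResource(String name, List<String> closeLog) {
    this.name = name;
    this.closeLog = closeLog;
  }

  @Override
  public void close() {
    closeLog.add(name);
  }
}

final class CloseOrderDemo {
  private CloseOrderDemo() {}

  // Opens three resources, fails inside the body, and returns the order
  // in which try-with-resources closed them.
  static List<String> run() {
    List<String> closeLog = new ArrayList<>();
    try (TrackedResource factory = new TrackedResource("factory", closeLog);
         TrackedResource dataset = new TrackedResource("dataset", closeLog);
         TrackedResource scanner = new TrackedResource("scanner", closeLog)) {
      throw new IllegalStateException("simulated failure during scan");
    } catch (IllegalStateException e) {
      // All three resources are already closed when this block runs.
    }
    return closeLog;
  }
}
```

In this model the scanner is closed first and the factory last, the reverse of the order in which they were created.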