Integrating PyArrow with Java

Arrow supports exchanging data within the same process through the Arrow C Data Interface.

This can be used to exchange data between Python and Java functions and methods, so that the two languages can interact without the cost of marshaling and unmarshaling data.

Note

This article assumes that you have a Python environment with pyarrow correctly installed and a Java environment with the Arrow Java library correctly installed. The Arrow Java library must have been compiled with mvn -Parrow-c-data to ensure that C Data Interface support is enabled. See Python Install Instructions and Java Documentation for further details.
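The Python side of these prerequisites can be checked quickly before going further. The snippet below is a minimal sketch using only the standard library; missing_modules is a hypothetical helper, and the module list is an assumption covering pyarrow plus the jpype and cffi packages installed later in this article. It cannot verify the Java side, such as whether Arrow Java was built with C Data support.

```python
import importlib.util

# Python modules used in this article: pyarrow itself, plus jpype
# (installed with `pip install jpype1`) and cffi, both installed later on.
REQUIRED = ["pyarrow", "jpype", "cffi"]


def missing_modules(names):
    """Return the names from ``names`` that cannot be found for import."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED)
    if missing:
        print("Missing Python modules:", ", ".join(missing))
    else:
        print("All required Python modules are available.")
```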

Invoking Java methods from Python

Suppose we have a simple Java class providing a number as its output:

public class Simple {
    public static int getNumber() {
        return 4;
    }
}

We would save this class in a Simple.java file and compile it to Simple.class using javac Simple.java.

Once the Simple.class file is created, we can use the class from Python through the JPype library, which starts a Java runtime within the Python interpreter.

jpype1 can be installed using pip, like most Python libraries:

$ pip install jpype1

The most basic thing we can do with our Simple class is to call the Simple.getNumber method from Python and check that it returns the expected result.

To do so, we can create a simple.py file that uses jpype to import the Simple class from the Simple.class file and invoke the Simple.getNumber method:

import jpype
from jpype.types import *

jpype.startJVM(classpath=["./"])

Simple = JClass('Simple')

print(Simple.getNumber())

Running the simple.py file will show how our Python code is able to access the Java method and print the expected result:

$ python simple.py
4

Java to Python using pyarrow.jvm

PyArrow provides a pyarrow.jvm module that makes it easier to interact with Java classes and to convert Java objects into actual Python objects.

To showcase pyarrow.jvm, we can create a more complex class, FillTen, saved as FillTen.java:

import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BigIntVector;


public class FillTen {
    static RootAllocator allocator = new RootAllocator();

    public static BigIntVector createArray() {
        BigIntVector intVector = new BigIntVector("ints", allocator);
        intVector.allocateNew(10);
        intVector.setValueCount(10);
        FillTen.fillVector(intVector);
        return intVector;
    }

    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}

This class provides a public createArray method that anyone can invoke to get back an array containing numbers from 1 to 10.

Because this class now depends on several Arrow packages, compiling it with javac alone is no longer enough. We need to create a dedicated pom.xml file where we can declare the dependencies:

<project>
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.apache.arrow.py2java</groupId>
    <artifactId>FillTen</artifactId>
    <version>1</version>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory</artifactId>
            <version>8.0.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-memory-netty</artifactId>
            <version>8.0.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-vector</artifactId>
            <version>8.0.0</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.apache.arrow</groupId>
            <artifactId>arrow-c-data</artifactId>
            <version>8.0.0</version>
            <type>jar</type>
        </dependency>
    </dependencies>
</project>

Once FillTen.java is saved as src/main/java/FillTen.java, we can use Maven to compile the project with mvn package, which makes the resulting jar available in the target directory:

$ mvn package
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-compiler-plugin:3.1:compile (default-compile) @ FillTen ---
[INFO] Changes detected - recompiling the module!
[INFO] Compiling 1 source file to /experiments/java2py/target/classes
[INFO]
[INFO] --- maven-jar-plugin:2.4:jar (default-jar) @ FillTen ---
[INFO] Building jar: /experiments/java2py/target/FillTen-1.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------

Now that the package is built, we can make it available to Python. To do so, we need to make sure that not only the package itself but also all of its dependencies are available.

We can use maven to collect all dependencies and make them available in a single place (the dependencies directory) so that we can more easily load them from Python:

$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
[INFO] Scanning for projects...
[INFO]
[INFO] ------------------< org.apache.arrow.py2java:FillTen >------------------
[INFO] Building FillTen 1
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- maven-dependency-plugin:2.7:copy-dependencies (default-cli) @ FillTen ---
[INFO] Copying jsr305-3.0.2.jar to /experiments/java2py/dependencies/jsr305-3.0.2.jar
[INFO] Copying netty-common-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-common-4.1.72.Final.jar
[INFO] Copying arrow-memory-core-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-core-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-c-data-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-c-data-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-vector-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-vector-8.0.0-SNAPSHOT.pom
[INFO] Copying jackson-core-2.11.4.jar to /experiments/java2py/dependencies/jackson-core-2.11.4.jar
[INFO] Copying jackson-annotations-2.11.4.jar to /experiments/java2py/dependencies/jackson-annotations-2.11.4.jar
[INFO] Copying slf4j-api-1.7.25.jar to /experiments/java2py/dependencies/slf4j-api-1.7.25.jar
[INFO] Copying arrow-memory-netty-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-memory-netty-8.0.0-SNAPSHOT.jar
[INFO] Copying arrow-format-8.0.0-SNAPSHOT.jar to /experiments/java2py/dependencies/arrow-format-8.0.0-SNAPSHOT.jar
[INFO] Copying flatbuffers-java-1.12.0.jar to /experiments/java2py/dependencies/flatbuffers-java-1.12.0.jar
[INFO] Copying arrow-memory-8.0.0-SNAPSHOT.pom to /experiments/java2py/dependencies/arrow-memory-8.0.0-SNAPSHOT.pom
[INFO] Copying netty-buffer-4.1.72.Final.jar to /experiments/java2py/dependencies/netty-buffer-4.1.72.Final.jar
[INFO] Copying jackson-databind-2.11.4.jar to /experiments/java2py/dependencies/jackson-databind-2.11.4.jar
[INFO] Copying commons-codec-1.10.jar to /experiments/java2py/dependencies/commons-codec-1.10.jar
[INFO] ------------------------------------------------------------------------
[INFO] BUILD SUCCESS
[INFO] ------------------------------------------------------------------------
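JPype accepts wildcard classpath entries such as ./dependencies/*, which is what the scripts in this article rely on. If you would rather pass an explicit list of jars, for example to fail early when mvn package or the copy-dependencies step was skipped, a small standard-library helper can build it. build_classpath is a hypothetical name, not part of JPype; it only picks up .jar files, so the .pom files copied into dependencies are left out.

```python
from pathlib import Path


def build_classpath(*dirs):
    """Return the sorted .jar files of each directory, in directory order.

    Hypothetical helper: JPype's startJVM(classpath=...) also accepts
    wildcard entries such as "./dependencies/*", so this is optional.
    Raises FileNotFoundError when a directory contains no .jar file.
    """
    classpath = []
    for d in dirs:
        jars = sorted(str(p) for p in Path(d).glob("*.jar"))
        if not jars:
            raise FileNotFoundError(f"no .jar files found in {d!r}")
        classpath.extend(jars)
    return classpath


# Usage, once mvn package and copy-dependencies have run:
# import jpype
# jpype.startJVM(classpath=build_classpath("./dependencies", "./target"))
```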

Note

Instead of manually collecting dependencies, you could also rely on the maven-assembly-plugin to build a single jar with all dependencies.

Once our package and all its dependencies are available, we can invoke it from a fillten_pyarrowjvm.py script that imports the FillTen class and prints the result of invoking FillTen.createArray:

import jpype
import jpype.imports
from jpype.types import *

# Start a JVM making available all dependencies we collected
# and our class from target/FillTen-1.jar
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])

FillTen = JClass('FillTen')

array = FillTen.createArray()
print("ARRAY", type(array), array)

# Convert the proxied BigIntVector to an actual pyarrow array
import pyarrow.jvm
pyarray = pyarrow.jvm.array(array)
print("ARRAY", type(pyarray), pyarray)
del pyarray

Running the Python script prints two results:

ARRAY <java class 'org.apache.arrow.vector.BigIntVector'> [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
ARRAY <class 'pyarrow.lib.Int64Array'> [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10
]

The first result is the raw value returned by FillTen.createArray. It is a proxy to the Java object, not a real pyarrow Array, so it lacks most pyarrow capabilities and methods. That is why we then use pyarrow.jvm.array to convert it into an actual pyarrow array, which we can treat like any other pyarrow array. The second result shows the converted array correctly reported as pyarrow.lib.Int64Array and printed in the pyarrow style.

Note

At the moment the pyarrow.jvm module is fairly limited in capabilities: nested types such as structs are not supported, and it only works with a JVM running within the same process, as JPype does.

Java to Python communication using the C Data Interface

The C Data Interface is a protocol implemented in Arrow to exchange data between different environments running in the same process, without the cost of marshaling and copying data.

This allows data coming from Python or Java to be exposed to functions implemented in the other language.

Note

In the future pyarrow.jvm will be implemented to leverage the C Data Interface; at the moment it is instead specifically written for JPype.

To showcase how C Data works, we are going to tweak both our FillTen Java class and our Python script, now named fillten.py. Given a PyArrow array, we are going to expose a Java function that sets its content to the numbers from 1 to 10.

Using the C Data Interface from pyarrow currently requires installing cffi explicitly. Like most Python packages, it can be installed with pip:

$ pip install cffi

The first thing to do is to tweak the Python script so that it sends Java the exported references to the Array and its Schema, according to the C Data Interface:

import jpype
import jpype.imports
from jpype.types import *

# Init the JVM and make FillTen class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
FillTen = JClass('FillTen')

# Create a Python array of 10 elements
import pyarrow as pa
array = pa.array([0]*10)

from pyarrow.cffi import ffi as arrow_c

# Export the Python array through C Data
c_array = arrow_c.new("struct ArrowArray*")
c_array_ptr = int(arrow_c.cast("uintptr_t", c_array))
array._export_to_c(c_array_ptr)

# Export the Schema of the Array through C Data
c_schema = arrow_c.new("struct ArrowSchema*")
c_schema_ptr = int(arrow_c.cast("uintptr_t", c_schema))
array.type._export_to_c(c_schema_ptr)

# Send Array and its Schema to the Java function
# that will populate the array with numbers from 1 to 10
FillTen.fillCArray(c_array_ptr, c_schema_ptr)

# See how the content of our Python array was changed from Java
# while it remained of the Python type.
print("ARRAY", type(array), array)

Note

Changing the content of arrays is not a safe operation. It is done here only for the purpose of this example, and it mostly works because the array does not change its size, type or nulls.

In the FillTen Java class, we already have the fillVector method, but that method is private and even if we made it public it would only accept a BigIntVector object and not the C Data array and schema references.

So we have to expand our FillTen class with a fillCArray method that performs the same work as fillVector, but on the entities exchanged through C Data instead of on a BigIntVector:

import org.apache.arrow.c.ArrowArray;
import org.apache.arrow.c.ArrowSchema;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.BigIntVector;


public class FillTen {
    static RootAllocator allocator = new RootAllocator();

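    public static void fillCArray(long c_array_ptr, long c_schema_ptr) {
        // Wrap the raw struct addresses received from Python; nothing is copied.
        ArrowArray arrow_array = ArrowArray.wrap(c_array_ptr);
        ArrowSchema arrow_schema = ArrowSchema.wrap(c_schema_ptr);

        // Import the C structs as a Java vector backed by the Python buffers.
        // The cast assumes Python exported an int64 array (a BigIntVector).
        FieldVector v = Data.importVector(allocator, arrow_array, arrow_schema, null);
        FillTen.fillVector((BigIntVector)v);
    }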

    private static void fillVector(BigIntVector iv) {
        iv.setSafe(0, 1);
        iv.setSafe(1, 2);
        iv.setSafe(2, 3);
        iv.setSafe(3, 4);
        iv.setSafe(4, 5);
        iv.setSafe(5, 6);
        iv.setSafe(6, 7);
        iv.setSafe(7, 8);
        iv.setSafe(8, 9);
        iv.setSafe(9, 10);
    }
}

The goal of the fillCArray method is to take the Array and Schema received in the C Data exchange format and turn them back into a FieldVector, an object that Arrow Java knows how to deal with.

If we run mvn package again, update the Maven dependencies and then run our Python script, we can see that the values printed by the Python script have been changed by the Java code:

$ mvn package
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python fillten.py
ARRAY <class 'pyarrow.lib.Int64Array'> [
    1,
    2,
    3,
    4,
    5,
    6,
    7,
    8,
    9,
    10
]

We can also use the C Stream Interface to exchange a pyarrow.RecordBatchReader between Java and Python. We’ll use the following Java class as a demo: it can read an Arrow IPC file through Arrow Java’s implementation and export it as a stream, or import a stream and write its data to a JSON file:

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.JsonFileWriter;

public class PythonInteropDemo implements AutoCloseable {
  private final BufferAllocator allocator;

  public PythonInteropDemo() {
    this.allocator = new RootAllocator();
  }

  public void exportStream(String path, long cStreamPointer) throws Exception {
    try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer)) {
      ArrowFileReader reader = new ArrowFileReader(Files.newByteChannel(Paths.get(path)), allocator);
      Data.exportArrayStream(allocator, reader, stream);
    }
  }

  public void importStream(String path, long cStreamPointer) throws Exception {
    try (final ArrowArrayStream stream = ArrowArrayStream.wrap(cStreamPointer);
         final ArrowReader input = Data.importArrayStream(allocator, stream);
         JsonFileWriter writer = new JsonFileWriter(new File(path))) {
      writer.start(input.getVectorSchemaRoot().getSchema(), input);
      while (input.loadNextBatch()) {
        writer.write(input.getVectorSchemaRoot());
      }
    }
  }

  @Override
  public void close() throws Exception {
    allocator.close();
  }
}

On the Python side, we’ll use JPype as before, except this time we’ll send RecordBatchReaders back and forth:

import tempfile

import jpype
import jpype.imports
from jpype.types import *

# Init the JVM and make demo class available to Python.
jpype.startJVM(classpath=["./dependencies/*", "./target/*"])
PythonInteropDemo = JClass("PythonInteropDemo")
demo = PythonInteropDemo()

# Create a Python record batch reader
import pyarrow as pa
schema = pa.schema([
    ("ints", pa.int64()),
    ("strs", pa.string())
])
batches = [
    pa.record_batch([
        [0, 2, 4, 8],
        ["a", "b", "c", None],
    ], schema=schema),
    pa.record_batch([
        [None, 32, 64, None],
        ["e", None, None, "h"],
    ], schema=schema),
]
reader = pa.RecordBatchReader.from_batches(schema, batches)

from pyarrow.cffi import ffi as arrow_c

# Export the Python reader through C Data
c_stream = arrow_c.new("struct ArrowArrayStream*")
c_stream_ptr = int(arrow_c.cast("uintptr_t", c_stream))
reader._export_to_c(c_stream_ptr)

# Send reader to the Java function that writes a JSON file
with tempfile.NamedTemporaryFile() as temp:
    demo.importStream(temp.name, c_stream_ptr)

    # Read the JSON file back
    with open(temp.name) as source:
        print("JSON file written by Java:")
        print(source.read())


# Write an Arrow IPC file for Java to read
with tempfile.NamedTemporaryFile() as temp:
    with pa.ipc.new_file(temp.name, schema) as sink:
        for batch in batches:
            sink.write_batch(batch)

    demo.exportStream(temp.name, c_stream_ptr)
    with pa.RecordBatchReader._import_from_c(c_stream_ptr) as source:
        print("IPC file read by Java:")
        print(source.read_all())
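
Saving the script as demo.py, rebuilding the Java project and running the script prints both results:

$ mvn package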
$ mvn org.apache.maven.plugins:maven-dependency-plugin:2.7:copy-dependencies -DoutputDirectory=dependencies
$ python demo.py
JSON file written by Java:
{"schema":{"fields":[{"name":"ints","nullable":true,"type":{"name":"int","bitWidth":64,"isSigned":true},"children":[]},{"name":"strs","nullable":true,"type":{"name":"utf8"},"children":[]}]},"batches":[{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[1,1,1,1],"DATA":["0","2","4","8"]},{"name":"strs","count":4,"VALIDITY":[1,1,1,0],"OFFSET":[0,1,2,3,3],"DATA":["a","b","c",""]}]},{"count":4,"columns":[{"name":"ints","count":4,"VALIDITY":[0,1,1,0],"DATA":["0","32","64","0"]},{"name":"strs","count":4,"VALIDITY":[1,0,0,1],"OFFSET":[0,1,1,1,2],"DATA":["e","","","h"]}]}]}
IPC file read by Java:
pyarrow.Table
ints: int64
strs: string
----
ints: [[0,2,4,8],[null,32,64,null]]
strs: [["a","b","c",null],["e",null,null,"h"]]