Reading/Writing IPC formats
Arrow defines two types of binary formats for serializing record batches:
Streaming format: for sending an arbitrary number of record batches. The format must be processed from start to end and does not support random access.
File or Random Access format: for serializing a fixed number of record batches. It supports random access, which makes it particularly useful with memory maps.
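To make the difference between the two formats concrete, here is a minimal sketch (not part of the original guide) that writes the same one-batch VectorSchemaRoot once with ArrowFileWriter and once with ArrowStreamWriter, then inspects the raw bytes. It relies on the Arrow IPC specification's rule that the file format begins and ends with the magic string "ARROW1", which the stream format does not carry. It assumes arrow-vector and an allocator implementation (such as arrow-memory-netty) are on the classpath; the class `IpcFormatBytes` and its helper methods are hypothetical names chosen for this example.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.ipc.ArrowWriter;

class IpcFormatBytes {
  // Magic string that the Arrow file format places at the start and at the end.
  static final byte[] MAGIC = "ARROW1".getBytes(StandardCharsets.US_ASCII);

  // Writes one 3-row batch in the file format (true) or the stream format (false).
  static byte[] serialize(boolean fileFormat) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         IntVector ints = new IntVector("ints", allocator)) {
      ints.allocateNew(3);
      for (int i = 0; i < 3; i++) {
        ints.set(i, i);
      }
      ints.setValueCount(3);
      VectorSchemaRoot root = new VectorSchemaRoot(
          Collections.singletonList(ints.getField()),
          Collections.<FieldVector>singletonList(ints));
      root.setRowCount(3);
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      WritableByteChannel channel = Channels.newChannel(out);
      // Both writers share the ArrowWriter API: start, writeBatch, end.
      try (ArrowWriter writer = fileFormat
          ? new ArrowFileWriter(root, null, channel)
          : new ArrowStreamWriter(root, null, channel)) {
        writer.start();
        writer.writeBatch();
        writer.end();
      }
      return out.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  static boolean startsWith(byte[] data, byte[] prefix) {
    return data.length >= prefix.length
        && Arrays.equals(Arrays.copyOfRange(data, 0, prefix.length), prefix);
  }

  static boolean endsWith(byte[] data, byte[] suffix) {
    return data.length >= suffix.length
        && Arrays.equals(
            Arrays.copyOfRange(data, data.length - suffix.length, data.length), suffix);
  }

  public static void main(String[] args) {
    byte[] file = serialize(true);
    byte[] stream = serialize(false);
    System.out.println("file:   starts with ARROW1 = " + startsWith(file, MAGIC)
        + ", ends with ARROW1 = " + endsWith(file, MAGIC));
    System.out.println("stream: starts with ARROW1 = " + startsWith(stream, MAGIC));
  }
}
```

Only the file-format bytes should start and end with the magic string. The extra framing, in particular the footer that lists every record batch, is what lets a reader of the file format jump to any batch.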
Writing and Reading Streaming Format
First, let’s populate a VectorSchemaRoot with a small batch of records:
// `allocator` is an existing BufferAllocator (for example, a RootAllocator)
BitVector bitVector = new BitVector("boolean", allocator);
VarCharVector varCharVector = new VarCharVector("varchar", allocator);
// setSafe grows the vectors' buffers as needed
for (int i = 0; i < 10; i++) {
  bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
  varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
}
bitVector.setValueCount(10);
varCharVector.setValueCount(10);
List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
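For reference, here is one way to make the snippet above self-contained, assuming arrow-vector and an allocator implementation are on the classpath. The `RootAllocator` and the try-with-resources cleanup are additions not shown in the original snippet, and the class `PopulateRoot` is a hypothetical name. Closing the vectors before the allocator matters, because closing an allocator that still has outstanding buffers raises an error.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.types.pojo.Field;

class PopulateRoot {
  // Builds the 10-row root described above and renders each row as "bit,varchar".
  static List<String> populateAndDescribe() {
    // Resources close in reverse order: both vectors first, then the allocator.
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         BitVector bitVector = new BitVector("boolean", allocator);
         VarCharVector varCharVector = new VarCharVector("varchar", allocator)) {
      bitVector.allocateNew(10);
      varCharVector.allocateNew();
      for (int i = 0; i < 10; i++) {
        bitVector.setSafe(i, i % 2 == 0 ? 0 : 1);
        varCharVector.setSafe(i, ("test" + i).getBytes(StandardCharsets.UTF_8));
      }
      bitVector.setValueCount(10);
      varCharVector.setValueCount(10);

      List<Field> fields = Arrays.asList(bitVector.getField(), varCharVector.getField());
      List<FieldVector> vectors = Arrays.asList(bitVector, varCharVector);
      VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
      root.setRowCount(10);

      // Read the values back through the root to show what it now holds.
      BitVector bits = (BitVector) root.getVector(0);
      VarCharVector strings = (VarCharVector) root.getVector(1);
      List<String> rows = new ArrayList<>();
      for (int i = 0; i < root.getRowCount(); i++) {
        rows.add(bits.get(i) + "," + new String(strings.get(i), StandardCharsets.UTF_8));
      }
      return rows;
    }
  }

  public static void main(String[] args) {
    populateAndDescribe().forEach(System.out::println);
  }
}
```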
Now we can begin writing a stream containing some number of these batches. For this we use ArrowStreamWriter (the DictionaryProvider, used for any dictionary-encoded vectors, is optional and can be null):
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(root, /*DictionaryProvider=*/null, Channels.newChannel(out));
Here we used an in-memory stream, but this could have been a socket or some other IO stream. Then we can write the batches:
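writer.start();
// write the first batch
writer.writeBatch();
// write another four batches
for (int i = 0; i < 4; i++) {
  // reuse the vectors held by the VectorSchemaRoot: clear, repopulate, then write
  BitVector childVector1 = (BitVector) root.getVector(0);
  VarCharVector childVector2 = (VarCharVector) root.getVector(1);
  childVector1.reset();
  childVector2.reset();
  // populate the vectors (illustrative values; the data can differ for each batch)
  for (int j = 0; j < 10; j++) {
    childVector1.setSafe(j, (i + j) % 2);
    childVector2.setSafe(j, ("batch" + i + "-" + j).getBytes(StandardCharsets.UTF_8));
  }
  childVector1.setValueCount(10);
  childVector2.setValueCount(10);
  // the writer uses the root's row count as the length of the batch
  root.setRowCount(10);
  writer.writeBatch();
}
// write the end-of-stream marker
writer.end();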
Note that the VectorSchemaRoot held by the writer is a container for one batch at a time: batches flow through it as part of a pipeline, so we must populate it with the next batch's data before each call to writeBatch(), and each new batch overwrites the previous one.
Now the ByteArrayOutputStream contains the complete stream, which holds 5 record batches.
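The ByteArrayOutputStream is only one possible sink. As a minimal sketch, assuming a local filesystem and the Arrow Java libraries on the classpath, the same writer and reader calls work with a java.nio FileChannel, which is both a WritableByteChannel for ArrowStreamWriter and a ReadableByteChannel for ArrowStreamReader. The class `StreamThroughFile` and its temporary-file helper are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Collections;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;

class StreamThroughFile {
  // Writes one 3-row batch as an Arrow stream into `path`, then reads the file
  // back and returns the total number of rows across all batches.
  static int writeThenCountRows(Path path) throws IOException {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      try (IntVector ids = new IntVector("id", allocator)) {
        ids.allocateNew(3);
        for (int i = 0; i < 3; i++) {
          ids.set(i, i);
        }
        ids.setValueCount(3);
        VectorSchemaRoot root = new VectorSchemaRoot(
            Collections.singletonList(ids.getField()),
            Collections.<FieldVector>singletonList(ids));
        root.setRowCount(3);
        // a FileChannel takes the place of the in-memory stream
        try (FileChannel channel = FileChannel.open(path,
                 StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
             ArrowStreamWriter writer = new ArrowStreamWriter(root, null, channel)) {
          writer.start();
          writer.writeBatch();
          writer.end();
        }
      }
      int rows = 0;
      try (FileChannel channel = FileChannel.open(path, StandardOpenOption.READ);
           ArrowStreamReader reader = new ArrowStreamReader(channel, allocator)) {
        while (reader.loadNextBatch()) {
          rows += reader.getVectorSchemaRoot().getRowCount();
        }
      }
      return rows;
    }
  }

  // Round trip through a temporary file that is deleted afterwards.
  static int roundTripThroughTempFile() {
    try {
      Path path = Files.createTempFile("arrow-ipc-", ".arrows");
      try {
        return writeThenCountRows(path);
      } finally {
        Files.deleteIfExists(path);
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("rows read back: " + roundTripThroughTempFile());
  }
}
```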
We can read such a stream with ArrowStreamReader. Note that the VectorSchemaRoot within the reader is loaded with new values on every call to loadNextBatch():
try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  Schema schema = reader.getVectorSchemaRoot().getSchema();
  // the same root is reloaded with new values on every call to loadNextBatch()
  VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
  // loadNextBatch() returns false at the end of the stream,
  // so this loop visits all 5 batches without hard-coding the count
  while (reader.loadNextBatch()) {
    // ... do something with readBatch, e.g. inspect readBatch.getRowCount()
  }
}
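Putting the writing and reading steps together, the following sketch (hypothetical class `StreamRoundTrip`, assuming the Arrow Java libraries on the classpath) writes several batches of different sizes through a single VectorSchemaRoot and reads them back with a `while (reader.loadNextBatch())` loop. Because each batch is refilled in place before writeBatch(), the reader sees every batch with its own row count and contents.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.BitVector;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.Field;

class StreamRoundTrip {
  // Clears the root's vectors and refills them with `rows` rows tagged by batch number.
  static void fill(VectorSchemaRoot root, int batch, int rows) {
    BitVector bits = (BitVector) root.getVector(0);
    VarCharVector strings = (VarCharVector) root.getVector(1);
    bits.reset();
    strings.reset();
    for (int i = 0; i < rows; i++) {
      bits.setSafe(i, i % 2);
      strings.setSafe(i, ("batch" + batch + "-" + i).getBytes(StandardCharsets.UTF_8));
    }
    bits.setValueCount(rows);
    strings.setValueCount(rows);
    root.setRowCount(rows);
  }

  // Writes one batch per entry in batchSizes, reads the stream back and
  // returns "rowCount:firstValue" for every batch the reader sees.
  static List<String> roundTrip(int... batchSizes) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      try (BitVector bits = new BitVector("boolean", allocator);
           VarCharVector strings = new VarCharVector("varchar", allocator)) {
        bits.allocateNew();
        strings.allocateNew();
        List<Field> fields = Arrays.asList(bits.getField(), strings.getField());
        List<FieldVector> vectors = Arrays.asList(bits, strings);
        VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
        try (ArrowStreamWriter writer =
                 new ArrowStreamWriter(root, null, Channels.newChannel(out))) {
          writer.start();
          for (int b = 0; b < batchSizes.length; b++) {
            fill(root, b, batchSizes[b]);  // overwrites the previous batch in place
            writer.writeBatch();
          }
          writer.end();
        }
      }
      List<String> seen = new ArrayList<>();
      try (ArrowStreamReader reader = new ArrowStreamReader(
               new ByteArrayInputStream(out.toByteArray()), allocator)) {
        VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
        while (reader.loadNextBatch()) {
          VarCharVector readStrings = (VarCharVector) readBatch.getVector(1);
          String first = readBatch.getRowCount() == 0
              ? "" : new String(readStrings.get(0), StandardCharsets.UTF_8);
          seen.add(readBatch.getRowCount() + ":" + first);
        }
      }
      return seen;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip(10, 2, 3, 4, 5));
  }
}
```

With the batch sizes used in `main`, `roundTrip` returns one entry per batch: `[10:batch0-0, 2:batch1-0, 3:batch2-0, 4:batch3-0, 5:batch4-0]`.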
Here we also give a simple example with dictionary-encoded vectors:
// create the dictionary vector and register it with a provider
DictionaryProvider.MapDictionaryProvider provider = new DictionaryProvider.MapDictionaryProvider();
final VarCharVector dictVector = new VarCharVector("dict", allocator);
dictVector.allocateNewSafe();
dictVector.setSafe(0, "aa".getBytes(StandardCharsets.UTF_8));
dictVector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
dictVector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
dictVector.setValueCount(3);
// dictionary id 1, unordered; a null index type defaults to a signed 32-bit int
Dictionary dictionary =
    new Dictionary(dictVector, new DictionaryEncoding(1L, false, /*indexType=*/null));
provider.put(dictionary);
// create the vector to encode
final VarCharVector vector = new VarCharVector("vector", allocator);
vector.allocateNewSafe();
vector.setSafe(0, "bb".getBytes(StandardCharsets.UTF_8));
vector.setSafe(1, "bb".getBytes(StandardCharsets.UTF_8));
vector.setSafe(2, "cc".getBytes(StandardCharsets.UTF_8));
vector.setSafe(3, "aa".getBytes(StandardCharsets.UTF_8));
vector.setValueCount(4);
// encode the vector: each value is replaced by its index in the dictionary
IntVector encodedVector = (IntVector) DictionaryEncoder.encode(vector, dictionary);
// create VectorSchemaRoot
List<Field> fields = Arrays.asList(encodedVector.getField());
List<FieldVector> vectors = Arrays.asList(encodedVector);
VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
// write data; the provider supplies the dictionary for the encoded field
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowStreamWriter writer = new ArrowStreamWriter(root, provider, Channels.newChannel(out));
writer.start();
writer.writeBatch();
writer.end();
// read data
try (ArrowStreamReader reader = new ArrowStreamReader(new ByteArrayInputStream(out.toByteArray()), allocator)) {
  reader.loadNextBatch();
  VectorSchemaRoot readRoot = reader.getVectorSchemaRoot();
  // get the encoded vector
  IntVector intVector = (IntVector) readRoot.getVector(0);
  // get dictionaries and decode the vector
  Map<Long, Dictionary> dictionaryMap = reader.getDictionaryVectors();
  long dictionaryId = intVector.getField().getDictionary().getId();
  VarCharVector varCharVector =
      (VarCharVector) DictionaryEncoder.decode(intVector, dictionaryMap.get(dictionaryId));
  // varCharVector now holds "bb", "bb", "cc", "aa"; close it when it is no longer needed
}
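A self-contained variant of the dictionary example, handy for checking the round trip end to end, might look like the sketch below. It keeps the same dictionary id (1L) and values, uses explicit UTF-8 encoding, and closes every vector before the allocator. The class `DictionaryRoundTrip` and its helper `strings` are hypothetical names, and the Arrow Java libraries are assumed to be on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.dictionary.Dictionary;
import org.apache.arrow.vector.dictionary.DictionaryEncoder;
import org.apache.arrow.vector.dictionary.DictionaryProvider;
import org.apache.arrow.vector.ipc.ArrowStreamReader;
import org.apache.arrow.vector.ipc.ArrowStreamWriter;
import org.apache.arrow.vector.types.pojo.DictionaryEncoding;
import org.apache.arrow.vector.types.pojo.Field;

class DictionaryRoundTrip {
  // Hypothetical helper: builds a VarCharVector holding the given UTF-8 strings.
  static VarCharVector strings(String name, BufferAllocator allocator, String... values) {
    VarCharVector v = new VarCharVector(name, allocator);
    v.allocateNew();
    for (int i = 0; i < values.length; i++) {
      v.setSafe(i, values[i].getBytes(StandardCharsets.UTF_8));
    }
    v.setValueCount(values.length);
    return v;
  }

  // Encodes `values` against the dictionary {aa, bb, cc}, writes a stream,
  // reads it back and decodes the indices into strings again.
  static List<String> roundTrip(String... values) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      ByteArrayOutputStream out = new ByteArrayOutputStream();
      try (VarCharVector dictVector = strings("dict", allocator, "aa", "bb", "cc");
           VarCharVector vector = strings("vector", allocator, values)) {
        Dictionary dictionary =
            new Dictionary(dictVector, new DictionaryEncoding(1L, false, null));
        DictionaryProvider.MapDictionaryProvider provider =
            new DictionaryProvider.MapDictionaryProvider();
        provider.put(dictionary);
        try (IntVector encoded = (IntVector) DictionaryEncoder.encode(vector, dictionary)) {
          List<Field> fields = Collections.singletonList(encoded.getField());
          List<FieldVector> vectors = Collections.singletonList(encoded);
          VectorSchemaRoot root = new VectorSchemaRoot(fields, vectors);
          root.setRowCount(values.length);
          try (ArrowStreamWriter writer =
                   new ArrowStreamWriter(root, provider, Channels.newChannel(out))) {
            writer.start();
            writer.writeBatch();
            writer.end();
          }
        }
      }
      List<String> decoded = new ArrayList<>();
      try (ArrowStreamReader reader = new ArrowStreamReader(
               new ByteArrayInputStream(out.toByteArray()), allocator)) {
        reader.loadNextBatch();
        IntVector indices = (IntVector) reader.getVectorSchemaRoot().getVector(0);
        Map<Long, Dictionary> dictionaries = reader.getDictionaryVectors();
        long id = indices.getField().getDictionary().getId();
        // the decoded vector is a new allocation, so close it before the allocator
        try (VarCharVector result =
                 (VarCharVector) DictionaryEncoder.decode(indices, dictionaries.get(id))) {
          for (int i = 0; i < result.getValueCount(); i++) {
            decoded.add(new String(result.get(i), StandardCharsets.UTF_8));
          }
        }
      }
      return decoded;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(roundTrip("bb", "bb", "cc", "aa"));
  }
}
```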
Writing and Reading Random Access Files
The ArrowFileWriter has the same API as ArrowStreamWriter:
ByteArrayOutputStream out = new ByteArrayOutputStream();
ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(out));
writer.start();
// write the first batch
writer.writeBatch();
// write another four batches
for (int i = 0; i < 4; i++) {
  // ... repopulate the vectors in root and call root.setRowCount(...),
  // as in the streaming example
  writer.writeBatch();
}
// write the footer that records the location of every batch
writer.end();
The difference between ArrowFileReader and ArrowStreamReader is that the input source of ArrowFileReader must be seekable (for example, a SeekableByteChannel) so that it supports random access. Because we have access to the entire payload, we know the number of record batches in the file and can read any of them at random:
try (ArrowFileReader reader = new ArrowFileReader(
    new ByteArrayReadableSeekableByteChannel(out.toByteArray()), allocator)) {
  // read the fourth batch (index 3)
  ArrowBlock block = reader.getRecordBlocks().get(3);
  reader.loadRecordBatch(block);
  VectorSchemaRoot readBatch = reader.getVectorSchemaRoot();
}
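Random access is easiest to see by visiting the batches out of order. This sketch (hypothetical class `RandomAccessDemo`, assuming the Arrow Java libraries on the classpath) writes five batches with ArrowFileWriter, then uses the block list from the file footer to load them from last to first with loadRecordBatch(). A stream reader could only return them front to back.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.ArrowFileWriter;
import org.apache.arrow.vector.ipc.message.ArrowBlock;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;

class RandomAccessDemo {
  // Writes `batches` 3-row batches in the file format; batch b holds b*100 .. b*100+2.
  static byte[] writeFile(BufferAllocator allocator, int batches) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (IntVector ids = new IntVector("id", allocator)) {
      ids.allocateNew(3);
      VectorSchemaRoot root = new VectorSchemaRoot(
          Collections.singletonList(ids.getField()),
          Collections.<FieldVector>singletonList(ids));
      try (ArrowFileWriter writer = new ArrowFileWriter(root, null, Channels.newChannel(out))) {
        writer.start();
        for (int b = 0; b < batches; b++) {
          ids.reset();
          for (int i = 0; i < 3; i++) {
            ids.setSafe(i, b * 100 + i);
          }
          ids.setValueCount(3);
          root.setRowCount(3);
          writer.writeBatch();
        }
        writer.end();
      }
    }
    return out.toByteArray();
  }

  // Loads the batches in reverse order using the footer's block list and
  // returns the first id of each batch in the order they were visited.
  static List<Integer> readBackwards(int batches) {
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      byte[] bytes = writeFile(allocator, batches);
      List<Integer> firstIds = new ArrayList<>();
      try (ArrowFileReader reader = new ArrowFileReader(
               new ByteArrayReadableSeekableByteChannel(bytes), allocator)) {
        List<ArrowBlock> blocks = reader.getRecordBlocks();
        for (int b = blocks.size() - 1; b >= 0; b--) {
          reader.loadRecordBatch(blocks.get(b));
          IntVector ids = (IntVector) reader.getVectorSchemaRoot().getVector(0);
          firstIds.add(ids.get(0));
        }
      }
      return firstIds;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(readBackwards(5));
  }
}
```

With five batches, `main` should print `[400, 300, 200, 100, 0]`, showing that each batch was located directly through its block rather than by scanning the file from the start.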