Interprocess messaging / communication (IPC)

Encapsulated message format

Data components in the stream and file formats are represented as encapsulated messages consisting of a 32-bit length prefix indicating the metadata size, the message metadata as a flatbuffer, padding bytes to an 8-byte boundary, and the message body.

Schematically, we have:

<metadata_size: int32>
<metadata_flatbuffer: bytes>
<padding>
<message body>

The complete serialized message must be a multiple of 8 bytes so that messages can be relocated between streams. Otherwise the amount of padding between the metadata and the message body could be non-deterministic.

The metadata_size includes the size of the flatbuffer plus padding. The Message flatbuffer includes a version number, the particular message (as a flatbuffer union), and the size of the message body:

table Message {
  version: org.apache.arrow.flatbuf.MetadataVersion;
  header: MessageHeader;
  bodyLength: long;
}
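As a minimal sketch of how the length prefix and padding interact, the framing above can be written as follows. The `encapsulate` helper is hypothetical (not part of any Arrow library) and assumes the Message flatbuffer bytes have already been serialized:

```python
import struct

def encapsulate(metadata: bytes, body: bytes) -> bytes:
    # Hypothetical helper: frames a serialized Message flatbuffer and its
    # body per the layout above.
    header = 4 + len(metadata)                  # int32 prefix + flatbuffer
    padding = (8 - header % 8) % 8              # align the body to 8 bytes
    body_padding = (8 - len(body) % 8) % 8      # keep the total a multiple of 8
    # The metadata_size prefix covers the flatbuffer plus its padding.
    prefix = struct.pack('<i', len(metadata) + padding)
    return prefix + metadata + b'\x00' * padding + body + b'\x00' * body_padding
```

Note that the padding is fully determined by the metadata size, which is what makes the layout deterministic and relocatable.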

Currently, we support 4 types of messages: Schema, RecordBatch, DictionaryBatch, and Tensor.

Streaming format

We provide a streaming format for record batches. It is presented as a sequence of encapsulated messages, each of which follows the format above. The schema comes first in the stream, and it is the same for all of the record batches that follow. If any fields in the schema are dictionary-encoded, one or more DictionaryBatch messages will follow the schema.

<SCHEMA>
<DICTIONARY 0>
...
<DICTIONARY k - 1>
<RECORD BATCH 0>
...
<RECORD BATCH n - 1>
<EOS [optional]: int32>

After each message, a stream reader implementation can read the next 4 bytes to determine the size of the message metadata that follows. Once the message flatbuffer has been read, it can then read the message body.

The stream writer can signal end-of-stream (EOS) either by writing 0 as an int32 length prefix or by simply closing the stream.

File format

We define a “file format” supporting random access that is very similar to the streaming format. The file begins and ends with a magic string ARROW1 (plus padding). What follows in the file is identical to the streaming format. At the end of the file, we write a footer containing a redundant copy of the schema (which is part of the streaming format) plus memory offsets and sizes for each of the data blocks in the file. This enables random access to any record batch in the file. See format/File.fbs for the precise details of the file footer.

Schematically we have:

<magic number "ARROW1">
<empty padding bytes [to 8 byte boundary]>
<STREAMING FORMAT>
<FOOTER>
<FOOTER SIZE: int32>
<magic number "ARROW1">

RecordBatch body structure

The RecordBatch metadata contains a depth-first (pre-order) flattened set of field metadata and physical memory buffers (some comments from Message.fbs have been shortened / removed):

table RecordBatch {
  length: long;
  nodes: [FieldNode];
  buffers: [Buffer];
}

struct FieldNode {
  length: long;
  null_count: long;
}

struct Buffer {
  /// The shared memory page id where this buffer is located. Currently this is
  /// not used
  page: int;

  /// The relative offset into the shared memory page where the bytes for this
  /// buffer starts
  offset: long;

  /// The absolute length (in bytes) of the memory buffer. The memory is found
  /// from offset (inclusive) to offset + length (non-inclusive).
  length: long;
}

In the context of a file, the page is not used, and the Buffer offsets are relative to the start of the message body. So, while in a general IPC setting these offsets may be anywhere in one or more shared memory regions, in the file format the offsets start from 0.
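Under the file-format convention, reconstructing the buffers from a message body reduces to simple slicing. A sketch, where `buffers` stands for the (offset, length) pairs taken from the RecordBatch metadata:

```python
def slice_buffers(body: bytes, buffers):
    # In the file format, each Buffer is body[offset : offset + length],
    # since offsets are relative to the start of the message body.
    # The page field is unused and therefore ignored here.
    return [body[off:off + length] for off, length in buffers]
```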

The location of a record batch and the size of the metadata block as well as the body of buffers is stored in the file footer:

struct Block {
  offset: long;
  metaDataLength: int;
  bodyLength: long;
}

The metaDataLength here includes the metadata length prefix, serialized metadata, and any additional padding bytes, and by construction must be a multiple of 8 bytes.

Dictionary Batches

Dictionaries are written in the stream and file formats as a sequence of record batches, each having a single field. The complete semantic schema for a sequence of record batches therefore consists of the schema along with all of the dictionaries. Because the dictionary types are found in the schema, the schema must be read first so that the dictionaries can be properly interpreted.

table DictionaryBatch {
  id: long;
  data: RecordBatch;
}

The dictionary id in the message metadata can be referenced one or more times in the schema, so a single dictionary can even be shared by multiple fields. See the Physical Layout document for more about the semantics of dictionary-encoded data.
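To illustrate the semantics (not any Arrow API), a dictionary-encoded field stores integer indices into a dictionary identified by its id, and decoding replaces each index with the corresponding value:

```python
# Illustrative only: dictionaries keyed by the id from DictionaryBatch.
dictionaries = {0: ['foo', 'bar', 'baz']}      # id 0: a single-field batch

def decode(indices, dictionary_id):
    # Replace each index with its dictionary value; None marks a null slot.
    values = dictionaries[dictionary_id]
    return [None if i is None else values[i] for i in indices]
```

Two fields that reference id 0 in the schema would both be decoded against the same dictionary.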

Tensor (Multi-dimensional Array) Message Format

The Tensor message type provides a way to write a multidimensional array of fixed-size values (such as a NumPy ndarray) using Arrow’s shared memory tools. Arrow implementations in general are not required to implement this data format, though we provide a reference implementation in C++.

When writing a standalone encapsulated tensor message, we use the format as indicated above, but additionally align the starting offset (if writing to a shared memory region) to be a multiple of 8:

<PADDING>
<metadata size: int32>
<metadata>
<tensor body>
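The amount of leading padding follows from the current write position. A one-line sketch of that computation (the function name is hypothetical):

```python
def tensor_start_padding(position: int) -> int:
    # Padding bytes needed before a standalone tensor message so that its
    # metadata-size prefix begins on an 8-byte-aligned offset.
    return (8 - position % 8) % 8
```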