Apache Arrow (C++)
A columnar in-memory analytics layer designed to accelerate big data.

Using the Plasma In-Memory Object Store from C++

Apache Arrow offers the ability to share your data structures among multiple processes simultaneously through Plasma, an in-memory object store.

Note that the Plasma API is not stable.

Plasma clients are processes that run on the same machine as the object store. They communicate with the object store over Unix domain sockets, and they read and write data in the object store through shared memory.

Plasma objects are immutable once they have been created.

The following goes over the basics so you can begin using Plasma in your big data applications.

Starting the Plasma store

To start running the Plasma object store so that clients may connect and access the data, run the following command:

plasma_store -m 1000000000 -s /tmp/plasma

The -m flag specifies the size of the object store in bytes. The -s flag specifies the path of the Unix domain socket that the store will listen at.

Therefore, the above command starts a Plasma store that can use up to 1 GB (1,000,000,000 bytes) of memory and listens on the socket /tmp/plasma.
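
Because the -m flag takes a raw byte count, it can be easier to compute the value than to count zeros by hand. The sketch below uses two helper names, DecimalGigabytes and BinaryGibibytes, that are made up for illustration and are not part of Plasma.

```cpp
#include <cstdint>

// Hypothetical helpers (not part of Plasma): convert a size into the raw
// byte count that the -m flag expects.
constexpr int64_t DecimalGigabytes(int64_t n) { return n * 1000 * 1000 * 1000; }
constexpr int64_t BinaryGibibytes(int64_t n) { return n * 1024 * 1024 * 1024; }
```

DecimalGigabytes(1) yields 1000000000, the value passed to -m above. BinaryGibibytes(1) yields 1073741824, which may be preferable if you size the store in powers of two.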

The Plasma store will remain available as long as the plasma_store process is running in a terminal window. Messages, such as alerts for disconnecting clients, may occasionally be output. To stop running the Plasma store, you can press Ctrl-C in the terminal window.

Alternatively, you can run the Plasma store in the background and ignore all message output with the following terminal command:

plasma_store -m 1000000000 -s /tmp/plasma 1> /dev/null 2> /dev/null &

The Plasma store will instead run silently in the background. To stop running the Plasma store in this case, issue the command below:

killall plasma_store

Creating a Plasma client

Now that the Plasma object store is up and running, it is time to make a client process connect to it. To use the Plasma object store as a client, your application should initialize a plasma::PlasmaClient object and tell it to connect to the socket specified when starting up the Plasma object store.

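#include <plasma/client.h>
using namespace plasma;
int main(int argc, char** argv) {
  // Start up and connect a Plasma client.
  PlasmaClient client;
  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
  // Disconnect the Plasma client.
  ARROW_CHECK_OK(client.Disconnect());
}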

Save this program in a file test.cc and compile it with

g++ test.cc `pkg-config --cflags --libs plasma` --std=c++11

Note that multiple clients can be created within the same process.

Note that a PlasmaClient object is not thread safe.

If the Plasma store is still running, you can now execute the a.out executable and the store will print something like

Disconnecting client on fd 5

which shows that the client was successfully disconnected.

Object IDs

The Plasma object store uses twenty-byte identifiers for accessing objects stored in shared memory. Each object in the Plasma store should be associated with a unique ID. The Object ID is then a key that can be used by any client to fetch that object from the Plasma store.

Random generation of Object IDs is often good enough to ensure unique IDs:

// Randomly generate an Object ID.
ObjectID object_id = ObjectID::from_random();

Now, any connected client that knows the object's Object ID can access the same object from the Plasma object store. For easy transportation of Object IDs, you can convert/serialize an Object ID into a binary string and back as follows:

// From ObjectID to binary string
std::string id_string = object_id.binary();
// From binary string to ObjectID
ObjectID id_object = ObjectID::from_binary(id_string);

You can also get a human-readable representation of an ObjectID, in the same hexadecimal format that git uses for commit hashes, by calling ObjectID::hex().
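
To make the relationship between the twenty-byte binary form and the hexadecimal form concrete, here is a standalone sketch that does not use Plasma at all. It models an ID as a 20-byte std::string, and the helper names RandomIdBytes and ToHex are assumptions made for this illustration. The real ObjectID class may differ in its details.

```cpp
#include <cstddef>
#include <random>
#include <string>

// Assumption for illustration: an ID is modeled as a 20-byte std::string.
constexpr std::size_t kIdSize = 20;

// Hypothetical helper: fill a 20-byte string with random bytes.
std::string RandomIdBytes() {
  std::random_device rd;
  std::mt19937_64 gen(rd());
  std::uniform_int_distribution<int> dist(0, 255);
  std::string id(kIdSize, '\0');
  for (char& c : id) c = static_cast<char>(dist(gen));
  return id;
}

// Hypothetical helper: encode each byte as two lowercase hex digits, so a
// 20-byte ID becomes 40 characters, the same shape as a git commit hash.
std::string ToHex(const std::string& bytes) {
  static const char kDigits[] = "0123456789abcdef";
  std::string out;
  out.reserve(bytes.size() * 2);
  for (unsigned char b : bytes) {
    out.push_back(kDigits[b >> 4]);
    out.push_back(kDigits[b & 0x0f]);
  }
  return out;
}
```

Under this encoding, a binary string made of twenty ASCII '0' characters (byte value 0x30) maps to "30" repeated twenty times, not to forty zeros.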

Here is a test program you can run:

#include <iostream>
#include <string>
#include <plasma/client.h>
using namespace plasma;
int main(int argc, char** argv) {
  // Generate a random Object ID and print it.
  ObjectID object_id1 = ObjectID::from_random();
  std::cout << "object_id1 is " << object_id1.hex() << std::endl;
  // Round-trip the ID through its binary string form.
  std::string id_string = object_id1.binary();
  ObjectID object_id2 = ObjectID::from_binary(id_string);
  std::cout << "object_id2 is " << object_id2.hex() << std::endl;
}

Creating an Object

Now that you have learned about the Object IDs that are used to refer to objects, let's look at how objects can be stored in Plasma.

Storing an object is a two-stage process. First, a buffer is allocated with a call to Create, and the client constructs the object in place in that buffer. Second, the object is made immutable and shared with other clients via a call to Seal.

The Create call blocks while the Plasma store allocates a buffer of the appropriate size. The client will then map the buffer into its own address space. At this point the object can be constructed in place using a pointer that was written by the Create command.

int64_t data_size = 100;
// The address of the buffer allocated by the Plasma store will be written at
// this address.
uint8_t* data;
// Create a Plasma object by specifying its ID and size.
ARROW_CHECK_OK(client.Create(object_id, data_size, NULL, 0, &data));

You can also specify metadata for the object; the third argument is the metadata (as raw bytes) and the fourth argument is the size of the metadata.

// Create a Plasma object with metadata.
int64_t data_size = 100;
std::string metadata = "{'author': 'john'}";
uint8_t* data;
ARROW_CHECK_OK(client.Create(object_id, data_size, (uint8_t*) metadata.data(), metadata.size(), &data));

Now that we've obtained a pointer to our object's data, we can write our data to it:

// Write some data for the Plasma object.
for (int64_t i = 0; i < data_size; i++) {
  data[i] = static_cast<uint8_t>(i % 4);
}

When the client is done, the client seals the buffer, making the object immutable, and making it available to other Plasma clients:

// Seal the object. This makes it available for all clients.
ARROW_CHECK_OK(client.Seal(object_id));

Here is an example that combines all these features:

#include <string>
#include <plasma/client.h>
using namespace plasma;
int main(int argc, char** argv) {
  // Start up and connect a Plasma client.
  PlasmaClient client;
  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
  // Create an object with a fixed ObjectID so that other programs can find it.
  ObjectID object_id = ObjectID::from_binary("00000000000000000000");
  int64_t data_size = 1000;
  uint8_t *data;
  std::string metadata = "{'author': 'john'}";
  ARROW_CHECK_OK(client.Create(object_id, data_size, (uint8_t*) metadata.data(), metadata.size(), &data));
  // Write some data into the object.
  for (int64_t i = 0; i < data_size; i++) {
    data[i] = static_cast<uint8_t>(i % 4);
  }
  // Seal the object.
  ARROW_CHECK_OK(client.Seal(object_id));
  // Disconnect the client.
  ARROW_CHECK_OK(client.Disconnect());
}

This example can be compiled with

g++ create.cc `pkg-config --cflags --libs plasma` --std=c++11 -o create

To verify that an object exists in the Plasma object store, you can call PlasmaClient::Contains() to check whether an object has been created and sealed for a given Object ID. Note that the result will still be false if the object has been created but not yet sealed:

// Check if an object has been created and sealed.
bool has_object;
ARROW_CHECK_OK(client.Contains(object_id, &has_object));
if (has_object) {
  // Object has been created and sealed, proceed
}
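
The create, write, seal, and contains rules described in this section can be tried out without a running store using a toy, single-process model. The sketch below is not the Plasma implementation. ToyStore and its methods are hypothetical names that only mimic the lifecycle: an object is invisible to Contains until it has been sealed.

```cpp
#include <cstddef>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Toy, single-process model of the create/seal lifecycle. This is NOT the
// Plasma implementation; all names here are hypothetical.
class ToyStore {
 public:
  // Allocate a zeroed buffer for `id` and hand back a pointer to it.
  // Fails if the ID is already taken.
  bool Create(const std::string& id, int64_t size, uint8_t** data) {
    if (objects_.count(id) != 0) return false;
    Entry& entry = objects_[id];
    entry.bytes.assign(static_cast<std::size_t>(size), 0);
    *data = entry.bytes.data();
    return true;
  }

  // Mark the object as sealed. Fails for unknown or already sealed IDs.
  bool Seal(const std::string& id) {
    auto it = objects_.find(id);
    if (it == objects_.end() || it->second.sealed) return false;
    it->second.sealed = true;
    return true;
  }

  // True only for objects that have been created AND sealed.
  bool Contains(const std::string& id) const {
    auto it = objects_.find(id);
    return it != objects_.end() && it->second.sealed;
  }

 private:
  struct Entry {
    std::vector<uint8_t> bytes;
    bool sealed = false;
  };
  std::map<std::string, Entry> objects_;
};
```

With this model, calling Create and then Contains on the same ID reports false, and only after Seal does Contains report true, which mirrors the behavior described above.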

Getting an Object

After an object has been sealed, any client who knows the Object ID can get the object. To store the retrieved object contents, you should create an ObjectBuffer, then call PlasmaClient::Get() as follows:

// Get from the Plasma store by Object ID.
ObjectBuffer object_buffer;
client.Get(&object_id, 1, -1, &object_buffer);

PlasmaClient::Get() is not limited to fetching a single object from the Plasma store. You can pass an array of Object IDs and an array of ObjectBuffers to fetch several objects in one call, so long as you also specify the number of objects being fetched:

// Get two objects at once from the Plasma store. This function
// call will block until both objects have been fetched.
ObjectBuffer multiple_buffers[2];
ObjectID multiple_ids[2] = {object_id1, object_id2};
client.Get(multiple_ids, 2, -1, multiple_buffers);

Since PlasmaClient::Get() is a blocking function call, it may be necessary to limit the amount of time the function is allowed to take when trying to fetch from the Plasma store. You can pass in a timeout in milliseconds when calling PlasmaClient::Get(). To use PlasmaClient::Get() without a timeout, just pass in -1 like in the previous example calls:

// Make the function call give up fetching the object if it takes
// more than 100 milliseconds.
int64_t timeout = 100;
client.Get(&object_id, 1, timeout, &object_buffer);
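
The timeout convention above (a number of milliseconds, or -1 for no limit) is a common pattern for blocking calls. The following standalone sketch shows that convention with a simple polling loop. It says nothing about how Plasma implements Get internally, and WaitFor is a hypothetical helper name.

```cpp
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

// Hypothetical helper, not part of Plasma. Polls `ready` until it returns
// true or until timeout_ms milliseconds have passed. A negative timeout
// means "no limit". Returns whether `ready` became true.
bool WaitFor(const std::function<bool()>& ready, int64_t timeout_ms) {
  using Clock = std::chrono::steady_clock;
  const Clock::time_point deadline =
      Clock::now() + std::chrono::milliseconds(timeout_ms < 0 ? 0 : timeout_ms);
  while (!ready()) {
    if (timeout_ms >= 0 && Clock::now() >= deadline) return false;
    std::this_thread::sleep_for(std::chrono::milliseconds(1));
  }
  return true;
}
```

A caller that passes -1 accepts that the call may never return. A caller that passes a finite timeout must check the result before using whatever it was waiting for.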

Finally, to access the object, you can access the data and metadata attributes of the ObjectBuffer. The data can be indexed like any array:

// Access object data.
uint8_t* data = object_buffer.data;
int64_t data_size = object_buffer.data_size;
// Access object metadata.
uint8_t* metadata = object_buffer.metadata;
int64_t metadata_size = object_buffer.metadata_size;
// Index into data array.
uint8_t first_data_byte = data[0];

Here is a longer example that shows these capabilities:

#include <plasma/client.h>
using namespace plasma;
int main(int argc, char** argv) {
  // Start up and connect a Plasma client.
  PlasmaClient client;
  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
  // Use the same fixed ObjectID as the create example.
  ObjectID object_id = ObjectID::from_binary("00000000000000000000");
  ObjectBuffer object_buffer;
  ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));
  // Retrieve object data.
  uint8_t* data = object_buffer.data;
  int64_t data_size = object_buffer.data_size;
  // Check that the data agrees with what was written in the other process.
  for (int64_t i = 0; i < data_size; i++) {
    ARROW_CHECK(data[i] == static_cast<uint8_t>(i % 4));
  }
  // Disconnect the client.
  ARROW_CHECK_OK(client.Disconnect());
}

If you compile it with

g++ get.cc `pkg-config --cflags --libs plasma` --std=c++11 -o get

and run it with ./get, all the assertions will pass, provided you have first run the create example from above against the same Plasma store.

Object Lifetime Management

The Plasma store internally does reference counting to make sure that objects mapped into the address space of one of the clients with PlasmaClient::Get remain accessible. To unmap objects from a client, call PlasmaClient::Release. All objects that are mapped into a client's address space will automatically be released when the client is disconnected from the store (this happens even if the client process crashes or otherwise fails to call Disconnect).

If a new object is created and there is not enough space in the Plasma store, the store will evict the least recently used object that is not in use (an object is in use if at least one client has gotten it but not released it).
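
The interaction between reference counting and eviction can be illustrated with a toy model. The sketch below is not Plasma's eviction code. ToyLru and its methods are hypothetical, and it assumes that an object becomes a candidate for eviction at the moment its last reference is released.

```cpp
#include <list>
#include <string>
#include <unordered_map>

// Toy model only; not Plasma's eviction code. All names are hypothetical.
class ToyLru {
 public:
  // A client maps the object: bump its reference count. If this is the
  // first reference, take the object out of the eviction queue.
  void Get(const std::string& id) {
    if (++refs_[id] == 1) queue_.remove(id);
  }

  // A client unmaps the object. Once nobody holds it, it is appended to
  // the eviction queue as the most recently used candidate.
  void Release(const std::string& id) {
    auto it = refs_.find(id);
    if (it == refs_.end() || it->second == 0) return;
    if (--it->second == 0) queue_.push_back(id);
  }

  // Evict the least recently released object that no client is using.
  // Returns false if every object is still in use.
  bool EvictOne(std::string* evicted) {
    if (queue_.empty()) return false;
    *evicted = queue_.front();
    queue_.pop_front();
    refs_.erase(*evicted);
    return true;
  }

 private:
  std::unordered_map<std::string, int> refs_;
  std::list<std::string> queue_;
};
```

In this model an object that some client still holds is never evicted, no matter how long ago it was last touched, which is why releasing objects you no longer need matters when the store is close to full.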

Object notifications

Additionally, you can arrange to have Plasma notify you when objects are sealed in the object store. This may especially be handy when your program is collaborating with other Plasma clients, and needs to know when they make objects available.

First, you can subscribe your current Plasma client to such notifications by getting a file descriptor:

// Start receiving notifications into file_descriptor.
int fd;
ARROW_CHECK_OK(client.Subscribe(&fd));

Once you have the file descriptor, you can have your current Plasma client wait to receive the next object notification. Object notifications include information such as Object ID, data size, and metadata size of the next newly available object:

// Receive notification of the next newly available object.
// Notification information is stored in object_id, data_size, and metadata_size
ObjectID object_id;
int64_t data_size;
int64_t metadata_size;
ARROW_CHECK_OK(client.GetNotification(fd, &object_id, &data_size, &metadata_size));
// Get the newly available object.
ObjectBuffer object_buffer;
ARROW_CHECK_OK(client.Get(&object_id, 1, -1, &object_buffer));

Here is a full program that shows this capability:

#include <iostream>
#include <plasma/client.h>
using namespace plasma;
int main(int argc, char** argv) {
  // Start up and connect a Plasma client.
  PlasmaClient client;
  ARROW_CHECK_OK(client.Connect("/tmp/plasma", "", PLASMA_DEFAULT_RELEASE_DELAY));
  // Subscribe to notifications about newly sealed objects.
  int fd;
  ARROW_CHECK_OK(client.Subscribe(&fd));
  ObjectID object_id;
  int64_t data_size;
  int64_t metadata_size;
  // Print every notification as it arrives; stop the program with Ctrl-C.
  while (true) {
    ARROW_CHECK_OK(client.GetNotification(fd, &object_id, &data_size, &metadata_size));
    std::cout << "Received object notification for object_id = "
              << object_id.hex() << ", with data_size = " << data_size
              << ", and metadata_size = " << metadata_size << std::endl;
  }
  // Disconnect the client.
  ARROW_CHECK_OK(client.Disconnect());
}

If you compile it with

g++ subscribe.cc `pkg-config --cflags --libs plasma` --std=c++11 -o subscribe

and invoke ./create and ./subscribe while the Plasma store is running, you can observe the new object arriving.