The Plasma In-Memory Object Store

Warning

Plasma is deprecated since Arrow 10.0.0. It will be removed in 12.0.0 or so.

Note

As present, Plasma is only supported for use on Linux and macOS.

The Plasma API

Starting the Plasma store

You can start the Plasma store by issuing a terminal command similar to the following:

plasma_store -m 1000000000 -s /tmp/plasma

The -m flag specifies the size of the store in bytes, and the -s flag specifies the socket that the store will listen at. Thus, the above command allows the Plasma store to use up to 1GB of memory, and sets the socket to /tmp/plasma.

Leaving the current terminal window open as long as Plasma store should keep running. Messages, concerning such as disconnecting clients, may occasionally be printed to the screen. To stop running the Plasma store, you can press Ctrl-C in the terminal.

Creating a Plasma client

To start a Plasma client from Python, call plasma.connect using the same socket name:

import pyarrow.plasma as plasma
client = plasma.connect("/tmp/plasma")

If the following error occurs from running the above Python code, that means that either the socket given is incorrect, or the ./plasma_store is not currently running. Check to see if the Plasma store is still running.

>>> client = plasma.connect("/tmp/plasma")
Connection to socket failed for pathname /tmp/plasma
Could not connect to socket /tmp/plasma

Object IDs

Each object in the Plasma store should be associated with a unique ID. The Object ID then serves as a key that any client can use to retrieve that object from the Plasma store. You can form an ObjectID object from a byte string of length 20.

# Create an ObjectID.
>>> id = plasma.ObjectID(20 * b"a")

# The character "a" is encoded as 61 in hex.
>>> id
ObjectID(6161616161616161616161616161616161616161)

The random generation of Object IDs is often good enough to ensure unique IDs. You can easily create a helper function that randomly generates object IDs as follows:

import numpy as np

def random_object_id():
  return plasma.ObjectID(np.random.bytes(20))

Putting and Getting Python Objects

Plasma supports two APIs for creating and accessing objects: A high level API that allows storing and retrieving Python objects and a low level API that allows creating, writing and sealing buffers and operating on the binary data directly. In this section we describe the high level API.

This is how you can put and get a Python object:

# Create a python object.
object_id = client.put("hello, world")

# Get the object.
client.get(object_id)

This works with all Python objects supported by the Arrow Python object serialization.

You can also get multiple objects at the same time (which can be more efficient since it avoids IPC round trips):

# Create multiple python objects.
object_id1 = client.put(1)
object_id2 = client.put(2)
object_id3 = client.put(3)

# Get the objects.
client.get([object_id1, object_id2, object_id3])

Furthermore, it is possible to provide a timeout for the get call. If the object is not available within the timeout, the special object pyarrow.ObjectNotAvailable will be returned.

Creating an Object Buffer

Objects are created in Plasma in two stages. First, they are created, which allocates a buffer for the object. At this point, the client can write to the buffer and construct the object within the allocated buffer.

To create an object for Plasma, you need to create an object ID, as well as give the object’s maximum size in bytes.

# Create an object buffer.
object_id = plasma.ObjectID(20 * b"a")
object_size = 1000
buffer = memoryview(client.create(object_id, object_size))

# Write to the buffer.
for i in range(1000):
  buffer[i] = i % 128

When the client is done, the client seals the buffer, making the object immutable, and making it available to other Plasma clients.

# Seal the object. This makes the object immutable and available to other clients.
client.seal(object_id)

Getting an Object Buffer

After an object has been sealed, any client who knows the object ID can get the object buffer.

# Create a different client. Note that this second client could be
# created in the same or in a separate, concurrent Python session.
client2 = plasma.connect("/tmp/plasma")

# Get the object in the second client. This blocks until the object has been sealed.
object_id2 = plasma.ObjectID(20 * b"a")
[buffer2] = client2.get_buffers([object_id])

If the object has not been sealed yet, then the call to client.get_buffers will block until the object has been sealed by the client constructing the object. Using the timeout_ms argument to get, you can specify a timeout for this (in milliseconds). After the timeout, the interpreter will yield control back.

>>> buffer
<memory at 0x7fdbdc96e708>
>>> buffer[1]
1
>>> buffer2
<plasma.plasma.PlasmaBuffer object at 0x7fdbf2770e88>
>>> view2 = memoryview(buffer2)
>>> view2[1]
1
>>> view2[129]
1
>>> bytes(buffer[1:4])
b'\x01\x02\x03'
>>> bytes(view2[1:4])
b'\x01\x02\x03'

Listing objects in the store

The objects in the store can be listed in the following way (note that this functionality is currently experimental and the concrete representation of the object info might change in the future):

import pyarrow.plasma as plasma
import time

client = plasma.connect("/tmp/plasma")

client.put("hello, world")
# Sleep a little so we get different creation times
time.sleep(2)
client.put("another object")
# Create an object that is not sealed yet
object_id = plasma.ObjectID.from_random()
client.create(object_id, 100)
print(client.list())

>>> {ObjectID(4cba8f80c54c6d265b46c2cdfcee6e32348b12be): {'construct_duration': 0,
>>>  'create_time': 1535223642,
>>>  'data_size': 460,
>>>  'metadata_size': 0,
>>>  'ref_count': 0,
>>>  'state': 'sealed'},
>>> ObjectID(a7598230b0c26464c9d9c99ae14773ee81485428): {'construct_duration': 0,
>>>  'create_time': 1535223644,
>>>  'data_size': 460,
>>>  'metadata_size': 0,
>>>  'ref_count': 0,
>>>  'state': 'sealed'},
>>> ObjectID(e603ab0c92098ebf08f90bfcea33ff98f6476870): {'construct_duration': -1,
>>>  'create_time': 1535223644,
>>>  'data_size': 100,
>>>  'metadata_size': 0,
>>>  'ref_count': 1,
>>>  'state': 'created'}}

Using Arrow and Pandas with Plasma

Storing Arrow Objects in Plasma

To store an Arrow object in Plasma, we must first create the object and then seal it. However, Arrow objects such as Tensors may be more complicated to write than simple binary data.

To create the object in Plasma, you still need an ObjectID and a size to pass in. To find out the size of your Arrow object, you can use pyarrow API such as pyarrow.ipc.get_tensor_size.

import numpy as np
import pyarrow as pa

# Create a pyarrow.Tensor object from a numpy random 2-dimensional array
data = np.random.randn(10, 4)
tensor = pa.Tensor.from_numpy(data)

# Create the object in Plasma
object_id = plasma.ObjectID(np.random.bytes(20))
data_size = pa.ipc.get_tensor_size(tensor)
buf = client.create(object_id, data_size)

To write the Arrow Tensor object into the buffer, you can use Plasma to convert the memoryview buffer into a pyarrow.FixedSizeBufferWriter object. A pyarrow.FixedSizeBufferWriter is a format suitable for Arrow’s pyarrow.ipc.write_tensor:

# Write the tensor into the Plasma-allocated buffer
stream = pa.FixedSizeBufferWriter(buf)
pa.ipc.write_tensor(tensor, stream)  # Writes tensor's 552 bytes to Plasma stream

To finish storing the Arrow object in Plasma, call seal:

# Seal the Plasma object
client.seal(object_id)

Getting Arrow Objects from Plasma

To read the object, first retrieve it as a PlasmaBuffer using its object ID.

# Get the arrow object by ObjectID.
[buf2] = client.get_buffers([object_id])

To convert the PlasmaBuffer back into an Arrow Tensor, first create a pyarrow BufferReader object from it. You can then pass the BufferReader into pyarrow.ipc.read_tensor to reconstruct the Arrow Tensor object:

# Reconstruct the Arrow tensor object.
reader = pa.BufferReader(buf2)
tensor2 = pa.ipc.read_tensor(reader)

Finally, you can use pyarrow.ipc.read_tensor to convert the Arrow object back into numpy data:

# Convert back to numpy
array = tensor2.to_numpy()

Storing Pandas DataFrames in Plasma

Storing a Pandas DataFrame still follows the create then seal process of storing an object in the Plasma store, however one cannot directly write the DataFrame to Plasma with Pandas alone. Plasma also needs to know the size of the DataFrame to allocate a buffer for.

See Pandas Integration for more information on using Arrow with Pandas.

You can create the pyarrow equivalent of a Pandas DataFrame by using pyarrow.from_pandas to convert it to a RecordBatch.

import pyarrow as pa
import pandas as pd

# Create a Pandas DataFrame
d = {'one' : pd.Series([1., 2., 3.], index=['a', 'b', 'c']),
     'two' : pd.Series([1., 2., 3., 4.], index=['a', 'b', 'c', 'd'])}
df = pd.DataFrame(d)

# Convert the Pandas DataFrame into a PyArrow RecordBatch
record_batch = pa.RecordBatch.from_pandas(df)

Creating the Plasma object requires an ObjectID and the size of the data. Now that we have converted the Pandas DataFrame into a PyArrow RecordBatch, use the MockOutputStream to determine the size of the Plasma object.

# Create the Plasma object from the PyArrow RecordBatch. Most of the work here
# is done to determine the size of buffer to request from the object store.
object_id = plasma.ObjectID(np.random.bytes(20))
mock_sink = pa.MockOutputStream()
with pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) as stream_writer:
    stream_writer.write_batch(record_batch)
data_size = mock_sink.size()
buf = client.create(object_id, data_size)

The DataFrame can now be written to the buffer as follows.

# Write the PyArrow RecordBatch to Plasma
stream = pa.FixedSizeBufferWriter(buf)
with pa.RecordBatchStreamWriter(stream, record_batch.schema) as stream_writer:
    stream_writer.write_batch(record_batch)

Finally, seal the finished object for use by all clients:

# Seal the Plasma object
client.seal(object_id)

Getting Pandas DataFrames from Plasma

Since we store the Pandas DataFrame as a PyArrow RecordBatch object, to get the object back from the Plasma store, we follow similar steps to those specified in Getting Arrow Objects from Plasma.

We first have to convert the PlasmaBuffer returned from client.get_buffers into an Arrow BufferReader object.

# Fetch the Plasma object
[data] = client.get_buffers([object_id])  # Get PlasmaBuffer from ObjectID
buffer = pa.BufferReader(data)

From the BufferReader, we can create a specific RecordBatchStreamReader in Arrow to reconstruct the stored PyArrow RecordBatch object.

# Convert object back into an Arrow RecordBatch
reader = pa.RecordBatchStreamReader(buffer)
record_batch = reader.read_next_batch()

The last step is to convert the PyArrow RecordBatch object back into the original Pandas DataFrame structure.

# Convert back into Pandas
result = record_batch.to_pandas()

Using Plasma with Huge Pages

On Linux it is possible to use the Plasma store with huge pages for increased throughput. You first need to create a file system and activate huge pages with

sudo mkdir -p /mnt/hugepages
gid=`id -g`
uid=`id -u`
sudo mount -t hugetlbfs -o uid=$uid -o gid=$gid none /mnt/hugepages
sudo bash -c "echo $gid > /proc/sys/vm/hugetlb_shm_group"
sudo bash -c "echo 20000 > /proc/sys/vm/nr_hugepages"

Note that you only need root access to create the file system, not for running the object store. You can then start the Plasma store with the -d flag for the mount point of the huge page file system and the -h flag which indicates that huge pages are activated:

plasma_store -s /tmp/plasma -m 10000000000 -d /mnt/hugepages -h

You can test this with the following script:

import numpy as np
import pyarrow as pa
import pyarrow.plasma as plasma
import time

client = plasma.connect("/tmp/plasma")

data = np.random.randn(100000000)
tensor = pa.Tensor.from_numpy(data)

object_id = plasma.ObjectID(np.random.bytes(20))
buf = client.create(object_id, pa.ipc.get_tensor_size(tensor))

stream = pa.FixedSizeBufferWriter(buf)
stream.set_memcopy_threads(4)
a = time.time()
pa.ipc.write_tensor(tensor, stream)
print("Writing took ", time.time() - a)