Apache Arrow (C++)
A columnar in-memory analytics layer designed to accelerate big data.
protocol.h
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #ifndef PLASMA_PROTOCOL_H
19 #define PLASMA_PROTOCOL_H
20 
21 #include <memory>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include "arrow/status.h"
26 #include "plasma/plasma.h"
27 #include "plasma/plasma_generated.h"
28 
29 namespace plasma {
30 
31 using arrow::Status;
32 
33 using flatbuf::MessageType;
34 using flatbuf::PlasmaError;
35 
// Verify `object` with a flatbuffers::Verifier constructed from `data` and `size`.
//
// T must provide a `Verify(flatbuffers::Verifier&)`-compatible member; the
// function simply forwards to it and returns its result converted to bool.
// `object` is dereferenced unconditionally, so it must not be null.
// NOTE(review): presumably `data`/`size` describe the received message buffer
// that `object` points into -- confirm against the callers in protocol.cc.
36 template <class T>
37 bool VerifyFlatbuffer(T* object, uint8_t* data, size_t size) {
38  flatbuffers::Verifier verifier(data, size);
39  return object->Verify(verifier);
40 }
41 
42 /* Plasma receive message. */
43 
44 Status PlasmaReceive(int sock, MessageType message_type, std::vector<uint8_t>* buffer);
45 
46 /* Plasma Create message functions. */
47 
48 Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
49  int64_t metadata_size, int device_num);
50 
51 Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
52  int64_t* data_size, int64_t* metadata_size, int* device_num);
53 
54 Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object,
55  PlasmaError error, int64_t mmap_size);
56 
57 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
58  PlasmaObject* object, int* store_fd, int64_t* mmap_size);
59 
60 Status SendAbortRequest(int sock, ObjectID object_id);
61 
62 Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
63 
64 Status SendAbortReply(int sock, ObjectID object_id);
65 
66 Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id);
67 
68 /* Plasma Seal message functions. */
69 
70 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
71 
72 Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
73  unsigned char* digest);
74 
75 Status SendSealReply(int sock, ObjectID object_id, PlasmaError error);
76 
77 Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id);
78 
79 /* Plasma Get message functions. */
80 
81 Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
82  int64_t timeout_ms);
83 
84 Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
85  int64_t* timeout_ms);
86 
87 Status SendGetReply(int sock, ObjectID object_ids[],
88  std::unordered_map<ObjectID, PlasmaObject>& plasma_objects,
89  int64_t num_objects, const std::vector<int>& store_fds,
90  const std::vector<int64_t>& mmap_sizes);
91 
92 Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
93  PlasmaObject plasma_objects[], int64_t num_objects,
94  std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes);
95 
96 /* Plasma Release message functions. */
97 
98 Status SendReleaseRequest(int sock, ObjectID object_id);
99 
100 Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id);
101 
102 Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error);
103 
104 Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id);
105 
106 /* Plasma Delete objects message functions. */
107 
108 Status SendDeleteRequest(int sock, const std::vector<ObjectID>& object_ids);
109 
110 Status ReadDeleteRequest(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids);
111 
112 Status SendDeleteReply(int sock, const std::vector<ObjectID>& object_ids,
113  const std::vector<PlasmaError>& errors);
114 
115 Status ReadDeleteReply(uint8_t* data, size_t size, std::vector<ObjectID>* object_ids,
116  std::vector<PlasmaError>* errors);
117 
118 /* Status messages. */
119 
120 Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
121 
122 Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
123  int64_t num_objects);
124 
125 Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
126  int64_t num_objects);
127 
128 int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size);
129 
130 Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
131  int object_status[], int64_t num_objects);
132 
133 /* Plasma Contains message functions. */
134 
135 Status SendContainsRequest(int sock, ObjectID object_id);
136 
137 Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id);
138 
139 Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
140 
141 Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
142  bool* has_object);
143 
144 /* Plasma List message functions. */
145 
146 Status SendListRequest(int sock);
147 
148 Status ReadListRequest(uint8_t* data, size_t size);
149 
150 Status SendListReply(int sock, const ObjectTable& objects);
151 
152 Status ReadListReply(uint8_t* data, size_t size, ObjectTable* objects);
153 
154 /* Plasma Connect message functions. */
155 
156 Status SendConnectRequest(int sock);
157 
158 Status ReadConnectRequest(uint8_t* data, size_t size);
159 
160 Status SendConnectReply(int sock, int64_t memory_capacity);
161 
162 Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity);
163 
164 /* Plasma Evict message functions. */
165 
166 Status SendEvictRequest(int sock, int64_t num_bytes);
167 
168 Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes);
169 
170 Status SendEvictReply(int sock, int64_t num_bytes);
171 
172 Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes);
173 
174 /* Plasma Fetch Remote message functions. */
175 
176 Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
177 
178 Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids);
179 
180 /* Plasma Wait message functions. */
181 
182 Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
183  int num_ready_objects, int64_t timeout_ms);
184 
185 Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
186  int64_t* timeout_ms, int* num_ready_objects);
187 
188 Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
189  int num_ready_objects);
190 
191 Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
192  int* num_ready_objects);
193 
194 /* Plasma Subscribe message functions. */
195 
196 Status SendSubscribeRequest(int sock);
197 
198 /* Data messages. */
199 
200 Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port);
201 
202 Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
203  int* port);
204 
205 Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
206  int64_t metadata_size);
207 
208 Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
209  int64_t* object_size, int64_t* metadata_size);
210 
211 } // namespace plasma
212 
213 #endif /* PLASMA_PROTOCOL_H */
Status ReadWaitRequest(uint8_t *data, size_t size, ObjectRequestMap &object_requests, int64_t *timeout_ms, int *num_ready_objects)
Status SendAbortRequest(int sock, ObjectID object_id)
Status SendGetReply(int sock, ObjectID object_ids[], std::unordered_map< ObjectID, PlasmaObject > &plasma_objects, int64_t num_objects, const std::vector< int > &store_fds, const std::vector< int64_t > &mmap_sizes)
Status SendFetchRequest(int sock, const ObjectID *object_ids, int64_t num_objects)
Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, int64_t *data_size, int64_t *metadata_size, int *device_num)
Status SendDataRequest(int sock, ObjectID object_id, const char *address, int port)
Status SendConnectRequest(int sock)
Status ReadStatusReply(uint8_t *data, size_t size, ObjectID object_ids[], int object_status[], int64_t num_objects)
Status SendWaitReply(int sock, const ObjectRequestMap &object_requests, int num_ready_objects)
Status ReadFetchRequest(uint8_t *data, size_t size, std::vector< ObjectID > &object_ids)
Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id, PlasmaObject *object, int *store_fd, int64_t *mmap_size)
bool VerifyFlatbuffer(T *object, uint8_t *data, size_t size)
Definition: protocol.h:37
Status ReadGetReply(uint8_t *data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects, std::vector< int > &store_fds, std::vector< int64_t > &mmap_sizes)
Status SendSealReply(int sock, ObjectID object_id, PlasmaError error)
Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], int64_t num_objects)
Status ReadListRequest(uint8_t *data, size_t size)
UniqueID ObjectID
Definition: common.h:62
Status ReadDeleteRequest(uint8_t *data, size_t size, std::vector< ObjectID > *object_ids)
Status ReadListReply(uint8_t *data, size_t size, ObjectTable *objects)
Status ReadDeleteReply(uint8_t *data, size_t size, std::vector< ObjectID > *object_ids, std::vector< PlasmaError > *errors)
Definition: status.h:95
Status SendStatusRequest(int sock, const ObjectID *object_ids, int64_t num_objects)
Status SendSealRequest(int sock, ObjectID object_id, unsigned char *digest)
Status SendGetRequest(int sock, const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms)
Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size, int device_num)
Status ReadWaitReply(uint8_t *data, size_t size, ObjectRequest object_requests[], int *num_ready_objects)
Status SendEvictReply(int sock, int64_t num_bytes)
Status ReadConnectReply(uint8_t *data, size_t size, int64_t *memory_capacity)
Status ReadConnectRequest(uint8_t *data, size_t size)
Status SendReleaseReply(int sock, ObjectID object_id, PlasmaError error)
Status SendListReply(int sock, const ObjectTable &objects)
Status SendConnectReply(int sock, int64_t memory_capacity)
std::unordered_map< ObjectID, ObjectRequest > ObjectRequestMap
Mapping from object IDs to type and status of the request.
Definition: plasma.h:69
Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject *object, PlasmaError error, int64_t mmap_size)
Status PlasmaReceive(int sock, MessageType message_type, std::vector< uint8_t > *buffer)
Status SendReleaseRequest(int sock, ObjectID object_id)
Definition: client.h:35
Status SendEvictRequest(int sock, int64_t num_bytes)
int64_t ReadStatusReply_num_objects(uint8_t *data, size_t size)
Status ReadGetRequest(uint8_t *data, size_t size, std::vector< ObjectID > &object_ids, int64_t *timeout_ms)
Status ReadSealRequest(uint8_t *data, size_t size, ObjectID *object_id, unsigned char *digest)
Status ReadReleaseReply(uint8_t *data, size_t size, ObjectID *object_id)
Status ReadEvictRequest(uint8_t *data, size_t size, int64_t *num_bytes)
Status ReadAbortRequest(uint8_t *data, size_t size, ObjectID *object_id)
Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, int num_ready_objects, int64_t timeout_ms)
Status SendListRequest(int sock)
Status SendAbortReply(int sock, ObjectID object_id)
Status ReadStatusRequest(uint8_t *data, size_t size, ObjectID object_ids[], int64_t num_objects)
Status ReadEvictReply(uint8_t *data, size_t size, int64_t &num_bytes)
Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size)
Status ReadReleaseRequest(uint8_t *data, size_t size, ObjectID *object_id)
Status ReadContainsRequest(uint8_t *data, size_t size, ObjectID *object_id)
std::unordered_map< ObjectID, std::unique_ptr< ObjectTableEntry > > ObjectTable
Mapping from ObjectIDs to information about the object.
Definition: common.h:139
Status ReadDataRequest(uint8_t *data, size_t size, ObjectID *object_id, char **address, int *port)
Status SendContainsRequest(int sock, ObjectID object_id)
Status SendDeleteRequest(int sock, const std::vector< ObjectID > &object_ids)
Status ReadSealReply(uint8_t *data, size_t size, ObjectID *object_id)
Status SendSubscribeRequest(int sock)
Status ReadDataReply(uint8_t *data, size_t size, ObjectID *object_id, int64_t *object_size, int64_t *metadata_size)
Status ReadContainsReply(uint8_t *data, size_t size, ObjectID *object_id, bool *has_object)
Status ReadAbortReply(uint8_t *data, size_t size, ObjectID *object_id)
Status SendDeleteReply(int sock, const std::vector< ObjectID > &object_ids, const std::vector< PlasmaError > &errors)
Status SendContainsReply(int sock, ObjectID object_id, bool has_object)