Apache Arrow (C++)
A columnar in-memory analytics layer designed to accelerate big data.
protocol.h
Go to the documentation of this file.
1 // Licensed to the Apache Software Foundation (ASF) under one
2 // or more contributor license agreements. See the NOTICE file
3 // distributed with this work for additional information
4 // regarding copyright ownership. The ASF licenses this file
5 // to you under the Apache License, Version 2.0 (the
6 // "License"); you may not use this file except in compliance
7 // with the License. You may obtain a copy of the License at
8 //
9 // http://www.apache.org/licenses/LICENSE-2.0
10 //
11 // Unless required by applicable law or agreed to in writing,
12 // software distributed under the License is distributed on an
13 // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14 // KIND, either express or implied. See the License for the
15 // specific language governing permissions and limitations
16 // under the License.
17 
18 #ifndef PLASMA_PROTOCOL_H
19 #define PLASMA_PROTOCOL_H
20 
21 #include <memory>
22 #include <unordered_map>
23 #include <vector>
24 
25 #include "arrow/status.h"
26 #include "plasma/plasma.h"
27 #include "plasma/plasma_generated.h"
28 
29 namespace plasma {
30 
31 using arrow::Status;
32 
// Runs flatbuffers verification for `object` against a raw byte buffer.
//
// Constructs a flatbuffers::Verifier over the `size` bytes starting at `data`
// and returns whatever `object->Verify(verifier)` reports.
//
// T must provide a `Verify(flatbuffers::Verifier&)`-compatible member.
// NOTE(review): presumably `object` is the root table located inside `data` --
// confirm against callers.
33 template <class T>
34 bool verify_flatbuffer(T* object, uint8_t* data, size_t size) {
35  flatbuffers::Verifier verifier(data, size);
36  return object->Verify(verifier);
37 }
38 
39 /* Plasma receive message. */
40 
41 Status PlasmaReceive(int sock, int64_t message_type, std::vector<uint8_t>* buffer);
42 
43 /* Plasma Create message functions. */
44 
45 Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size,
46  int64_t metadata_size, int device_num);
47 
48 Status ReadCreateRequest(uint8_t* data, size_t size, ObjectID* object_id,
49  int64_t* data_size, int64_t* metadata_size, int* device_num);
50 
51 Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject* object, int error,
52  int64_t mmap_size);
53 
54 Status ReadCreateReply(uint8_t* data, size_t size, ObjectID* object_id,
55  PlasmaObject* object, int* store_fd, int64_t* mmap_size);
56 
57 Status SendAbortRequest(int sock, ObjectID object_id);
58 
59 Status ReadAbortRequest(uint8_t* data, size_t size, ObjectID* object_id);
60 
61 Status SendAbortReply(int sock, ObjectID object_id);
62 
63 Status ReadAbortReply(uint8_t* data, size_t size, ObjectID* object_id);
64 
65 /* Plasma Seal message functions. */
66 
67 Status SendSealRequest(int sock, ObjectID object_id, unsigned char* digest);
68 
69 Status ReadSealRequest(uint8_t* data, size_t size, ObjectID* object_id,
70  unsigned char* digest);
71 
72 Status SendSealReply(int sock, ObjectID object_id, int error);
73 
74 Status ReadSealReply(uint8_t* data, size_t size, ObjectID* object_id);
75 
76 /* Plasma Get message functions. */
77 
78 Status SendGetRequest(int sock, const ObjectID* object_ids, int64_t num_objects,
79  int64_t timeout_ms);
80 
81 Status ReadGetRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids,
82  int64_t* timeout_ms);
83 
84 Status SendGetReply(
85  int sock, ObjectID object_ids[],
86  std::unordered_map<ObjectID, PlasmaObject, UniqueIDHasher>& plasma_objects,
87  int64_t num_objects, const std::vector<int>& store_fds,
88  const std::vector<int64_t>& mmap_sizes);
89 
90 Status ReadGetReply(uint8_t* data, size_t size, ObjectID object_ids[],
91  PlasmaObject plasma_objects[], int64_t num_objects,
92  std::vector<int>& store_fds, std::vector<int64_t>& mmap_sizes);
93 
94 /* Plasma Release message functions. */
95 
96 Status SendReleaseRequest(int sock, ObjectID object_id);
97 
98 Status ReadReleaseRequest(uint8_t* data, size_t size, ObjectID* object_id);
99 
100 Status SendReleaseReply(int sock, ObjectID object_id, int error);
101 
102 Status ReadReleaseReply(uint8_t* data, size_t size, ObjectID* object_id);
103 
104 /* Plasma Delete message functions. */
105 
106 Status SendDeleteRequest(int sock, ObjectID object_id);
107 
108 Status ReadDeleteRequest(uint8_t* data, size_t size, ObjectID* object_id);
109 
110 Status SendDeleteReply(int sock, ObjectID object_id, int error);
111 
112 Status ReadDeleteReply(uint8_t* data, size_t size, ObjectID* object_id);
113 
114 /* Status messages. */
115 
116 Status SendStatusRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
117 
118 Status ReadStatusRequest(uint8_t* data, size_t size, ObjectID object_ids[],
119  int64_t num_objects);
120 
121 Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[],
122  int64_t num_objects);
123 
124 int64_t ReadStatusReply_num_objects(uint8_t* data, size_t size);
125 
126 Status ReadStatusReply(uint8_t* data, size_t size, ObjectID object_ids[],
127  int object_status[], int64_t num_objects);
128 
129 /* Plasma Contains message functions. */
130 
131 Status SendContainsRequest(int sock, ObjectID object_id);
132 
133 Status ReadContainsRequest(uint8_t* data, size_t size, ObjectID* object_id);
134 
135 Status SendContainsReply(int sock, ObjectID object_id, bool has_object);
136 
137 Status ReadContainsReply(uint8_t* data, size_t size, ObjectID* object_id,
138  bool* has_object);
139 
140 /* Plasma Connect message functions. */
141 
142 Status SendConnectRequest(int sock);
143 
144 Status ReadConnectRequest(uint8_t* data, size_t size);
145 
146 Status SendConnectReply(int sock, int64_t memory_capacity);
147 
148 Status ReadConnectReply(uint8_t* data, size_t size, int64_t* memory_capacity);
149 
150 /* Plasma Evict message functions. */
151 
152 Status SendEvictRequest(int sock, int64_t num_bytes);
153 
154 Status ReadEvictRequest(uint8_t* data, size_t size, int64_t* num_bytes);
155 
156 Status SendEvictReply(int sock, int64_t num_bytes);
157 
158 Status ReadEvictReply(uint8_t* data, size_t size, int64_t& num_bytes);
159 
160 /* Plasma Fetch Remote message functions. */
161 
162 Status SendFetchRequest(int sock, const ObjectID* object_ids, int64_t num_objects);
163 
164 Status ReadFetchRequest(uint8_t* data, size_t size, std::vector<ObjectID>& object_ids);
165 
166 /* Plasma Wait message functions. */
167 
168 Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests,
169  int num_ready_objects, int64_t timeout_ms);
170 
171 Status ReadWaitRequest(uint8_t* data, size_t size, ObjectRequestMap& object_requests,
172  int64_t* timeout_ms, int* num_ready_objects);
173 
174 Status SendWaitReply(int sock, const ObjectRequestMap& object_requests,
175  int num_ready_objects);
176 
177 Status ReadWaitReply(uint8_t* data, size_t size, ObjectRequest object_requests[],
178  int* num_ready_objects);
179 
180 /* Plasma Subscribe message functions. */
181 
182 Status SendSubscribeRequest(int sock);
183 
184 /* Data messages. */
185 
186 Status SendDataRequest(int sock, ObjectID object_id, const char* address, int port);
187 
188 Status ReadDataRequest(uint8_t* data, size_t size, ObjectID* object_id, char** address,
189  int* port);
190 
191 Status SendDataReply(int sock, ObjectID object_id, int64_t object_size,
192  int64_t metadata_size);
193 
194 Status ReadDataReply(uint8_t* data, size_t size, ObjectID* object_id,
195  int64_t* object_size, int64_t* metadata_size);
196 
197 } // namespace plasma
198 
199 #endif /* PLASMA_PROTOCOL_H */
Status ReadWaitRequest(uint8_t *data, size_t size, ObjectRequestMap &object_requests, int64_t *timeout_ms, int *num_ready_objects)
Status SendAbortRequest(int sock, ObjectID object_id)
Status SendFetchRequest(int sock, const ObjectID *object_ids, int64_t num_objects)
Status ReadCreateRequest(uint8_t *data, size_t size, ObjectID *object_id, int64_t *data_size, int64_t *metadata_size, int *device_num)
Status SendDataRequest(int sock, ObjectID object_id, const char *address, int port)
Status ReadDeleteReply(uint8_t *data, size_t size, ObjectID *object_id)
Status SendConnectRequest(int sock)
std::unordered_map< ObjectID, ObjectRequest, UniqueIDHasher > ObjectRequestMap
Mapping from object IDs to type and status of the request.
Definition: plasma.h:71
Status ReadStatusReply(uint8_t *data, size_t size, ObjectID object_ids[], int object_status[], int64_t num_objects)
Status SendWaitReply(int sock, const ObjectRequestMap &object_requests, int num_ready_objects)
Status ReadFetchRequest(uint8_t *data, size_t size, std::vector< ObjectID > &object_ids)
Status ReadCreateReply(uint8_t *data, size_t size, ObjectID *object_id, PlasmaObject *object, int *store_fd, int64_t *mmap_size)
Status ReadGetReply(uint8_t *data, size_t size, ObjectID object_ids[], PlasmaObject plasma_objects[], int64_t num_objects, std::vector< int > &store_fds, std::vector< int64_t > &mmap_sizes)
Status SendStatusReply(int sock, ObjectID object_ids[], int object_status[], int64_t num_objects)
Status SendDeleteReply(int sock, ObjectID object_id, int error)
Status SendReleaseReply(int sock, ObjectID object_id, int error)
Status PlasmaReceive(int sock, int64_t message_type, std::vector< uint8_t > *buffer)
Status SendCreateReply(int sock, ObjectID object_id, PlasmaObject *object, int error, int64_t mmap_size)
Status SendDeleteRequest(int sock, ObjectID object_id)
UniqueID ObjectID
Definition: common.h:63
Definition: status.h:93
Status SendStatusRequest(int sock, const ObjectID *object_ids, int64_t num_objects)
Status SendSealRequest(int sock, ObjectID object_id, unsigned char *digest)
Status SendGetRequest(int sock, const ObjectID *object_ids, int64_t num_objects, int64_t timeout_ms)
Status SendCreateRequest(int sock, ObjectID object_id, int64_t data_size, int64_t metadata_size, int device_num)
Status ReadWaitReply(uint8_t *data, size_t size, ObjectRequest object_requests[], int *num_ready_objects)
Status SendEvictReply(int sock, int64_t num_bytes)
Status ReadConnectReply(uint8_t *data, size_t size, int64_t *memory_capacity)
Status SendSealReply(int sock, ObjectID object_id, int error)
Status ReadConnectRequest(uint8_t *data, size_t size)
Status ReadDeleteRequest(uint8_t *data, size_t size, ObjectID *object_id)
Status SendConnectReply(int sock, int64_t memory_capacity)
Status SendReleaseRequest(int sock, ObjectID object_id)
Definition: client.h:35
Status SendEvictRequest(int sock, int64_t num_bytes)
int64_t ReadStatusReply_num_objects(uint8_t *data, size_t size)
Status ReadGetRequest(uint8_t *data, size_t size, std::vector< ObjectID > &object_ids, int64_t *timeout_ms)
Status ReadSealRequest(uint8_t *data, size_t size, ObjectID *object_id, unsigned char *digest)
Status ReadReleaseReply(uint8_t *data, size_t size, ObjectID *object_id)
Status ReadEvictRequest(uint8_t *data, size_t size, int64_t *num_bytes)
Status ReadAbortRequest(uint8_t *data, size_t size, ObjectID *object_id)
Status SendWaitRequest(int sock, ObjectRequest object_requests[], int64_t num_requests, int num_ready_objects, int64_t timeout_ms)
Status SendAbortReply(int sock, ObjectID object_id)
Status ReadStatusRequest(uint8_t *data, size_t size, ObjectID object_ids[], int64_t num_objects)
Status ReadEvictReply(uint8_t *data, size_t size, int64_t &num_bytes)
object_status
Definition: plasma.h:105
Status SendDataReply(int sock, ObjectID object_id, int64_t object_size, int64_t metadata_size)
Status ReadReleaseRequest(uint8_t *data, size_t size, ObjectID *object_id)
Status ReadContainsRequest(uint8_t *data, size_t size, ObjectID *object_id)
bool verify_flatbuffer(T *object, uint8_t *data, size_t size)
Definition: protocol.h:34
Status SendGetReply(int sock, ObjectID object_ids[], std::unordered_map< ObjectID, PlasmaObject, UniqueIDHasher > &plasma_objects, int64_t num_objects, const std::vector< int > &store_fds, const std::vector< int64_t > &mmap_sizes)
Status ReadDataRequest(uint8_t *data, size_t size, ObjectID *object_id, char **address, int *port)
Status SendContainsRequest(int sock, ObjectID object_id)
Status ReadSealReply(uint8_t *data, size_t size, ObjectID *object_id)
Status SendSubscribeRequest(int sock)
Status ReadDataReply(uint8_t *data, size_t size, ObjectID *object_id, int64_t *object_size, int64_t *metadata_size)
Status ReadContainsReply(uint8_t *data, size_t size, ObjectID *object_id, bool *has_object)
Status ReadAbortReply(uint8_t *data, size_t size, ObjectID *object_id)
Status SendContainsReply(int sock, ObjectID object_id, bool has_object)