arrow_flight/client.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19 decode::FlightRecordBatchStream,
20 flight_service_client::FlightServiceClient,
21 gen::{CancelFlightInfoRequest, CancelFlightInfoResult, RenewFlightEndpointRequest},
22 trailers::extract_lazy_trailers,
23 Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
24 HandshakeRequest, PollInfo, PutResult, Ticket,
25};
26use arrow_schema::Schema;
27use bytes::Bytes;
28use futures::{
29 future::ready,
30 stream::{self, BoxStream},
31 Stream, StreamExt, TryStreamExt,
32};
33use prost::Message;
34use tonic::{metadata::MetadataMap, transport::Channel};
35
36use crate::error::{FlightError, Result};
37use crate::streams::{FallibleRequestStream, FallibleTonicResponseStream};
38
/// A "Mid level" [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html) client.
///
/// [`FlightClient`] is intended as a convenience for interactions
/// with Arrow Flight servers. For more direct control, such as access
/// to the response headers, use [`FlightServiceClient`] directly
/// via methods such as [`Self::inner`] or [`Self::into_inner`].
///
/// # Example:
/// ```no_run
/// # async fn run() {
/// # use arrow_flight::FlightClient;
/// # use bytes::Bytes;
/// use tonic::transport::Channel;
/// let channel = Channel::from_static("http://localhost:1234")
///     .connect()
///     .await
///     .expect("error connecting");
///
/// let mut client = FlightClient::new(channel);
///
/// // Send 'Hi' bytes as the handshake request to the server
/// let response = client
///     .handshake(Bytes::from("Hi"))
///     .await
///     .expect("error handshaking");
///
/// // Expect the server responded with 'Ho'
/// assert_eq!(response, Bytes::from("Ho"));
/// # }
/// ```
#[derive(Debug)]
pub struct FlightClient {
    /// gRPC header metadata to include with each request
    /// (empty unless headers have been added by the caller)
    metadata: MetadataMap,

    /// The underlying tonic [`FlightServiceClient`] that the
    /// methods on this type delegate to
    inner: FlightServiceClient<Channel>,
}
77
78impl FlightClient {
79 /// Creates a client client with the provided [`Channel`]
80 pub fn new(channel: Channel) -> Self {
81 Self::new_from_inner(FlightServiceClient::new(channel))
82 }
83
84 /// Creates a new higher level client with the provided lower level client
85 pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self {
86 Self {
87 metadata: MetadataMap::new(),
88 inner,
89 }
90 }
91
/// Return a shared reference to the gRPC metadata included with each request
pub fn metadata(&self) -> &MetadataMap {
    &self.metadata
}
96
/// Return a mutable reference to the gRPC metadata included with each request
///
/// These headers can be used, for example, to include
/// authorization or other application specific headers.
pub fn metadata_mut(&mut self) -> &mut MetadataMap {
    &mut self.metadata
}
104
105 /// Add the specified header with value to all subsequent
106 /// requests. See [`Self::metadata_mut`] for fine grained control.
107 pub fn add_header(&mut self, key: &str, value: &str) -> Result<()> {
108 let key = tonic::metadata::MetadataKey::<_>::from_bytes(key.as_bytes())
109 .map_err(|e| FlightError::ExternalError(Box::new(e)))?;
110
111 let value = value
112 .parse()
113 .map_err(|e| FlightError::ExternalError(Box::new(e)))?;
114
115 // ignore previous value
116 self.metadata.insert(key, value);
117
118 Ok(())
119 }
120
/// Return a shared reference to the underlying tonic
/// [`FlightServiceClient`]
pub fn inner(&self) -> &FlightServiceClient<Channel> {
    &self.inner
}
126
/// Return a mutable reference to the underlying tonic
/// [`FlightServiceClient`], for example to issue calls on it directly
pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel> {
    &mut self.inner
}
132
/// Consume this client and return the underlying tonic
/// [`FlightServiceClient`]. The metadata configured on this
/// client is dropped along with it.
pub fn into_inner(self) -> FlightServiceClient<Channel> {
    self.inner
}
138
139 /// Perform an Arrow Flight handshake with the server, sending
140 /// `payload` as the [`HandshakeRequest`] payload and returning
141 /// the [`HandshakeResponse`](crate::HandshakeResponse)
142 /// bytes returned from the server
143 ///
144 /// See [`FlightClient`] docs for an example.
145 pub async fn handshake(&mut self, payload: impl Into<Bytes>) -> Result<Bytes> {
146 let request = HandshakeRequest {
147 protocol_version: 0,
148 payload: payload.into(),
149 };
150
151 // apply headers, etc
152 let request = self.make_request(stream::once(ready(request)));
153
154 let mut response_stream = self.inner.handshake(request).await?.into_inner();
155
156 if let Some(response) = response_stream.next().await.transpose()? {
157 // check if there is another response
158 if response_stream.next().await.is_some() {
159 return Err(FlightError::protocol(
160 "Got unexpected second response from handshake",
161 ));
162 }
163
164 Ok(response.payload)
165 } else {
166 Err(FlightError::protocol("No response from handshake"))
167 }
168 }
169
170 /// Make a `DoGet` call to the server with the provided ticket,
171 /// returning a [`FlightRecordBatchStream`] for reading
172 /// [`RecordBatch`](arrow_array::RecordBatch)es.
173 ///
174 /// # Note
175 ///
176 /// To access the returned [`FlightData`] use
177 /// [`FlightRecordBatchStream::into_inner()`]
178 ///
179 /// # Example:
180 /// ```no_run
181 /// # async fn run() {
182 /// # use bytes::Bytes;
183 /// # use arrow_flight::FlightClient;
184 /// # use arrow_flight::Ticket;
185 /// # use arrow_array::RecordBatch;
186 /// # use futures::stream::TryStreamExt;
187 /// # let channel: tonic::transport::Channel = unimplemented!();
188 /// # let ticket = Ticket { ticket: Bytes::from("foo") };
189 /// let mut client = FlightClient::new(channel);
190 ///
191 /// // Invoke a do_get request on the server with a previously
192 /// // received Ticket
193 ///
194 /// let response = client
195 /// .do_get(ticket)
196 /// .await
197 /// .expect("error invoking do_get");
198 ///
199 /// // Use try_collect to get the RecordBatches from the server
200 /// let batches: Vec<RecordBatch> = response
201 /// .try_collect()
202 /// .await
203 /// .expect("no stream errors");
204 /// # }
205 /// ```
206 pub async fn do_get(&mut self, ticket: Ticket) -> Result<FlightRecordBatchStream> {
207 let request = self.make_request(ticket);
208
209 let (md, response_stream, _ext) = self.inner.do_get(request).await?.into_parts();
210 let (response_stream, trailers) = extract_lazy_trailers(response_stream);
211
212 Ok(FlightRecordBatchStream::new_from_flight_data(
213 response_stream.map_err(FlightError::Tonic),
214 )
215 .with_headers(md)
216 .with_trailers(trailers))
217 }
218
219 /// Make a `GetFlightInfo` call to the server with the provided
220 /// [`FlightDescriptor`] and return the [`FlightInfo`] from the
221 /// server. The [`FlightInfo`] can be used with [`Self::do_get`]
222 /// to retrieve the requested batches.
223 ///
224 /// # Example:
225 /// ```no_run
226 /// # async fn run() {
227 /// # use arrow_flight::FlightClient;
228 /// # use arrow_flight::FlightDescriptor;
229 /// # let channel: tonic::transport::Channel = unimplemented!();
230 /// let mut client = FlightClient::new(channel);
231 ///
232 /// // Send a 'CMD' request to the server
233 /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
234 /// let flight_info = client
235 /// .get_flight_info(request)
236 /// .await
237 /// .expect("error handshaking");
238 ///
239 /// // retrieve the first endpoint from the returned flight info
240 /// let ticket = flight_info
241 /// .endpoint[0]
242 /// // Extract the ticket
243 /// .ticket
244 /// .clone()
245 /// .expect("expected ticket");
246 ///
247 /// // Retrieve the corresponding RecordBatch stream with do_get
248 /// let data = client
249 /// .do_get(ticket)
250 /// .await
251 /// .expect("error fetching data");
252 /// # }
253 /// ```
254 pub async fn get_flight_info(&mut self, descriptor: FlightDescriptor) -> Result<FlightInfo> {
255 let request = self.make_request(descriptor);
256
257 let response = self.inner.get_flight_info(request).await?.into_inner();
258 Ok(response)
259 }
260
261 /// Make a `PollFlightInfo` call to the server with the provided
262 /// [`FlightDescriptor`] and return the [`PollInfo`] from the
263 /// server.
264 ///
265 /// The `info` field of the [`PollInfo`] can be used with
266 /// [`Self::do_get`] to retrieve the requested batches.
267 ///
268 /// If the `flight_descriptor` field of the [`PollInfo`] is
269 /// `None` then the `info` field represents the complete results.
270 ///
271 /// If the `flight_descriptor` field is some [`FlightDescriptor`]
272 /// then the `info` field has incomplete results, and the client
273 /// should call this method again with the new `flight_descriptor`
274 /// to get the updated status.
275 ///
276 /// The `expiration_time`, if set, represents the expiration time
277 /// of the `flight_descriptor`, after which the server may not accept
278 /// this retry descriptor and may cancel the query.
279 ///
280 /// # Example:
281 /// ```no_run
282 /// # async fn run() {
283 /// # use arrow_flight::FlightClient;
284 /// # use arrow_flight::FlightDescriptor;
285 /// # let channel: tonic::transport::Channel = unimplemented!();
286 /// let mut client = FlightClient::new(channel);
287 ///
288 /// // Send a 'CMD' request to the server
289 /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
290 /// let poll_info = client
291 /// .poll_flight_info(request)
292 /// .await
293 /// .expect("error handshaking");
294 ///
295 /// // retrieve the first endpoint from the returned poll info
296 /// let ticket = poll_info
297 /// .info
298 /// .expect("expected flight info")
299 /// .endpoint[0]
300 /// // Extract the ticket
301 /// .ticket
302 /// .clone()
303 /// .expect("expected ticket");
304 ///
305 /// // Retrieve the corresponding RecordBatch stream with do_get
306 /// let data = client
307 /// .do_get(ticket)
308 /// .await
309 /// .expect("error fetching data");
310 /// # }
311 /// ```
312 pub async fn poll_flight_info(&mut self, descriptor: FlightDescriptor) -> Result<PollInfo> {
313 let request = self.make_request(descriptor);
314
315 let response = self.inner.poll_flight_info(request).await?.into_inner();
316 Ok(response)
317 }
318
319 /// Make a `DoPut` call to the server with the provided
320 /// [`Stream`] of [`FlightData`] and returning a
321 /// stream of [`PutResult`].
322 ///
323 /// # Note
324 ///
325 /// The input stream is [`Result`] so that this can be connected
326 /// to a streaming data source, such as [`FlightDataEncoder`](crate::encode::FlightDataEncoder),
327 /// without having to buffer. If the input stream returns an error
328 /// that error will not be sent to the server, instead it will be
329 /// placed into the result stream and the server connection
330 /// terminated.
331 ///
332 /// # Example:
333 /// ```no_run
334 /// # async fn run() {
335 /// # use futures::{TryStreamExt, StreamExt};
336 /// # use std::sync::Arc;
337 /// # use arrow_array::UInt64Array;
338 /// # use arrow_array::RecordBatch;
339 /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
340 /// # use arrow_flight::encode::FlightDataEncoderBuilder;
341 /// # let batch = RecordBatch::try_from_iter(vec![
342 /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
343 /// # ]).unwrap();
344 /// # let channel: tonic::transport::Channel = unimplemented!();
345 /// let mut client = FlightClient::new(channel);
346 ///
347 /// // encode the batch as a stream of `FlightData`
348 /// let flight_data_stream = FlightDataEncoderBuilder::new()
349 /// .build(futures::stream::iter(vec![Ok(batch)]));
350 ///
351 /// // send the stream and get the results as `PutResult`
352 /// let response: Vec<PutResult>= client
353 /// .do_put(flight_data_stream)
354 /// .await
355 /// .unwrap()
356 /// .try_collect() // use TryStreamExt to collect stream
357 /// .await
358 /// .expect("error calling do_put");
359 /// # }
360 /// ```
361 pub async fn do_put<S: Stream<Item = Result<FlightData>> + Send + 'static>(
362 &mut self,
363 request: S,
364 ) -> Result<BoxStream<'static, Result<PutResult>>> {
365 let (sender, receiver) = futures::channel::oneshot::channel();
366
367 // Intercepts client errors and sends them to the oneshot channel above
368 let request = Box::pin(request); // Pin to heap
369 let request_stream = FallibleRequestStream::new(sender, request);
370
371 let request = self.make_request(request_stream);
372 let response_stream = self.inner.do_put(request).await?.into_inner();
373
374 // Forwards errors from the error oneshot with priority over responses from server
375 let response_stream = Box::pin(response_stream);
376 let error_stream = FallibleTonicResponseStream::new(receiver, response_stream);
377
378 // combine the response from the server and any error from the client
379 Ok(error_stream.boxed())
380 }
381
382 /// Make a `DoExchange` call to the server with the provided
383 /// [`Stream`] of [`FlightData`] and returning a
384 /// stream of [`FlightData`].
385 ///
386 /// # Example:
387 /// ```no_run
388 /// # async fn run() {
389 /// # use futures::{TryStreamExt, StreamExt};
390 /// # use std::sync::Arc;
391 /// # use arrow_array::UInt64Array;
392 /// # use arrow_array::RecordBatch;
393 /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
394 /// # use arrow_flight::encode::FlightDataEncoderBuilder;
395 /// # let batch = RecordBatch::try_from_iter(vec![
396 /// # ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
397 /// # ]).unwrap();
398 /// # let channel: tonic::transport::Channel = unimplemented!();
399 /// let mut client = FlightClient::new(channel);
400 ///
401 /// // encode the batch as a stream of `FlightData`
402 /// let flight_data_stream = FlightDataEncoderBuilder::new()
403 /// .build(futures::stream::iter(vec![Ok(batch)]));
404 ///
405 /// // send the stream and get the results as `RecordBatches`
406 /// let response: Vec<RecordBatch> = client
407 /// .do_exchange(flight_data_stream)
408 /// .await
409 /// .unwrap()
410 /// .try_collect() // use TryStreamExt to collect stream
411 /// .await
412 /// .expect("error calling do_exchange");
413 /// # }
414 /// ```
415 pub async fn do_exchange<S: Stream<Item = Result<FlightData>> + Send + 'static>(
416 &mut self,
417 request: S,
418 ) -> Result<FlightRecordBatchStream> {
419 let (sender, receiver) = futures::channel::oneshot::channel();
420
421 let request = Box::pin(request);
422 // Intercepts client errors and sends them to the oneshot channel above
423 let request_stream = FallibleRequestStream::new(sender, request);
424
425 let request = self.make_request(request_stream);
426 let response_stream = self.inner.do_exchange(request).await?.into_inner();
427
428 let response_stream = Box::pin(response_stream);
429 let error_stream = FallibleTonicResponseStream::new(receiver, response_stream);
430
431 // combine the response from the server and any error from the client
432 Ok(FlightRecordBatchStream::new_from_flight_data(error_stream))
433 }
434
435 /// Make a `ListFlights` call to the server with the provided
436 /// criteria and returning a [`Stream`] of [`FlightInfo`].
437 ///
438 /// # Example:
439 /// ```no_run
440 /// # async fn run() {
441 /// # use futures::TryStreamExt;
442 /// # use bytes::Bytes;
443 /// # use arrow_flight::{FlightInfo, FlightClient};
444 /// # let channel: tonic::transport::Channel = unimplemented!();
445 /// let mut client = FlightClient::new(channel);
446 ///
447 /// // Send 'Name=Foo' bytes as the "expression" to the server
448 /// // and gather the returned FlightInfo
449 /// let responses: Vec<FlightInfo> = client
450 /// .list_flights(Bytes::from("Name=Foo"))
451 /// .await
452 /// .expect("error listing flights")
453 /// .try_collect() // use TryStreamExt to collect stream
454 /// .await
455 /// .expect("error gathering flights");
456 /// # }
457 /// ```
458 pub async fn list_flights(
459 &mut self,
460 expression: impl Into<Bytes>,
461 ) -> Result<BoxStream<'static, Result<FlightInfo>>> {
462 let request = Criteria {
463 expression: expression.into(),
464 };
465
466 let request = self.make_request(request);
467
468 let response = self
469 .inner
470 .list_flights(request)
471 .await?
472 .into_inner()
473 .map_err(FlightError::Tonic);
474
475 Ok(response.boxed())
476 }
477
478 /// Make a `GetSchema` call to the server with the provided
479 /// [`FlightDescriptor`] and returning the associated [`Schema`].
480 ///
481 /// # Example:
482 /// ```no_run
483 /// # async fn run() {
484 /// # use bytes::Bytes;
485 /// # use arrow_flight::{FlightDescriptor, FlightClient};
486 /// # use arrow_schema::Schema;
487 /// # let channel: tonic::transport::Channel = unimplemented!();
488 /// let mut client = FlightClient::new(channel);
489 ///
490 /// // Request the schema result of a 'CMD' request to the server
491 /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
492 ///
493 /// let schema: Schema = client
494 /// .get_schema(request)
495 /// .await
496 /// .expect("error making request");
497 /// # }
498 /// ```
499 pub async fn get_schema(&mut self, flight_descriptor: FlightDescriptor) -> Result<Schema> {
500 let request = self.make_request(flight_descriptor);
501
502 let schema_result = self.inner.get_schema(request).await?.into_inner();
503
504 // attempt decode from IPC
505 let schema: Schema = schema_result.try_into()?;
506
507 Ok(schema)
508 }
509
510 /// Make a `ListActions` call to the server and returning a
511 /// [`Stream`] of [`ActionType`].
512 ///
513 /// # Example:
514 /// ```no_run
515 /// # async fn run() {
516 /// # use futures::TryStreamExt;
517 /// # use arrow_flight::{ActionType, FlightClient};
518 /// # use arrow_schema::Schema;
519 /// # let channel: tonic::transport::Channel = unimplemented!();
520 /// let mut client = FlightClient::new(channel);
521 ///
522 /// // List available actions on the server:
523 /// let actions: Vec<ActionType> = client
524 /// .list_actions()
525 /// .await
526 /// .expect("error listing actions")
527 /// .try_collect() // use TryStreamExt to collect stream
528 /// .await
529 /// .expect("error gathering actions");
530 /// # }
531 /// ```
532 pub async fn list_actions(&mut self) -> Result<BoxStream<'static, Result<ActionType>>> {
533 let request = self.make_request(Empty {});
534
535 let action_stream = self
536 .inner
537 .list_actions(request)
538 .await?
539 .into_inner()
540 .map_err(FlightError::Tonic);
541
542 Ok(action_stream.boxed())
543 }
544
545 /// Make a `DoAction` call to the server and returning a
546 /// [`Stream`] of opaque [`Bytes`].
547 ///
548 /// # Example:
549 /// ```no_run
550 /// # async fn run() {
551 /// # use bytes::Bytes;
552 /// # use futures::TryStreamExt;
553 /// # use arrow_flight::{Action, FlightClient};
554 /// # use arrow_schema::Schema;
555 /// # let channel: tonic::transport::Channel = unimplemented!();
556 /// let mut client = FlightClient::new(channel);
557 ///
558 /// let request = Action::new("my_action", "the body");
559 ///
560 /// // Make a request to run the action on the server
561 /// let results: Vec<Bytes> = client
562 /// .do_action(request)
563 /// .await
564 /// .expect("error executing acton")
565 /// .try_collect() // use TryStreamExt to collect stream
566 /// .await
567 /// .expect("error gathering action results");
568 /// # }
569 /// ```
570 pub async fn do_action(&mut self, action: Action) -> Result<BoxStream<'static, Result<Bytes>>> {
571 let request = self.make_request(action);
572
573 let result_stream = self
574 .inner
575 .do_action(request)
576 .await?
577 .into_inner()
578 .map_err(FlightError::Tonic)
579 .map(|r| {
580 r.map(|r| {
581 // unwrap inner bytes
582 let crate::Result { body } = r;
583 body
584 })
585 });
586
587 Ok(result_stream.boxed())
588 }
589
590 /// Make a `CancelFlightInfo` call to the server and return
591 /// a [`CancelFlightInfoResult`].
592 ///
593 /// # Example:
594 /// ```no_run
595 /// # async fn run() {
596 /// # use arrow_flight::{CancelFlightInfoRequest, FlightClient, FlightDescriptor};
597 /// # let channel: tonic::transport::Channel = unimplemented!();
598 /// let mut client = FlightClient::new(channel);
599 ///
600 /// // Send a 'CMD' request to the server
601 /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
602 /// let flight_info = client
603 /// .get_flight_info(request)
604 /// .await
605 /// .expect("error handshaking");
606 ///
607 /// // Cancel the query
608 /// let request = CancelFlightInfoRequest::new(flight_info);
609 /// let result = client
610 /// .cancel_flight_info(request)
611 /// .await
612 /// .expect("error cancelling");
613 /// # }
614 /// ```
615 pub async fn cancel_flight_info(
616 &mut self,
617 request: CancelFlightInfoRequest,
618 ) -> Result<CancelFlightInfoResult> {
619 let action = Action::new("CancelFlightInfo", request.encode_to_vec());
620 let response = self.do_action(action).await?.try_next().await?;
621 let response = response.ok_or(FlightError::protocol(
622 "Received no response for cancel_flight_info call",
623 ))?;
624 CancelFlightInfoResult::decode(response)
625 .map_err(|e| FlightError::DecodeError(e.to_string()))
626 }
627
628 /// Make a `RenewFlightEndpoint` call to the server and return
629 /// the renewed [`FlightEndpoint`].
630 ///
631 /// # Example:
632 /// ```no_run
633 /// # async fn run() {
634 /// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest};
635 /// # let channel: tonic::transport::Channel = unimplemented!();
636 /// let mut client = FlightClient::new(channel);
637 ///
638 /// // Send a 'CMD' request to the server
639 /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
640 /// let flight_endpoint = client
641 /// .get_flight_info(request)
642 /// .await
643 /// .expect("error handshaking")
644 /// .endpoint[0];
645 ///
646 /// // Renew the endpoint
647 /// let request = RenewFlightEndpointRequest::new(flight_endpoint);
648 /// let flight_endpoint = client
649 /// .renew_flight_endpoint(request)
650 /// .await
651 /// .expect("error renewing");
652 /// # }
653 /// ```
654 pub async fn renew_flight_endpoint(
655 &mut self,
656 request: RenewFlightEndpointRequest,
657 ) -> Result<FlightEndpoint> {
658 let action = Action::new("RenewFlightEndpoint", request.encode_to_vec());
659 let response = self.do_action(action).await?.try_next().await?;
660 let response = response.ok_or(FlightError::protocol(
661 "Received no response for renew_flight_endpoint call",
662 ))?;
663 FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string()))
664 }
665
666 /// return a Request, adding any configured metadata
667 fn make_request<T>(&self, t: T) -> tonic::Request<T> {
668 // Pass along metadata
669 let mut request = tonic::Request::new(t);
670 *request.metadata_mut() = self.metadata.clone();
671 request
672 }
673}