arrow_flight/
client.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use crate::{
19    decode::FlightRecordBatchStream,
20    flight_service_client::FlightServiceClient,
21    gen::{CancelFlightInfoRequest, CancelFlightInfoResult, RenewFlightEndpointRequest},
22    trailers::extract_lazy_trailers,
23    Action, ActionType, Criteria, Empty, FlightData, FlightDescriptor, FlightEndpoint, FlightInfo,
24    HandshakeRequest, PollInfo, PutResult, Ticket,
25};
26use arrow_schema::Schema;
27use bytes::Bytes;
28use futures::{
29    future::ready,
30    stream::{self, BoxStream},
31    Stream, StreamExt, TryStreamExt,
32};
33use prost::Message;
34use tonic::{metadata::MetadataMap, transport::Channel};
35
36use crate::error::{FlightError, Result};
37use crate::streams::{FallibleRequestStream, FallibleTonicResponseStream};
38
39/// A "Mid level" [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html) client.
40///
41/// [`FlightClient`] is intended as a convenience for interactions
42/// with Arrow Flight servers. For more direct control, such as access
43/// to the response headers, use  [`FlightServiceClient`] directly
44/// via methods such as [`Self::inner`] or [`Self::into_inner`].
45///
46/// # Example:
47/// ```no_run
48/// # async fn run() {
49/// # use arrow_flight::FlightClient;
50/// # use bytes::Bytes;
51/// use tonic::transport::Channel;
52/// let channel = Channel::from_static("http://localhost:1234")
53///   .connect()
54///   .await
55///   .expect("error connecting");
56///
57/// let mut client = FlightClient::new(channel);
58///
59/// // Send 'Hi' bytes as the handshake request to the server
60/// let response = client
61///   .handshake(Bytes::from("Hi"))
62///   .await
63///   .expect("error handshaking");
64///
65/// // Expect the server responded with 'Ho'
66/// assert_eq!(response, Bytes::from("Ho"));
67/// # }
68/// ```
69#[derive(Debug)]
70pub struct FlightClient {
71    /// Optional grpc header metadata to include with each request
72    metadata: MetadataMap,
73
74    /// The inner client
75    inner: FlightServiceClient<Channel>,
76}
77
78impl FlightClient {
79    /// Creates a client client with the provided [`Channel`]
80    pub fn new(channel: Channel) -> Self {
81        Self::new_from_inner(FlightServiceClient::new(channel))
82    }
83
84    /// Creates a new higher level client with the provided lower level client
85    pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self {
86        Self {
87            metadata: MetadataMap::new(),
88            inner,
89        }
90    }
91
92    /// Return a reference to gRPC metadata included with each request
93    pub fn metadata(&self) -> &MetadataMap {
94        &self.metadata
95    }
96
97    /// Return a reference to gRPC metadata included with each request
98    ///
99    /// These headers can be used, for example, to include
100    /// authorization or other application specific headers.
101    pub fn metadata_mut(&mut self) -> &mut MetadataMap {
102        &mut self.metadata
103    }
104
105    /// Add the specified header with value to all subsequent
106    /// requests. See [`Self::metadata_mut`] for fine grained control.
107    pub fn add_header(&mut self, key: &str, value: &str) -> Result<()> {
108        let key = tonic::metadata::MetadataKey::<_>::from_bytes(key.as_bytes())
109            .map_err(|e| FlightError::ExternalError(Box::new(e)))?;
110
111        let value = value
112            .parse()
113            .map_err(|e| FlightError::ExternalError(Box::new(e)))?;
114
115        // ignore previous value
116        self.metadata.insert(key, value);
117
118        Ok(())
119    }
120
121    /// Return a reference to the underlying tonic
122    /// [`FlightServiceClient`]
123    pub fn inner(&self) -> &FlightServiceClient<Channel> {
124        &self.inner
125    }
126
127    /// Return a mutable reference to the underlying tonic
128    /// [`FlightServiceClient`]
129    pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel> {
130        &mut self.inner
131    }
132
133    /// Consume this client and return the underlying tonic
134    /// [`FlightServiceClient`]
135    pub fn into_inner(self) -> FlightServiceClient<Channel> {
136        self.inner
137    }
138
139    /// Perform an Arrow Flight handshake with the server, sending
140    /// `payload` as the [`HandshakeRequest`] payload and returning
141    /// the [`HandshakeResponse`](crate::HandshakeResponse)
142    /// bytes returned from the server
143    ///
144    /// See [`FlightClient`] docs for an example.
145    pub async fn handshake(&mut self, payload: impl Into<Bytes>) -> Result<Bytes> {
146        let request = HandshakeRequest {
147            protocol_version: 0,
148            payload: payload.into(),
149        };
150
151        // apply headers, etc
152        let request = self.make_request(stream::once(ready(request)));
153
154        let mut response_stream = self.inner.handshake(request).await?.into_inner();
155
156        if let Some(response) = response_stream.next().await.transpose()? {
157            // check if there is another response
158            if response_stream.next().await.is_some() {
159                return Err(FlightError::protocol(
160                    "Got unexpected second response from handshake",
161                ));
162            }
163
164            Ok(response.payload)
165        } else {
166            Err(FlightError::protocol("No response from handshake"))
167        }
168    }
169
170    /// Make a `DoGet` call to the server with the provided ticket,
171    /// returning a [`FlightRecordBatchStream`] for reading
172    /// [`RecordBatch`](arrow_array::RecordBatch)es.
173    ///
174    /// # Note
175    ///
176    /// To access the returned [`FlightData`] use
177    /// [`FlightRecordBatchStream::into_inner()`]
178    ///
179    /// # Example:
180    /// ```no_run
181    /// # async fn run() {
182    /// # use bytes::Bytes;
183    /// # use arrow_flight::FlightClient;
184    /// # use arrow_flight::Ticket;
185    /// # use arrow_array::RecordBatch;
186    /// # use futures::stream::TryStreamExt;
187    /// # let channel: tonic::transport::Channel = unimplemented!();
188    /// # let ticket = Ticket { ticket: Bytes::from("foo") };
189    /// let mut client = FlightClient::new(channel);
190    ///
191    /// // Invoke a do_get request on the server with a previously
192    /// // received Ticket
193    ///
194    /// let response = client
195    ///    .do_get(ticket)
196    ///    .await
197    ///    .expect("error invoking do_get");
198    ///
199    /// // Use try_collect to get the RecordBatches from the server
200    /// let batches: Vec<RecordBatch> = response
201    ///    .try_collect()
202    ///    .await
203    ///    .expect("no stream errors");
204    /// # }
205    /// ```
206    pub async fn do_get(&mut self, ticket: Ticket) -> Result<FlightRecordBatchStream> {
207        let request = self.make_request(ticket);
208
209        let (md, response_stream, _ext) = self.inner.do_get(request).await?.into_parts();
210        let (response_stream, trailers) = extract_lazy_trailers(response_stream);
211
212        Ok(FlightRecordBatchStream::new_from_flight_data(
213            response_stream.map_err(FlightError::Tonic),
214        )
215        .with_headers(md)
216        .with_trailers(trailers))
217    }
218
219    /// Make a `GetFlightInfo` call to the server with the provided
220    /// [`FlightDescriptor`] and return the [`FlightInfo`] from the
221    /// server. The [`FlightInfo`] can be used with [`Self::do_get`]
222    /// to retrieve the requested batches.
223    ///
224    /// # Example:
225    /// ```no_run
226    /// # async fn run() {
227    /// # use arrow_flight::FlightClient;
228    /// # use arrow_flight::FlightDescriptor;
229    /// # let channel: tonic::transport::Channel = unimplemented!();
230    /// let mut client = FlightClient::new(channel);
231    ///
232    /// // Send a 'CMD' request to the server
233    /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
234    /// let flight_info = client
235    ///   .get_flight_info(request)
236    ///   .await
237    ///   .expect("error handshaking");
238    ///
239    /// // retrieve the first endpoint from the returned flight info
240    /// let ticket = flight_info
241    ///   .endpoint[0]
242    ///   // Extract the ticket
243    ///   .ticket
244    ///   .clone()
245    ///   .expect("expected ticket");
246    ///
247    /// // Retrieve the corresponding RecordBatch stream with do_get
248    /// let data = client
249    ///   .do_get(ticket)
250    ///   .await
251    ///   .expect("error fetching data");
252    /// # }
253    /// ```
254    pub async fn get_flight_info(&mut self, descriptor: FlightDescriptor) -> Result<FlightInfo> {
255        let request = self.make_request(descriptor);
256
257        let response = self.inner.get_flight_info(request).await?.into_inner();
258        Ok(response)
259    }
260
261    /// Make a `PollFlightInfo` call to the server with the provided
262    /// [`FlightDescriptor`] and return the [`PollInfo`] from the
263    /// server.
264    ///
265    /// The `info` field of the [`PollInfo`] can be used with
266    /// [`Self::do_get`] to retrieve the requested batches.
267    ///
268    /// If the `flight_descriptor` field of the [`PollInfo`] is
269    /// `None` then the `info` field represents the complete results.
270    ///
271    /// If the `flight_descriptor` field is some [`FlightDescriptor`]
272    /// then the `info` field has incomplete results, and the client
273    /// should call this method again with the new `flight_descriptor`
274    /// to get the updated status.
275    ///
276    /// The `expiration_time`, if set, represents the expiration time
277    /// of the `flight_descriptor`, after which the server may not accept
278    /// this retry descriptor and may cancel the query.
279    ///
280    /// # Example:
281    /// ```no_run
282    /// # async fn run() {
283    /// # use arrow_flight::FlightClient;
284    /// # use arrow_flight::FlightDescriptor;
285    /// # let channel: tonic::transport::Channel = unimplemented!();
286    /// let mut client = FlightClient::new(channel);
287    ///
288    /// // Send a 'CMD' request to the server
289    /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
290    /// let poll_info = client
291    ///   .poll_flight_info(request)
292    ///   .await
293    ///   .expect("error handshaking");
294    ///
295    /// // retrieve the first endpoint from the returned poll info
296    /// let ticket = poll_info
297    ///   .info
298    ///   .expect("expected flight info")
299    ///   .endpoint[0]
300    ///   // Extract the ticket
301    ///   .ticket
302    ///   .clone()
303    ///   .expect("expected ticket");
304    ///
305    /// // Retrieve the corresponding RecordBatch stream with do_get
306    /// let data = client
307    ///   .do_get(ticket)
308    ///   .await
309    ///   .expect("error fetching data");
310    /// # }
311    /// ```
312    pub async fn poll_flight_info(&mut self, descriptor: FlightDescriptor) -> Result<PollInfo> {
313        let request = self.make_request(descriptor);
314
315        let response = self.inner.poll_flight_info(request).await?.into_inner();
316        Ok(response)
317    }
318
319    /// Make a `DoPut` call to the server with the provided
320    /// [`Stream`] of [`FlightData`] and returning a
321    /// stream of [`PutResult`].
322    ///
323    /// # Note
324    ///
325    /// The input stream is [`Result`] so that this can be connected
326    /// to a streaming data source, such as [`FlightDataEncoder`](crate::encode::FlightDataEncoder),
327    /// without having to buffer. If the input stream returns an error
328    /// that error will not be sent to the server, instead it will be
329    /// placed into the result stream and the server connection
330    /// terminated.
331    ///
332    /// # Example:
333    /// ```no_run
334    /// # async fn run() {
335    /// # use futures::{TryStreamExt, StreamExt};
336    /// # use std::sync::Arc;
337    /// # use arrow_array::UInt64Array;
338    /// # use arrow_array::RecordBatch;
339    /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
340    /// # use arrow_flight::encode::FlightDataEncoderBuilder;
341    /// # let batch = RecordBatch::try_from_iter(vec![
342    /// #  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
343    /// # ]).unwrap();
344    /// # let channel: tonic::transport::Channel = unimplemented!();
345    /// let mut client = FlightClient::new(channel);
346    ///
347    /// // encode the batch as a stream of `FlightData`
348    /// let flight_data_stream = FlightDataEncoderBuilder::new()
349    ///   .build(futures::stream::iter(vec![Ok(batch)]));
350    ///
351    /// // send the stream and get the results as `PutResult`
352    /// let response: Vec<PutResult>= client
353    ///   .do_put(flight_data_stream)
354    ///   .await
355    ///   .unwrap()
356    ///   .try_collect() // use TryStreamExt to collect stream
357    ///   .await
358    ///   .expect("error calling do_put");
359    /// # }
360    /// ```
361    pub async fn do_put<S: Stream<Item = Result<FlightData>> + Send + 'static>(
362        &mut self,
363        request: S,
364    ) -> Result<BoxStream<'static, Result<PutResult>>> {
365        let (sender, receiver) = futures::channel::oneshot::channel();
366
367        // Intercepts client errors and sends them to the oneshot channel above
368        let request = Box::pin(request); // Pin to heap
369        let request_stream = FallibleRequestStream::new(sender, request);
370
371        let request = self.make_request(request_stream);
372        let response_stream = self.inner.do_put(request).await?.into_inner();
373
374        // Forwards errors from the error oneshot with priority over responses from server
375        let response_stream = Box::pin(response_stream);
376        let error_stream = FallibleTonicResponseStream::new(receiver, response_stream);
377
378        // combine the response from the server and any error from the client
379        Ok(error_stream.boxed())
380    }
381
382    /// Make a `DoExchange` call to the server with the provided
383    /// [`Stream`] of [`FlightData`] and returning a
384    /// stream of [`FlightData`].
385    ///
386    /// # Example:
387    /// ```no_run
388    /// # async fn run() {
389    /// # use futures::{TryStreamExt, StreamExt};
390    /// # use std::sync::Arc;
391    /// # use arrow_array::UInt64Array;
392    /// # use arrow_array::RecordBatch;
393    /// # use arrow_flight::{FlightClient, FlightDescriptor, PutResult};
394    /// # use arrow_flight::encode::FlightDataEncoderBuilder;
395    /// # let batch = RecordBatch::try_from_iter(vec![
396    /// #  ("col2", Arc::new(UInt64Array::from_iter([10, 23, 33])) as _)
397    /// # ]).unwrap();
398    /// # let channel: tonic::transport::Channel = unimplemented!();
399    /// let mut client = FlightClient::new(channel);
400    ///
401    /// // encode the batch as a stream of `FlightData`
402    /// let flight_data_stream = FlightDataEncoderBuilder::new()
403    ///   .build(futures::stream::iter(vec![Ok(batch)]));
404    ///
405    /// // send the stream and get the results as `RecordBatches`
406    /// let response: Vec<RecordBatch> = client
407    ///   .do_exchange(flight_data_stream)
408    ///   .await
409    ///   .unwrap()
410    ///   .try_collect() // use TryStreamExt to collect stream
411    ///   .await
412    ///   .expect("error calling do_exchange");
413    /// # }
414    /// ```
415    pub async fn do_exchange<S: Stream<Item = Result<FlightData>> + Send + 'static>(
416        &mut self,
417        request: S,
418    ) -> Result<FlightRecordBatchStream> {
419        let (sender, receiver) = futures::channel::oneshot::channel();
420
421        let request = Box::pin(request);
422        // Intercepts client errors and sends them to the oneshot channel above
423        let request_stream = FallibleRequestStream::new(sender, request);
424
425        let request = self.make_request(request_stream);
426        let response_stream = self.inner.do_exchange(request).await?.into_inner();
427
428        let response_stream = Box::pin(response_stream);
429        let error_stream = FallibleTonicResponseStream::new(receiver, response_stream);
430
431        // combine the response from the server and any error from the client
432        Ok(FlightRecordBatchStream::new_from_flight_data(error_stream))
433    }
434
435    /// Make a `ListFlights` call to the server with the provided
436    /// criteria and returning a [`Stream`] of [`FlightInfo`].
437    ///
438    /// # Example:
439    /// ```no_run
440    /// # async fn run() {
441    /// # use futures::TryStreamExt;
442    /// # use bytes::Bytes;
443    /// # use arrow_flight::{FlightInfo, FlightClient};
444    /// # let channel: tonic::transport::Channel = unimplemented!();
445    /// let mut client = FlightClient::new(channel);
446    ///
447    /// // Send 'Name=Foo' bytes as the "expression" to the server
448    /// // and gather the returned FlightInfo
449    /// let responses: Vec<FlightInfo> = client
450    ///   .list_flights(Bytes::from("Name=Foo"))
451    ///   .await
452    ///   .expect("error listing flights")
453    ///   .try_collect() // use TryStreamExt to collect stream
454    ///   .await
455    ///   .expect("error gathering flights");
456    /// # }
457    /// ```
458    pub async fn list_flights(
459        &mut self,
460        expression: impl Into<Bytes>,
461    ) -> Result<BoxStream<'static, Result<FlightInfo>>> {
462        let request = Criteria {
463            expression: expression.into(),
464        };
465
466        let request = self.make_request(request);
467
468        let response = self
469            .inner
470            .list_flights(request)
471            .await?
472            .into_inner()
473            .map_err(FlightError::Tonic);
474
475        Ok(response.boxed())
476    }
477
478    /// Make a `GetSchema` call to the server with the provided
479    /// [`FlightDescriptor`] and returning the associated [`Schema`].
480    ///
481    /// # Example:
482    /// ```no_run
483    /// # async fn run() {
484    /// # use bytes::Bytes;
485    /// # use arrow_flight::{FlightDescriptor, FlightClient};
486    /// # use arrow_schema::Schema;
487    /// # let channel: tonic::transport::Channel = unimplemented!();
488    /// let mut client = FlightClient::new(channel);
489    ///
490    /// // Request the schema result of a 'CMD' request to the server
491    /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
492    ///
493    /// let schema: Schema = client
494    ///   .get_schema(request)
495    ///   .await
496    ///   .expect("error making request");
497    /// # }
498    /// ```
499    pub async fn get_schema(&mut self, flight_descriptor: FlightDescriptor) -> Result<Schema> {
500        let request = self.make_request(flight_descriptor);
501
502        let schema_result = self.inner.get_schema(request).await?.into_inner();
503
504        // attempt decode from IPC
505        let schema: Schema = schema_result.try_into()?;
506
507        Ok(schema)
508    }
509
510    /// Make a `ListActions` call to the server and returning a
511    /// [`Stream`] of [`ActionType`].
512    ///
513    /// # Example:
514    /// ```no_run
515    /// # async fn run() {
516    /// # use futures::TryStreamExt;
517    /// # use arrow_flight::{ActionType, FlightClient};
518    /// # use arrow_schema::Schema;
519    /// # let channel: tonic::transport::Channel = unimplemented!();
520    /// let mut client = FlightClient::new(channel);
521    ///
522    /// // List available actions on the server:
523    /// let actions: Vec<ActionType> = client
524    ///   .list_actions()
525    ///   .await
526    ///   .expect("error listing actions")
527    ///   .try_collect() // use TryStreamExt to collect stream
528    ///   .await
529    ///   .expect("error gathering actions");
530    /// # }
531    /// ```
532    pub async fn list_actions(&mut self) -> Result<BoxStream<'static, Result<ActionType>>> {
533        let request = self.make_request(Empty {});
534
535        let action_stream = self
536            .inner
537            .list_actions(request)
538            .await?
539            .into_inner()
540            .map_err(FlightError::Tonic);
541
542        Ok(action_stream.boxed())
543    }
544
545    /// Make a `DoAction` call to the server and returning a
546    /// [`Stream`] of opaque [`Bytes`].
547    ///
548    /// # Example:
549    /// ```no_run
550    /// # async fn run() {
551    /// # use bytes::Bytes;
552    /// # use futures::TryStreamExt;
553    /// # use arrow_flight::{Action, FlightClient};
554    /// # use arrow_schema::Schema;
555    /// # let channel: tonic::transport::Channel = unimplemented!();
556    /// let mut client = FlightClient::new(channel);
557    ///
558    /// let request = Action::new("my_action", "the body");
559    ///
560    /// // Make a request to run the action on the server
561    /// let results: Vec<Bytes> = client
562    ///   .do_action(request)
563    ///   .await
564    ///   .expect("error executing acton")
565    ///   .try_collect() // use TryStreamExt to collect stream
566    ///   .await
567    ///   .expect("error gathering action results");
568    /// # }
569    /// ```
570    pub async fn do_action(&mut self, action: Action) -> Result<BoxStream<'static, Result<Bytes>>> {
571        let request = self.make_request(action);
572
573        let result_stream = self
574            .inner
575            .do_action(request)
576            .await?
577            .into_inner()
578            .map_err(FlightError::Tonic)
579            .map(|r| {
580                r.map(|r| {
581                    // unwrap inner bytes
582                    let crate::Result { body } = r;
583                    body
584                })
585            });
586
587        Ok(result_stream.boxed())
588    }
589
590    /// Make a `CancelFlightInfo` call to the server and return
591    /// a [`CancelFlightInfoResult`].
592    ///
593    /// # Example:
594    /// ```no_run
595    /// # async fn run() {
596    /// # use arrow_flight::{CancelFlightInfoRequest, FlightClient, FlightDescriptor};
597    /// # let channel: tonic::transport::Channel = unimplemented!();
598    /// let mut client = FlightClient::new(channel);
599    ///
600    /// // Send a 'CMD' request to the server
601    /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
602    /// let flight_info = client
603    ///   .get_flight_info(request)
604    ///   .await
605    ///   .expect("error handshaking");
606    ///
607    /// // Cancel the query
608    /// let request = CancelFlightInfoRequest::new(flight_info);
609    /// let result = client
610    ///   .cancel_flight_info(request)
611    ///   .await
612    ///   .expect("error cancelling");
613    /// # }
614    /// ```
615    pub async fn cancel_flight_info(
616        &mut self,
617        request: CancelFlightInfoRequest,
618    ) -> Result<CancelFlightInfoResult> {
619        let action = Action::new("CancelFlightInfo", request.encode_to_vec());
620        let response = self.do_action(action).await?.try_next().await?;
621        let response = response.ok_or(FlightError::protocol(
622            "Received no response for cancel_flight_info call",
623        ))?;
624        CancelFlightInfoResult::decode(response)
625            .map_err(|e| FlightError::DecodeError(e.to_string()))
626    }
627
628    /// Make a `RenewFlightEndpoint` call to the server and return
629    /// the renewed [`FlightEndpoint`].
630    ///
631    /// # Example:
632    /// ```no_run
633    /// # async fn run() {
634    /// # use arrow_flight::{FlightClient, FlightDescriptor, RenewFlightEndpointRequest};
635    /// # let channel: tonic::transport::Channel = unimplemented!();
636    /// let mut client = FlightClient::new(channel);
637    ///
638    /// // Send a 'CMD' request to the server
639    /// let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
640    /// let flight_endpoint = client
641    ///   .get_flight_info(request)
642    ///   .await
643    ///   .expect("error handshaking")
644    ///   .endpoint[0];
645    ///
646    /// // Renew the endpoint
647    /// let request = RenewFlightEndpointRequest::new(flight_endpoint);
648    /// let flight_endpoint = client
649    ///   .renew_flight_endpoint(request)
650    ///   .await
651    ///   .expect("error renewing");
652    /// # }
653    /// ```
654    pub async fn renew_flight_endpoint(
655        &mut self,
656        request: RenewFlightEndpointRequest,
657    ) -> Result<FlightEndpoint> {
658        let action = Action::new("RenewFlightEndpoint", request.encode_to_vec());
659        let response = self.do_action(action).await?.try_next().await?;
660        let response = response.ok_or(FlightError::protocol(
661            "Received no response for renew_flight_endpoint call",
662        ))?;
663        FlightEndpoint::decode(response).map_err(|e| FlightError::DecodeError(e.to_string()))
664    }
665
666    /// return a Request, adding any configured metadata
667    fn make_request<T>(&self, t: T) -> tonic::Request<T> {
668        // Pass along metadata
669        let mut request = tonic::Request::new(t);
670        *request.metadata_mut() = self.metadata.clone();
671        request
672    }
673}