pub struct FlightClient {
    metadata: MetadataMap,
    inner: FlightServiceClient<Channel>,
}
A “mid-level” Apache Arrow Flight client.
FlightClient is intended as a convenience for interactions with Arrow Flight servers. For more direct control, such as access to the response headers, use FlightServiceClient directly via methods such as Self::inner or Self::into_inner.
§Example:
use arrow_flight::FlightClient;
use bytes::Bytes;
use tonic::transport::Channel;
let channel = Channel::from_static("http://localhost:1234")
    .connect()
    .await
    .expect("error connecting");
let mut client = FlightClient::new(channel);
// Send 'Hi' bytes as the handshake request to the server
let response = client
    .handshake(Bytes::from("Hi"))
    .await
    .expect("error handshaking");
// Expect the server responded with 'Ho'
assert_eq!(response, Bytes::from("Ho"));
Fields§
metadata: MetadataMap
Optional gRPC header metadata to include with each request
inner: FlightServiceClient<Channel>
The inner client
Implementations§
impl FlightClient
pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self
Creates a new higher level client with the provided lower level client
pub fn metadata(&self) -> &MetadataMap
Return a reference to gRPC metadata included with each request
pub fn metadata_mut(&mut self) -> &mut MetadataMap
Return a mutable reference to the gRPC metadata included with each request.
These headers can be used, for example, to include authorization or other application-specific headers.
pub fn add_header(&mut self, key: &str, value: &str) -> Result<()>
Add the specified header with value to all subsequent requests. See Self::metadata_mut for fine-grained control.
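The idea that a header added once is attached to every later request can be sketched without any gRPC dependency. The `Client` and `Request` types below are hypothetical stand-ins that use a `BTreeMap` in place of `MetadataMap`; they are not the arrow-flight or tonic types, and unlike the real `add_header` this sketch does no validation of keys or values.

```rust
use std::collections::BTreeMap;

/// Stand-in for a request: a message plus the headers attached to it.
struct Request<T> {
    metadata: BTreeMap<String, String>,
    message: T,
}

/// Stand-in for a client that remembers headers between calls.
struct Client {
    metadata: BTreeMap<String, String>,
}

impl Client {
    /// Remember a header so that later requests carry it.
    fn add_header(&mut self, key: &str, value: &str) {
        self.metadata.insert(key.to_string(), value.to_string());
    }

    /// Wrap a message in a request, copying the configured headers into it.
    fn make_request<T>(&self, message: T) -> Request<T> {
        Request {
            metadata: self.metadata.clone(),
            message,
        }
    }
}
```

Requests built before `add_header` is called carry no headers in this model; only requests built afterwards do.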
pub fn inner(&self) -> &FlightServiceClient<Channel>
Return a reference to the underlying tonic FlightServiceClient
pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel>
Return a mutable reference to the underlying tonic FlightServiceClient
pub fn into_inner(self) -> FlightServiceClient<Channel>
Consume this client and return the underlying tonic FlightServiceClient
pub async fn handshake(&mut self, payload: impl Into<Bytes>) -> Result<Bytes>
Perform an Arrow Flight handshake with the server, sending payload as the HandshakeRequest payload and returning the HandshakeResponse bytes returned from the server.
See the FlightClient docs for an example.
pub async fn do_get(&mut self, ticket: Ticket) -> Result<FlightRecordBatchStream>
Make a DoGet call to the server with the provided ticket, returning a FlightRecordBatchStream for reading RecordBatches.
§Note
To access the returned FlightData, use FlightRecordBatchStream::into_inner().
§Example:
let mut client = FlightClient::new(channel);
// Invoke a do_get request on the server with a previously
// received Ticket
let response = client
    .do_get(ticket)
    .await
    .expect("error invoking do_get");
// Use try_collect (from futures::TryStreamExt) to get the RecordBatches from the server
let batches: Vec<RecordBatch> = response
    .try_collect()
    .await
    .expect("no stream errors");
pub async fn get_flight_info(&mut self, descriptor: FlightDescriptor) -> Result<FlightInfo>
Make a GetFlightInfo call to the server with the provided FlightDescriptor and return the FlightInfo from the server. The FlightInfo can be used with Self::do_get to retrieve the requested batches.
§Example:
let mut client = FlightClient::new(channel);
// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_info = client
    .get_flight_info(request)
    .await
    .expect("error getting flight info");
// retrieve the first endpoint from the returned flight info
let ticket = flight_info
    .endpoint[0]
    // Extract the ticket
    .ticket
    .clone()
    .expect("expected ticket");
// Retrieve the corresponding RecordBatch stream with do_get
let data = client
    .do_get(ticket)
    .await
    .expect("error fetching data");
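The example above reads only `endpoint[0]`, which panics if the server returns no endpoints. As a minimal sketch of gathering the ticket of every endpoint instead, the following uses hypothetical stand-in structs that mirror only the `endpoint` and `ticket` fields used above; they are not the generated arrow-flight message types.

```rust
/// Stand-in for the Flight `Ticket` message (opaque bytes).
#[derive(Debug, Clone, PartialEq)]
struct Ticket(Vec<u8>);

/// Stand-in for `FlightEndpoint`; the ticket is optional, as in the example above.
struct FlightEndpoint {
    ticket: Option<Ticket>,
}

/// Stand-in for `FlightInfo`, holding zero or more endpoints.
struct FlightInfo {
    endpoint: Vec<FlightEndpoint>,
}

/// Collect the tickets of all endpoints, skipping endpoints without one.
fn tickets(info: &FlightInfo) -> Vec<Ticket> {
    info.endpoint
        .iter()
        .filter_map(|endpoint| endpoint.ticket.clone())
        .collect()
}
```

Each returned ticket could then be passed to a separate do_get call. Whether skipping an endpoint without a ticket is acceptable, or should be an error, depends on the application.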
pub async fn poll_flight_info(&mut self, descriptor: FlightDescriptor) -> Result<PollInfo>
Make a PollFlightInfo call to the server with the provided FlightDescriptor and return the PollInfo from the server.
The info field of the PollInfo can be used with Self::do_get to retrieve the requested batches.
If the flight_descriptor field of the PollInfo is None, then the info field represents the complete results.
If the flight_descriptor field is some FlightDescriptor, then the info field has incomplete results, and the client should call this method again with the new flight_descriptor to get the updated status.
The expiration_time, if set, represents the expiration time of the flight_descriptor, after which the server may not accept this retry descriptor and may cancel the query.
§Example:
let mut client = FlightClient::new(channel);
// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let poll_info = client
    .poll_flight_info(request)
    .await
    .expect("error polling flight info");
// retrieve the first endpoint from the returned poll info
let ticket = poll_info
    .info
    .expect("expected flight info")
    .endpoint[0]
    // Extract the ticket
    .ticket
    .clone()
    .expect("expected ticket");
// Retrieve the corresponding RecordBatch stream with do_get
let data = client
    .do_get(ticket)
    .await
    .expect("error fetching data");
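The retry rule described for poll_flight_info (poll again while flight_descriptor is set, stop when it is None) can be sketched as a loop. This is a model under stated assumptions: `PollInfo` here is a hypothetical stand-in with plain `String` fields, `poll` is any closure standing in for the network call, and `max_polls` is an illustrative bound that is not part of the Flight API.

```rust
/// Stand-in for the `PollInfo` message; only the two fields discussed above.
#[derive(Debug, Clone, PartialEq)]
struct PollInfo {
    /// Results known so far (a placeholder string instead of a `FlightInfo`).
    info: Option<String>,
    /// Retry descriptor; `None` means the query is complete.
    flight_descriptor: Option<String>,
}

/// Call `poll` repeatedly, feeding back the descriptor the server returned,
/// until a response has no retry descriptor or `max_polls` calls were made.
fn poll_until_complete<F>(
    initial: String,
    max_polls: usize,
    mut poll: F,
) -> Result<Option<String>, String>
where
    F: FnMut(&str) -> Result<PollInfo, String>,
{
    let mut descriptor = initial;
    for _ in 0..max_polls {
        let response = poll(&descriptor)?;
        match response.flight_descriptor {
            // No retry descriptor: `info` holds the complete results.
            None => return Ok(response.info),
            // Incomplete: poll again with the new descriptor.
            Some(next) => descriptor = next,
        }
    }
    Err(format!("query still running after {max_polls} polls"))
}
```

A real client would likely also wait between polls and stop once expiration_time has passed; both are omitted here to keep the control flow visible.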
pub async fn do_put<S: Stream<Item = Result<FlightData>> + Send + 'static>(&mut self, request: S) -> Result<BoxStream<'static, Result<PutResult>>>
Make a DoPut call to the server with the provided Stream of FlightData, returning a stream of PutResult.
§Note
The items of the input stream are Result so that this can be connected to a streaming data source, such as FlightDataEncoder, without having to buffer. If the input stream returns an error, that error will not be sent to the server; instead it will be placed into the result stream and the server connection terminated.
§Example:
let mut client = FlightClient::new(channel);
// encode the batch as a stream of `FlightData`
let flight_data_stream = FlightDataEncoderBuilder::new()
    .build(futures::stream::iter(vec![Ok(batch)]));
// send the stream and get the results as `PutResult`
let response: Vec<PutResult> = client
    .do_put(flight_data_stream)
    .await
    .unwrap()
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error calling do_put");
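The behaviour described in the note (an input error is kept on the client side rather than sent to the server) can be modelled with plain iterators. This is only an illustration of the rule, not how the client is implemented: `forward_until_error` is a hypothetical helper, `send` stands in for writing to the server, and the returned `Option` stands in for the error that would surface in the result stream.

```rust
/// Forward `Ok` items to `send` until the input yields an error.
/// Returns the first error, if any, so the caller can report it in the results.
fn forward_until_error<T, E>(
    input: impl IntoIterator<Item = Result<T, E>>,
    mut send: impl FnMut(T),
) -> Option<E> {
    for item in input {
        match item {
            Ok(value) => send(value),
            // The error stays on the client side and forwarding stops,
            // so later items are never sent.
            Err(error) => return Some(error),
        }
    }
    None
}
```

In this model, items that follow the first error are dropped, which corresponds to the connection being terminated.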
pub async fn do_exchange<S: Stream<Item = Result<FlightData>> + Send + 'static>(&mut self, request: S) -> Result<FlightRecordBatchStream>
Make a DoExchange call to the server with the provided Stream of FlightData, returning a FlightRecordBatchStream for reading the RecordBatches sent back by the server.
§Example:
let mut client = FlightClient::new(channel);
// encode the batch as a stream of `FlightData`
let flight_data_stream = FlightDataEncoderBuilder::new()
    .build(futures::stream::iter(vec![Ok(batch)]));
// send the stream and get the results as `RecordBatch`es
let response: Vec<RecordBatch> = client
    .do_exchange(flight_data_stream)
    .await
    .unwrap()
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error calling do_exchange");
pub async fn list_flights(&mut self, expression: impl Into<Bytes>) -> Result<BoxStream<'static, Result<FlightInfo>>>
Make a ListFlights call to the server with the provided criteria, returning a Stream of FlightInfo.
§Example:
let mut client = FlightClient::new(channel);
// Send 'Name=Foo' bytes as the "expression" to the server
// and gather the returned FlightInfo
let responses: Vec<FlightInfo> = client
    .list_flights(Bytes::from("Name=Foo"))
    .await
    .expect("error listing flights")
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error gathering flights");
pub async fn get_schema(&mut self, flight_descriptor: FlightDescriptor) -> Result<Schema>
Make a GetSchema call to the server with the provided FlightDescriptor, returning the associated Schema.
§Example:
let mut client = FlightClient::new(channel);
// Request the schema result of a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let schema: Schema = client
    .get_schema(request)
    .await
    .expect("error making request");
pub async fn list_actions(&mut self) -> Result<BoxStream<'static, Result<ActionType>>>
Make a ListActions call to the server, returning a Stream of ActionType.
§Example:
let mut client = FlightClient::new(channel);
// List available actions on the server:
let actions: Vec<ActionType> = client
    .list_actions()
    .await
    .expect("error listing actions")
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error gathering actions");
pub async fn do_action(&mut self, action: Action) -> Result<BoxStream<'static, Result<Bytes>>>
Make a DoAction call to the server, returning a Stream of opaque Bytes.
§Example:
let mut client = FlightClient::new(channel);
let request = Action::new("my_action", "the body");
// Make a request to run the action on the server
let results: Vec<Bytes> = client
    .do_action(request)
    .await
    .expect("error executing action")
    .try_collect() // use TryStreamExt to collect stream
    .await
    .expect("error gathering action results");
pub async fn cancel_flight_info(&mut self, request: CancelFlightInfoRequest) -> Result<CancelFlightInfoResult>
Make a CancelFlightInfo call to the server and return a CancelFlightInfoResult.
§Example:
let mut client = FlightClient::new(channel);
// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_info = client
    .get_flight_info(request)
    .await
    .expect("error getting flight info");
// Cancel the query
let request = CancelFlightInfoRequest::new(flight_info);
let result = client
    .cancel_flight_info(request)
    .await
    .expect("error cancelling");
pub async fn renew_flight_endpoint(&mut self, request: RenewFlightEndpointRequest) -> Result<FlightEndpoint>
Make a RenewFlightEndpoint call to the server and return the renewed FlightEndpoint.
§Example:
let mut client = FlightClient::new(channel);
// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_endpoint = client
    .get_flight_info(request)
    .await
    .expect("error getting flight info")
    .endpoint[0]
    // clone the first endpoint; indexing alone cannot move it out of the Vec
    .clone();
// Renew the endpoint
let request = RenewFlightEndpointRequest::new(flight_endpoint);
let flight_endpoint = client
    .renew_flight_endpoint(request)
    .await
    .expect("error renewing");
fn make_request<T>(&self, t: T) -> Request<T>
Return a Request, adding any configured metadata
Trait Implementations§
Auto Trait Implementations§
impl !Freeze for FlightClient
impl !RefUnwindSafe for FlightClient
impl Send for FlightClient
impl Sync for FlightClient
impl Unpin for FlightClient
impl !UnwindSafe for FlightClient
Blanket Implementations§
impl<T> BorrowMut<T> for T where T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
impl<T> Instrument for T
fn instrument(self, span: Span) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
impl<T> IntoRequest<T> for T
fn into_request(self) -> Request<T>
Wrap the input message T in a tonic::Request