Struct arrow_flight::client::FlightClient

source ·
pub struct FlightClient {
    metadata: MetadataMap,
    inner: FlightServiceClient<Channel>,
}
Expand description

A “Mid level” Apache Arrow Flight client.

FlightClient is intended as a convenience for interactions with Arrow Flight servers. For more direct control, such as access to the response headers, use FlightServiceClient directly via methods such as Self::inner or Self::into_inner.

§Example:

use tonic::transport::Channel;
let channel = Channel::from_static("http://localhost:1234")
  .connect()
  .await
  .expect("error connecting");

let mut client = FlightClient::new(channel);

// Send 'Hi' bytes as the handshake request to the server
let response = client
  .handshake(Bytes::from("Hi"))
  .await
  .expect("error handshaking");

// Expect the server responded with 'Ho'
assert_eq!(response, Bytes::from("Ho"));

Fields§

§metadata: MetadataMap

Optional grpc header metadata to include with each request

§inner: FlightServiceClient<Channel>

The inner client

Implementations§

source§

impl FlightClient

source

pub fn new(channel: Channel) -> Self

Creates a client client with the provided Channel

source

pub fn new_from_inner(inner: FlightServiceClient<Channel>) -> Self

Creates a new higher level client with the provided lower level client

source

pub fn metadata(&self) -> &MetadataMap

Return a reference to gRPC metadata included with each request

source

pub fn metadata_mut(&mut self) -> &mut MetadataMap

Return a reference to gRPC metadata included with each request

These headers can be used, for example, to include authorization or other application specific headers.

source

pub fn add_header(&mut self, key: &str, value: &str) -> Result<()>

Add the specified header with value to all subsequent requests. See Self::metadata_mut for fine grained control.

source

pub fn inner(&self) -> &FlightServiceClient<Channel>

Return a reference to the underlying tonic FlightServiceClient

source

pub fn inner_mut(&mut self) -> &mut FlightServiceClient<Channel>

Return a mutable reference to the underlying tonic FlightServiceClient

source

pub fn into_inner(self) -> FlightServiceClient<Channel>

Consume this client and return the underlying tonic FlightServiceClient

source

pub async fn handshake(&mut self, payload: impl Into<Bytes>) -> Result<Bytes>

Perform an Arrow Flight handshake with the server, sending payload as the HandshakeRequest payload and returning the HandshakeResponse bytes returned from the server

See FlightClient docs for an example.

source

pub async fn do_get( &mut self, ticket: Ticket ) -> Result<FlightRecordBatchStream>

Make a DoGet call to the server with the provided ticket, returning a FlightRecordBatchStream for reading RecordBatches.

§Note

To access the returned FlightData use FlightRecordBatchStream::into_inner()

§Example:
let mut client = FlightClient::new(channel);

// Invoke a do_get request on the server with a previously
// received Ticket

let response = client
   .do_get(ticket)
   .await
   .expect("error invoking do_get");

// Use try_collect to get the RecordBatches from the server
let batches: Vec<RecordBatch> = response
   .try_collect()
   .await
   .expect("no stream errors");
source

pub async fn get_flight_info( &mut self, descriptor: FlightDescriptor ) -> Result<FlightInfo>

Make a GetFlightInfo call to the server with the provided FlightDescriptor and return the FlightInfo from the server. The FlightInfo can be used with Self::do_get to retrieve the requested batches.

§Example:
let mut client = FlightClient::new(channel);

// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_info = client
  .get_flight_info(request)
  .await
  .expect("error handshaking");

// retrieve the first endpoint from the returned flight info
let ticket = flight_info
  .endpoint[0]
  // Extract the ticket
  .ticket
  .clone()
  .expect("expected ticket");

// Retrieve the corresponding RecordBatch stream with do_get
let data = client
  .do_get(ticket)
  .await
  .expect("error fetching data");
source

pub async fn poll_flight_info( &mut self, descriptor: FlightDescriptor ) -> Result<PollInfo>

Make a PollFlightInfo call to the server with the provided FlightDescriptor and return the PollInfo from the server.

The info field of the PollInfo can be used with Self::do_get to retrieve the requested batches.

If the flight_descriptor field of the PollInfo is None then the info field represents the complete results.

If the flight_descriptor field is some FlightDescriptor then the info field has incomplete results, and the client should call this method again with the new flight_descriptor to get the updated status.

The expiration_time, if set, represents the expiration time of the flight_descriptor, after which the server may not accept this retry descriptor and may cancel the query.

§Example:
let mut client = FlightClient::new(channel);

// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let poll_info = client
  .poll_flight_info(request)
  .await
  .expect("error handshaking");

// retrieve the first endpoint from the returned poll info
let ticket = poll_info
  .info
  .expect("expected flight info")
  .endpoint[0]
  // Extract the ticket
  .ticket
  .clone()
  .expect("expected ticket");

// Retrieve the corresponding RecordBatch stream with do_get
let data = client
  .do_get(ticket)
  .await
  .expect("error fetching data");
source

pub async fn do_put<S: Stream<Item = Result<FlightData>> + Send + 'static>( &mut self, request: S ) -> Result<BoxStream<'static, Result<PutResult>>>

Make a DoPut call to the server with the provided [Stream] of FlightData and returning a stream of PutResult.

§Note

The input stream is Result so that this can be connected to a streaming data source, such as FlightDataEncoder, without having to buffer. If the input stream returns an error that error will not be sent to the server, instead it will be placed into the result stream and the server connection terminated.

§Example:
let mut client = FlightClient::new(channel);

// encode the batch as a stream of `FlightData`
let flight_data_stream = FlightDataEncoderBuilder::new()
  .build(futures::stream::iter(vec![Ok(batch)]));

// send the stream and get the results as `PutResult`
let response: Vec<PutResult>= client
  .do_put(flight_data_stream)
  .await
  .unwrap()
  .try_collect() // use TryStreamExt to collect stream
  .await
  .expect("error calling do_put");
source

pub async fn do_exchange<S: Stream<Item = Result<FlightData>> + Send + 'static>( &mut self, request: S ) -> Result<FlightRecordBatchStream>

Make a DoExchange call to the server with the provided [Stream] of FlightData and returning a stream of FlightData.

§Example:
let mut client = FlightClient::new(channel);

// encode the batch as a stream of `FlightData`
let flight_data_stream = FlightDataEncoderBuilder::new()
  .build(futures::stream::iter(vec![Ok(batch)]));

// send the stream and get the results as `RecordBatches`
let response: Vec<RecordBatch> = client
  .do_exchange(flight_data_stream)
  .await
  .unwrap()
  .try_collect() // use TryStreamExt to collect stream
  .await
  .expect("error calling do_exchange");
source

pub async fn list_flights( &mut self, expression: impl Into<Bytes> ) -> Result<BoxStream<'static, Result<FlightInfo>>>

Make a ListFlights call to the server with the provided criteria and returning a [Stream] of FlightInfo.

§Example:
let mut client = FlightClient::new(channel);

// Send 'Name=Foo' bytes as the "expression" to the server
// and gather the returned FlightInfo
let responses: Vec<FlightInfo> = client
  .list_flights(Bytes::from("Name=Foo"))
  .await
  .expect("error listing flights")
  .try_collect() // use TryStreamExt to collect stream
  .await
  .expect("error gathering flights");
source

pub async fn get_schema( &mut self, flight_descriptor: FlightDescriptor ) -> Result<Schema>

Make a GetSchema call to the server with the provided FlightDescriptor and returning the associated [Schema].

§Example:
let mut client = FlightClient::new(channel);

// Request the schema result of a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());

let schema: Schema = client
  .get_schema(request)
  .await
  .expect("error making request");
source

pub async fn list_actions( &mut self ) -> Result<BoxStream<'static, Result<ActionType>>>

Make a ListActions call to the server and returning a [Stream] of ActionType.

§Example:
let mut client = FlightClient::new(channel);

// List available actions on the server:
let actions: Vec<ActionType> = client
  .list_actions()
  .await
  .expect("error listing actions")
  .try_collect() // use TryStreamExt to collect stream
  .await
  .expect("error gathering actions");
source

pub async fn do_action( &mut self, action: Action ) -> Result<BoxStream<'static, Result<Bytes>>>

Make a DoAction call to the server and returning a [Stream] of opaque [Bytes].

§Example:
let mut client = FlightClient::new(channel);

let request = Action::new("my_action", "the body");

// Make a request to run the action on the server
let results: Vec<Bytes> = client
  .do_action(request)
  .await
  .expect("error executing acton")
  .try_collect() // use TryStreamExt to collect stream
  .await
  .expect("error gathering action results");
source

pub async fn cancel_flight_info( &mut self, request: CancelFlightInfoRequest ) -> Result<CancelFlightInfoResult>

Make a CancelFlightInfo call to the server and return a CancelFlightInfoResult.

§Example:
let mut client = FlightClient::new(channel);

// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_info = client
  .get_flight_info(request)
  .await
  .expect("error handshaking");

// Cancel the query
let request = CancelFlightInfoRequest::new(flight_info);
let result = client
  .cancel_flight_info(request)
  .await
  .expect("error cancelling");
source

pub async fn renew_flight_endpoint( &mut self, request: RenewFlightEndpointRequest ) -> Result<FlightEndpoint>

Make a RenewFlightEndpoint call to the server and return the renewed FlightEndpoint.

§Example:
let mut client = FlightClient::new(channel);

// Send a 'CMD' request to the server
let request = FlightDescriptor::new_cmd(b"MOAR DATA".to_vec());
let flight_endpoint = client
  .get_flight_info(request)
  .await
  .expect("error handshaking")
  .endpoint[0];

// Renew the endpoint
let request = RenewFlightEndpointRequest::new(flight_endpoint);
let flight_endpoint = client
  .renew_flight_endpoint(request)
  .await
  .expect("error renewing");
source

fn make_request<T>(&self, t: T) -> Request<T>

return a Request, adding any configured metadata

Trait Implementations§

source§

impl Debug for FlightClient

source§

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter. Read more

Auto Trait Implementations§

Blanket Implementations§

source§

impl<T> Any for T
where T: 'static + ?Sized,

source§

fn type_id(&self) -> TypeId

Gets the TypeId of self. Read more
source§

impl<T> Borrow<T> for T
where T: ?Sized,

source§

fn borrow(&self) -> &T

Immutably borrows from an owned value. Read more
source§

impl<T> BorrowMut<T> for T
where T: ?Sized,

source§

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value. Read more
source§

impl<T> From<T> for T

source§

fn from(t: T) -> T

Returns the argument unchanged.

§

impl<T> Instrument for T

§

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided [Span], returning an Instrumented wrapper. Read more
§

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper. Read more
source§

impl<T, U> Into<U> for T
where U: From<T>,

source§

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

source§

impl<T> IntoRequest<T> for T

source§

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request
source§

impl<T, U> TryFrom<U> for T
where U: Into<T>,

§

type Error = Infallible

The type returned in the event of a conversion error.
source§

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.
source§

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

§

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.
source§

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.
§

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

§

fn vzip(self) -> V

§

impl<T> WithSubscriber for T

§

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a [WithDispatch] wrapper. Read more
§

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a [WithDispatch] wrapper. Read more