Struct arrow_flight::encode::FlightDataEncoderBuilder

pub struct FlightDataEncoderBuilder {
    max_flight_data_size: usize,
    options: IpcWriteOptions,
    app_metadata: Bytes,
    schema: Option<SchemaRef>,
    descriptor: Option<FlightDescriptor>,
    dictionary_handling: DictionaryHandling,
}

Creates a Stream of FlightData messages from a Stream of Result<RecordBatch, FlightError>.

This can be used to implement FlightService::do_get in an Arrow Flight server.

This structure encodes a stream of Results rather than RecordBatches so that errors from streaming execution can be propagated: RecordBatches are generated incrementally, so an error may occur even after several batches have already been produced successfully.
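Why Results rather than plain batches? The failure mode can be modelled without Arrow. In the hypothetical, std-only sketch below, `Batch` stands in for a RecordBatch, a `String` stands in for an encoded FlightData message, and `encode_stream` is an invented name, not an arrow_flight API. Batches produced before a failure are still encoded and delivered, and the error is forwarded in its original position instead of being lost.

```rust
/// Hypothetical stand-in for a RecordBatch: just a vector of values.
type Batch = Vec<i32>;

/// Hypothetical "encoder": turns each successful batch into a message and
/// forwards any error unchanged, preserving the order of the input.
fn encode_stream<I>(input: I) -> impl Iterator<Item = Result<String, String>>
where
    I: IntoIterator<Item = Result<Batch, String>>,
{
    input
        .into_iter()
        // Ok(batch) becomes Ok(message); Err(e) passes through as Err(e).
        .map(|item| item.map(|batch| format!("rows={}", batch.len())))
}
```

A consumer of this model sees two encoded messages followed by the error, mirroring how a do_get client would receive data that was produced before execution failed.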

Caveats

  1. When DictionaryHandling is DictionaryHandling::Hydrate, DictionaryArrays are converted to their underlying types prior to transport. When DictionaryHandling is DictionaryHandling::Resend, dictionary FlightData is sent with every RecordBatch that contains a DictionaryArray. See https://github.com/apache/arrow-rs/issues/3389.
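To picture what Hydrate does to the data, here is a hypothetical, std-only model of a dictionary-encoded column: a list of distinct `values` plus per-row `keys` that index into it. Hydrating replaces each key with the value it points to, so the transported column no longer depends on a separate dictionary; with Resend, the keys are kept and the dictionary is sent again with each batch that contains it. `DictColumn` and `hydrate` are illustrative names, not arrow_flight APIs.

```rust
/// Hypothetical dictionary-encoded string column: `keys[i]` is an index
/// into `values` for row i.
struct DictColumn {
    keys: Vec<usize>,
    values: Vec<String>,
}

impl DictColumn {
    /// "Hydrate" the column: look up every key and return the plain values.
    /// Panics if a key is out of range; a real encoder would return an error.
    fn hydrate(&self) -> Vec<String> {
        self.keys.iter().map(|&k| self.values[k].clone()).collect()
    }
}
```

The trade-off this illustrates: hydrated data repeats each value per row, while Resend keeps the compact keys at the cost of retransmitting the dictionary.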

Example

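use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use std::sync::Arc;

// Create a RecordBatch with a single Int32 column named "a"
let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();

// Get an input stream of Result<RecordBatch, FlightError>
let input_stream = futures::stream::iter(vec![Ok(batch)]);

// Build a stream of `Result<FlightData>` (e.g. to return for do_get)
let flight_data_stream = FlightDataEncoderBuilder::new()
    .build(input_stream);

// Create a tonic `Response` that can be returned from a Flight server
let response = tonic::Response::new(flight_data_stream);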

Example: Sending Vec<RecordBatch>

You can create a Stream to pass to Self::build from an existing Vec of RecordBatches like this:

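use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use std::sync::Arc;

// Get batches that you want to send via Flight
// (here, two small single-column batches built inline)
let batches: Vec<RecordBatch> = (0..2i32)
    .map(|i| {
        let col: ArrayRef = Arc::new(Int32Array::from(vec![i, i + 1]));
        RecordBatch::try_from_iter(vec![("a", col)]).unwrap()
    })
    .collect();

// Create an input stream of Result<RecordBatch, FlightError>
let input_stream = futures::stream::iter(
    batches.into_iter().map(Ok)
);

// Build a stream of `Result<FlightData>` (e.g. to return for do_get)
let flight_data_stream = FlightDataEncoderBuilder::new()
    .build(input_stream);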

Example: Determining schema of encoded data

Encoding flight data may hydrate dictionaries (see DictionaryHandling for more information), which changes the schema of the encoded data compared to that of the input record batches. To access the fully hydrated schema, explicitly inform the builder of the input schema with FlightDataEncoderBuilder::with_schema, then call the FlightDataEncoder::known_schema method on the resulting encoder.

use arrow_array::{ArrayRef, Int32Array, RecordBatch};
use arrow_flight::encode::FlightDataEncoderBuilder;
use std::sync::Arc;

// Create a batch to send
let col: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3]));
let batch = RecordBatch::try_from_iter(vec![("a", col)]).unwrap();

// Get the schema of the input stream
let schema = batch.schema();

// Get an input stream of Result<RecordBatch, FlightError>
let input_stream = futures::stream::iter(vec![Ok(batch)]);

// Build a stream of `Result<FlightData>` (e.g. to return for do_get)
let flight_data_stream = FlightDataEncoderBuilder::new()
    // Inform the builder of the input stream schema
    .with_schema(schema)
    .build(input_stream);

// Retrieve the schema of the encoded data
let encoded_schema = flight_data_stream.known_schema();
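Why does hydration change the schema? A field declared as a dictionary of strings is transported as plain strings, so the encoded schema has to say so. The hypothetical, std-only sketch below models only that type rewrite: its `DataType` enum is a simplified stand-in for Arrow's data types, and `hydrated_type` is an invented helper, not the function arrow_flight uses internally. Recursing into list element types is an assumption of this model.

```rust
/// Simplified stand-in for an Arrow data type.
#[derive(Debug, Clone, PartialEq)]
enum DataType {
    Int32,
    Utf8,
    /// Dictionary(key type, value type)
    Dictionary(Box<DataType>, Box<DataType>),
    /// A list whose elements have the given type.
    List(Box<DataType>),
}

/// Replace every dictionary type with its value type, recursing into
/// list element types, as hydrating the transported data would require.
fn hydrated_type(dt: &DataType) -> DataType {
    match dt {
        // The key type is dropped; only the value type survives.
        DataType::Dictionary(_key, value) => hydrated_type(value),
        DataType::List(inner) => DataType::List(Box::new(hydrated_type(inner))),
        other => other.clone(),
    }
}
```

In this model, a Dictionary(Int32, Utf8) field is reported as Utf8 in the encoded schema, while fields without dictionaries are unchanged.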

Fields

max_flight_data_size: usize

The approximate maximum target message size, in bytes (see Self::with_max_flight_data_size for details).

options: IpcWriteOptions

IPC writer options.

app_metadata: Bytes

Metadata to add to the schema message

schema: Option<SchemaRef>

Optional schema, if known before data.

descriptor: Option<FlightDescriptor>

Optional flight descriptor, if known before data.

dictionary_handling: DictionaryHandling

Determines how DictionaryArrays are encoded for transport. See DictionaryHandling for more information.

Implementations

impl FlightDataEncoderBuilder

pub fn new() -> Self

pub fn with_max_flight_data_size(self, max_flight_data_size: usize) -> Self

Set the (approximate) maximum size, in bytes, of the FlightData produced by this encoder. Defaults to 2MB.

Since there is often a maximum message size for gRPC messages (typically around 4MB), this encoder splits RecordBatches (preserving order) into multiple FlightData objects to limit the size of individual messages sent via gRPC.

The size is approximate because of the additional encoding overhead on top of the underlying data buffers themselves.
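As a rough illustration of order-preserving splitting, the std-only sketch below divides a batch into contiguous row ranges so that each range's estimated size stays near a target. It assumes every row encodes to the same number of bytes (`bytes_per_row`), which real batches do not guarantee, and `split_rows` is a hypothetical helper rather than the encoder's actual algorithm.

```rust
/// Split `num_rows` rows into contiguous (offset, length) ranges, in order,
/// so that each range is at most roughly `max_size` bytes, assuming every
/// row encodes to `bytes_per_row` bytes. Each range holds at least one row,
/// so a single oversized row still yields a (too large) range.
fn split_rows(num_rows: usize, bytes_per_row: usize, max_size: usize) -> Vec<(usize, usize)> {
    // Guard against division by zero and against zero-row chunks.
    let rows_per_chunk = (max_size / bytes_per_row.max(1)).max(1);
    let mut ranges = Vec::new();
    let mut offset = 0;
    while offset < num_rows {
        let len = rows_per_chunk.min(num_rows - offset);
        ranges.push((offset, len));
        offset += len;
    }
    ranges
}
```

Under this model, 10 rows of 100 bytes with a 250-byte target become five 2-row ranges. The IPC framing overhead mentioned above is ignored here, which is one reason such a limit can only be approximate.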

pub fn with_dictionary_handling(self, dictionary_handling: DictionaryHandling) -> Self

Set the DictionaryHandling for this encoder.

pub fn with_metadata(self, app_metadata: Bytes) -> Self

Specify application-specific metadata to include in the FlightData::app_metadata field of the first Schema message.

pub fn with_options(self, options: IpcWriteOptions) -> Self

Set the IpcWriteOptions used to encode the RecordBatches for transport.

pub fn with_schema(self, schema: SchemaRef) -> Self

Specify a schema for the RecordBatches being sent. If a schema is not specified, an encoded Schema message will be sent when the first RecordBatch, if any, is encoded. Some clients expect a Schema message even if no data is sent.
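The difference between supplying a schema up front and letting it come from the first batch matters most for an empty input. The hypothetical, std-only model below (the `Message` enum and `encode_messages` are invented for illustration, not arrow_flight types) emits a Schema message first when one is known, and otherwise only when the first batch arrives, so an empty input without a schema produces no messages at all.

```rust
/// Simplified model of the messages an encoder emits.
#[derive(Debug, PartialEq)]
enum Message {
    /// A schema message, described here only by its field names.
    Schema(Vec<String>),
    /// A data message carrying the given number of rows.
    Batch(usize),
}

/// Each input batch is modelled as (field names, row count).
fn encode_messages(
    known_schema: Option<Vec<String>>,
    batches: Vec<(Vec<String>, usize)>,
) -> Vec<Message> {
    let mut out = Vec::new();
    let mut schema_sent = false;
    // A schema supplied up front is sent before any data.
    if let Some(schema) = known_schema {
        out.push(Message::Schema(schema));
        schema_sent = true;
    }
    for (fields, rows) in batches {
        // Without an up-front schema, take it from the first batch.
        if !schema_sent {
            out.push(Message::Schema(fields));
            schema_sent = true;
        }
        out.push(Message::Batch(rows));
    }
    out
}
```

In this model, a client that expects a Schema message would receive one for an empty input only when the schema was supplied up front.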

pub fn with_flight_descriptor(self, descriptor: Option<FlightDescriptor>) -> Self

Specify a flight descriptor in the first FlightData message.

pub fn build<S>(self, input: S) -> FlightDataEncoder
where S: Stream<Item = Result<RecordBatch>> + Send + 'static,

Consumes self, taking a Stream of Result<RecordBatch> and returning a FlightDataEncoder, which is a Stream of Result<FlightData>.

See the examples on Self and FlightDataEncoder for more details.
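The with_* methods take self by value and return Self, and build consumes the builder. The std-only sketch below reproduces that consuming-builder pattern with hypothetical types (`EncoderBuilder`, `Encoder`) and only two settings; writing the documented 2MB default as 2 * 1024 * 1024 bytes is an assumption of this sketch.

```rust
/// Hypothetical default target message size (assumed to mean 2 MiB).
const DEFAULT_MAX_SIZE: usize = 2 * 1024 * 1024;

#[derive(Debug, Clone)]
struct EncoderBuilder {
    max_size: usize,
    app_metadata: Vec<u8>,
}

impl Default for EncoderBuilder {
    fn default() -> Self {
        Self { max_size: DEFAULT_MAX_SIZE, app_metadata: Vec::new() }
    }
}

impl EncoderBuilder {
    fn new() -> Self {
        Self::default()
    }

    /// Takes `self` by value and returns the updated builder, so calls chain.
    fn with_max_size(mut self, max_size: usize) -> Self {
        self.max_size = max_size;
        self
    }

    fn with_metadata(mut self, app_metadata: Vec<u8>) -> Self {
        self.app_metadata = app_metadata;
        self
    }

    /// Consumes the builder; it cannot be used again afterwards.
    fn build(self, input: Vec<usize>) -> Encoder {
        Encoder { max_size: self.max_size, app_metadata: self.app_metadata, input }
    }
}

/// Hypothetical encoder holding its configuration and input.
#[derive(Debug)]
struct Encoder {
    max_size: usize,
    app_metadata: Vec<u8>,
    input: Vec<usize>,
}
```

Taking self by value lets the settings move into the built encoder without cloning, and the compiler rejects any later use of the consumed builder.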

Trait Implementations

impl Debug for FlightDataEncoderBuilder

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

impl Default for FlightDataEncoderBuilder

fn default() -> Self

Returns the “default value” for a type.

Auto Trait Implementations

Blanket Implementations

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> IntoRequest<T> for T

fn into_request(self) -> Request<T>

Wrap the input message T in a tonic::Request

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> Allocation for T
where T: RefUnwindSafe + Send + Sync,