arrow_flight/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A native Rust implementation of [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html)
19//! for exchanging [Arrow](https://arrow.apache.org) data between processes.
20//!
21//! Please see the [arrow-flight crates.io](https://crates.io/crates/arrow-flight)
22//! page for feature flags and more information.
23//!
24//! # Overview
25//!
26//! This crate contains:
27//!
28//! 1. Low level [prost] generated structs
29//!    for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
30//!    [`Location`] and [`Ticket`].
31//!
32//! 2. Low level [tonic] generated [`flight_service_client`] and
33//!    [`flight_service_server`].
34//!
35//! 3. Experimental support for [Flight SQL] in [`sql`]. Requires the
36//!    `flight-sql-experimental` feature of this crate to be activated.
37//!
38//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
39
40#![doc(
41    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
42    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
43)]
44#![cfg_attr(docsrs, feature(doc_auto_cfg))]
45#![allow(rustdoc::invalid_html_tags)]
46#![warn(missing_docs)]
47// The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets
48#![allow(unused_crate_dependencies)]
49
50use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
51use arrow_schema::{ArrowError, Schema};
52
53use arrow_ipc::convert::try_schema_from_ipc_buffer;
54use base64::prelude::BASE64_STANDARD;
55use base64::Engine;
56use bytes::Bytes;
57use prost_types::Timestamp;
58use std::{fmt, ops::Deref};
59
60type ArrowResult<T> = std::result::Result<T, ArrowError>;
61
// Private module holding the auto-generated Flight protocol code; the public
// items are re-exported from the crate root and the wrapper modules below.
#[allow(clippy::all)]
mod gen {
    // Since this file is auto-generated, we suppress all warnings
    #![allow(missing_docs)]
    // Splices the contents of `arrow.flight.protocol.rs` into this module.
    include!("arrow.flight.protocol.rs");
}
68
69/// Defines a `Flight` for generation or retrieval.
70pub mod flight_descriptor {
71    use super::gen;
72    pub use gen::flight_descriptor::DescriptorType;
73}
74
75/// Low Level [tonic] [`FlightServiceClient`](gen::flight_service_client::FlightServiceClient).
76pub mod flight_service_client {
77    use super::gen;
78    pub use gen::flight_service_client::FlightServiceClient;
79}
80
81/// Low Level [tonic] [`FlightServiceServer`](gen::flight_service_server::FlightServiceServer)
82/// and [`FlightService`](gen::flight_service_server::FlightService).
83pub mod flight_service_server {
84    use super::gen;
85    pub use gen::flight_service_server::FlightService;
86    pub use gen::flight_service_server::FlightServiceServer;
87}
88
89/// Mid Level [`FlightClient`]
90pub mod client;
91pub use client::FlightClient;
92
93/// Decoder to create [`RecordBatch`](arrow_array::RecordBatch) streams from [`FlightData`] streams.
94/// See [`FlightRecordBatchStream`](decode::FlightRecordBatchStream).
95pub mod decode;
96
97/// Encoder to create [`FlightData`] streams from [`RecordBatch`](arrow_array::RecordBatch) streams.
98/// See [`FlightDataEncoderBuilder`](encode::FlightDataEncoderBuilder).
99pub mod encode;
100
101/// Common error types
102pub mod error;
103
104pub use gen::Action;
105pub use gen::ActionType;
106pub use gen::BasicAuth;
107pub use gen::CancelFlightInfoRequest;
108pub use gen::CancelFlightInfoResult;
109pub use gen::CancelStatus;
110pub use gen::Criteria;
111pub use gen::Empty;
112pub use gen::FlightData;
113pub use gen::FlightDescriptor;
114pub use gen::FlightEndpoint;
115pub use gen::FlightInfo;
116pub use gen::HandshakeRequest;
117pub use gen::HandshakeResponse;
118pub use gen::Location;
119pub use gen::PollInfo;
120pub use gen::PutResult;
121pub use gen::RenewFlightEndpointRequest;
122pub use gen::Result;
123pub use gen::SchemaResult;
124pub use gen::Ticket;
125
126/// Helper to extract HTTP/gRPC trailers from a tonic stream.
127mod trailers;
128
129pub mod utils;
130
131#[cfg(feature = "flight-sql-experimental")]
132pub mod sql;
133mod streams;
134
135use flight_descriptor::DescriptorType;
136
/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
///
/// Both halves are borrowed for `'a`; the value dereferences to the inner
/// tuple, so `.0` is the schema and `.1` the write options.
pub struct SchemaAsIpc<'a> {
    /// Data type representing a schema and its IPC write options
    pub pair: (&'a Schema, &'a IpcWriteOptions),
}
142
/// IpcMessage represents a `Schema` in the format expected in
/// `FlightInfo.schema`
///
/// The wrapped [`Bytes`] are exposed as a byte slice through `Deref`.
#[derive(Debug)]
pub struct IpcMessage(pub Bytes);
147
148// Useful conversion functions
149
150fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
151    let data_gen = writer::IpcDataGenerator::default();
152    #[allow(deprecated)]
153    let mut dict_tracker =
154        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
155    data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
156}
157
158fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
159    let encoded_data = flight_schema_as_encoded_data(schema, options);
160    IpcMessage(encoded_data.ipc_message.into())
161}
162
163// Implement a bunch of useful traits for various conversions, displays,
164// etc...
165
166// Deref
167
168impl Deref for IpcMessage {
169    type Target = [u8];
170
171    fn deref(&self) -> &Self::Target {
172        &self.0
173    }
174}
175
176impl<'a> Deref for SchemaAsIpc<'a> {
177    type Target = (&'a Schema, &'a IpcWriteOptions);
178
179    fn deref(&self) -> &Self::Target {
180        &self.pair
181    }
182}
183
184// Display...
185
/// Writes the `Debug` form of at most the first `limit` bytes of `value`.
///
/// Nothing is emitted to show that bytes were dropped.
fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
    // Clamp to the slice length so a short `value` is printed in full.
    let shown = value.len().min(limit);
    write!(f, "{:?}", &value[..shown])
}
194
195impl fmt::Display for FlightData {
196    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197        write!(f, "FlightData {{")?;
198        write!(f, " descriptor: ")?;
199        match &self.flight_descriptor {
200            Some(d) => write!(f, "{d}")?,
201            None => write!(f, "None")?,
202        };
203        write!(f, ", header: ")?;
204        limited_fmt(f, &self.data_header, 8)?;
205        write!(f, ", metadata: ")?;
206        limited_fmt(f, &self.app_metadata, 8)?;
207        write!(f, ", body: ")?;
208        limited_fmt(f, &self.data_body, 8)?;
209        write!(f, " }}")
210    }
211}
212
213impl fmt::Display for FlightDescriptor {
214    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215        write!(f, "FlightDescriptor {{")?;
216        write!(f, " type: ")?;
217        match self.r#type() {
218            DescriptorType::Cmd => {
219                write!(f, "cmd, value: ")?;
220                limited_fmt(f, &self.cmd, 8)?;
221            }
222            DescriptorType::Path => {
223                write!(f, "path: [")?;
224                let mut sep = "";
225                for element in &self.path {
226                    write!(f, "{sep}{element}")?;
227                    sep = ", ";
228                }
229                write!(f, "]")?;
230            }
231            DescriptorType::Unknown => {
232                write!(f, "unknown")?;
233            }
234        }
235        write!(f, " }}")
236    }
237}
238
239impl fmt::Display for FlightEndpoint {
240    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241        write!(f, "FlightEndpoint {{")?;
242        write!(f, " ticket: ")?;
243        match &self.ticket {
244            Some(value) => write!(f, "{value}"),
245            None => write!(f, " None"),
246        }?;
247        write!(f, ", location: [")?;
248        let mut sep = "";
249        for location in &self.location {
250            write!(f, "{sep}{location}")?;
251            sep = ", ";
252        }
253        write!(f, "]")?;
254        write!(f, ", expiration_time:")?;
255        match &self.expiration_time {
256            Some(value) => write!(f, " {value}"),
257            None => write!(f, " None"),
258        }?;
259        write!(f, ", app_metadata: ")?;
260        limited_fmt(f, &self.app_metadata, 8)?;
261        write!(f, " }}")
262    }
263}
264
265impl fmt::Display for FlightInfo {
266    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267        let ipc_message = IpcMessage(self.schema.clone());
268        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
269        write!(f, "FlightInfo {{")?;
270        write!(f, " schema: {schema}")?;
271        write!(f, ", descriptor:")?;
272        match &self.flight_descriptor {
273            Some(d) => write!(f, " {d}"),
274            None => write!(f, " None"),
275        }?;
276        write!(f, ", endpoint: [")?;
277        let mut sep = "";
278        for endpoint in &self.endpoint {
279            write!(f, "{sep}{endpoint}")?;
280            sep = ", ";
281        }
282        write!(f, "], total_records: {}", self.total_records)?;
283        write!(f, ", total_bytes: {}", self.total_bytes)?;
284        write!(f, ", ordered: {}", self.ordered)?;
285        write!(f, ", app_metadata: ")?;
286        limited_fmt(f, &self.app_metadata, 8)?;
287        write!(f, " }}")
288    }
289}
290
291impl fmt::Display for PollInfo {
292    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293        write!(f, "PollInfo {{")?;
294        write!(f, " info:")?;
295        match &self.info {
296            Some(value) => write!(f, " {value}"),
297            None => write!(f, " None"),
298        }?;
299        write!(f, ", descriptor:")?;
300        match &self.flight_descriptor {
301            Some(d) => write!(f, " {d}"),
302            None => write!(f, " None"),
303        }?;
304        write!(f, ", progress:")?;
305        match &self.progress {
306            Some(value) => write!(f, " {value}"),
307            None => write!(f, " None"),
308        }?;
309        write!(f, ", expiration_time:")?;
310        match &self.expiration_time {
311            Some(value) => write!(f, " {value}"),
312            None => write!(f, " None"),
313        }?;
314        write!(f, " }}")
315    }
316}
317
318impl fmt::Display for CancelFlightInfoRequest {
319    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320        write!(f, "CancelFlightInfoRequest {{")?;
321        write!(f, " info: ")?;
322        match &self.info {
323            Some(value) => write!(f, "{value}")?,
324            None => write!(f, "None")?,
325        };
326        write!(f, " }}")
327    }
328}
329
330impl fmt::Display for CancelFlightInfoResult {
331    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332        write!(f, "CancelFlightInfoResult {{")?;
333        write!(f, " status: {}", self.status().as_str_name())?;
334        write!(f, " }}")
335    }
336}
337
338impl fmt::Display for RenewFlightEndpointRequest {
339    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
340        write!(f, "RenewFlightEndpointRequest {{")?;
341        write!(f, " endpoint: ")?;
342        match &self.endpoint {
343            Some(value) => write!(f, "{value}")?,
344            None => write!(f, "None")?,
345        };
346        write!(f, " }}")
347    }
348}
349
350impl fmt::Display for Location {
351    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352        write!(f, "Location {{")?;
353        write!(f, " uri: ")?;
354        write!(f, "{}", self.uri)
355    }
356}
357
358impl fmt::Display for Ticket {
359    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360        write!(f, "Ticket {{")?;
361        write!(f, " ticket: ")?;
362        write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
363    }
364}
365
366// From...
367
368impl From<EncodedData> for FlightData {
369    fn from(data: EncodedData) -> Self {
370        FlightData {
371            data_header: data.ipc_message.into(),
372            data_body: data.arrow_data.into(),
373            ..Default::default()
374        }
375    }
376}
377
378impl From<SchemaAsIpc<'_>> for FlightData {
379    fn from(schema_ipc: SchemaAsIpc) -> Self {
380        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
381        FlightData {
382            data_header: vals,
383            ..Default::default()
384        }
385    }
386}
387
388impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
389    type Error = ArrowError;
390
391    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
392        // According to the definition from `Flight.proto`
393        // The schema of the dataset in its IPC form:
394        //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
395        //   4 bytes - the byte length of the payload
396        //   a flatbuffer Message whose header is the Schema
397        let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
398        Ok(SchemaResult { schema: vals })
399    }
400}
401
402impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
403    type Error = ArrowError;
404
405    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
406        schema_to_ipc_format(schema_ipc)
407    }
408}
409
410fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
411    let pair = *schema_ipc;
412    let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
413
414    let mut schema = vec![];
415    writer::write_message(&mut schema, encoded_data, pair.1)?;
416    Ok(IpcMessage(schema.into()))
417}
418
419impl TryFrom<&FlightData> for Schema {
420    type Error = ArrowError;
421    fn try_from(data: &FlightData) -> ArrowResult<Self> {
422        convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
423            ArrowError::ParseError(format!(
424                "Unable to convert flight data to Arrow schema: {err}"
425            ))
426        })
427    }
428}
429
430impl TryFrom<FlightInfo> for Schema {
431    type Error = ArrowError;
432
433    fn try_from(value: FlightInfo) -> ArrowResult<Self> {
434        value.try_decode_schema()
435    }
436}
437
438impl TryFrom<IpcMessage> for Schema {
439    type Error = ArrowError;
440
441    fn try_from(value: IpcMessage) -> ArrowResult<Self> {
442        try_schema_from_ipc_buffer(&value)
443    }
444}
445
446impl TryFrom<&SchemaResult> for Schema {
447    type Error = ArrowError;
448    fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
449        try_schema_from_ipc_buffer(&data.schema)
450    }
451}
452
453impl TryFrom<SchemaResult> for Schema {
454    type Error = ArrowError;
455    fn try_from(data: SchemaResult) -> ArrowResult<Self> {
456        (&data).try_into()
457    }
458}
459
460// FlightData, FlightDescriptor, etc..
461
462impl FlightData {
463    /// Create a new [`FlightData`].
464    ///
465    /// # See Also
466    ///
467    /// See [`FlightDataEncoderBuilder`] for a higher level API to
468    /// convert a stream of [`RecordBatch`]es to [`FlightData`]s
469    ///
470    /// # Example:
471    ///
472    /// ```
473    /// # use bytes::Bytes;
474    /// # use arrow_flight::{FlightData, FlightDescriptor};
475    /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
476    /// // Get encoded Arrow IPC data:
477    /// let data_body: Bytes = encode_data();
478    /// // Create the FlightData message
479    /// let flight_data = FlightData::new()
480    ///   .with_descriptor(FlightDescriptor::new_cmd("the command"))
481    ///   .with_app_metadata("My apps metadata")
482    ///   .with_data_body(data_body);
483    /// ```
484    ///
485    /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
486    /// [`RecordBatch`]: arrow_array::RecordBatch
487    pub fn new() -> Self {
488        Default::default()
489    }
490
491    /// Add a [`FlightDescriptor`] describing the data
492    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
493        self.flight_descriptor = Some(flight_descriptor);
494        self
495    }
496
497    /// Add a data header
498    pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
499        self.data_header = data_header.into();
500        self
501    }
502
503    /// Add a data body. See [`IpcDataGenerator`] to create this data.
504    ///
505    /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
506    pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
507        self.data_body = data_body.into();
508        self
509    }
510
511    /// Add optional application specific metadata to the message
512    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
513        self.app_metadata = app_metadata.into();
514        self
515    }
516}
517
518impl FlightDescriptor {
519    /// Create a new opaque command [`CMD`] `FlightDescriptor` to generate a dataset.
520    ///
521    /// [`CMD`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L224-L227
522    pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
523        FlightDescriptor {
524            r#type: DescriptorType::Cmd.into(),
525            cmd: cmd.into(),
526            ..Default::default()
527        }
528    }
529
530    /// Create a new named path [`PATH`] `FlightDescriptor` that identifies a dataset
531    ///
532    /// [`PATH`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L217-L222
533    pub fn new_path(path: Vec<String>) -> Self {
534        FlightDescriptor {
535            r#type: DescriptorType::Path.into(),
536            path,
537            ..Default::default()
538        }
539    }
540}
541
542impl FlightInfo {
543    /// Create a new, empty `FlightInfo`, describing where to fetch flight data
544    ///
545    ///
546    /// # Example:
547    /// ```
548    /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
549    /// # use arrow_schema::{Schema, Field, DataType};
550    /// # fn get_schema() -> Schema {
551    /// #   Schema::new(vec![
552    /// #     Field::new("a", DataType::Utf8, false),
553    /// #   ])
554    /// # }
555    /// #
556    /// // Create a new FlightInfo
557    /// let flight_info = FlightInfo::new()
558    ///   // Encode the Arrow schema
559    ///   .try_with_schema(&get_schema())
560    ///   .expect("encoding failed")
561    ///   .with_endpoint(
562    ///      FlightEndpoint::new()
563    ///        .with_ticket(Ticket::new("ticket contents")
564    ///      )
565    ///    )
566    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
567    /// ```
568    pub fn new() -> FlightInfo {
569        FlightInfo {
570            schema: Bytes::new(),
571            flight_descriptor: None,
572            endpoint: vec![],
573            ordered: false,
574            // Flight says "Set these to -1 if unknown."
575            //
576            // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
577            total_records: -1,
578            total_bytes: -1,
579            app_metadata: Bytes::new(),
580        }
581    }
582
583    /// Try and convert the data in this  `FlightInfo` into a [`Schema`]
584    pub fn try_decode_schema(self) -> ArrowResult<Schema> {
585        let msg = IpcMessage(self.schema);
586        msg.try_into()
587    }
588
589    /// Specify the schema for the response.
590    ///
591    /// Note this takes the arrow [`Schema`] (not the IPC schema) and
592    /// encodes it using the default IPC options.
593    ///
594    /// Returns an error if `schema` can not be encoded into IPC form.
595    pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
596        let options = IpcWriteOptions::default();
597        let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
598        self.schema = schema;
599        Ok(self)
600    }
601
602    /// Add specific a endpoint for fetching the data
603    pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
604        self.endpoint.push(endpoint);
605        self
606    }
607
608    /// Add a [`FlightDescriptor`] describing what this data is
609    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
610        self.flight_descriptor = Some(flight_descriptor);
611        self
612    }
613
614    /// Set the number of records in the result, if known
615    pub fn with_total_records(mut self, total_records: i64) -> Self {
616        self.total_records = total_records;
617        self
618    }
619
620    /// Set the number of bytes in the result, if known
621    pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
622        self.total_bytes = total_bytes;
623        self
624    }
625
626    /// Specify if the response is [ordered] across endpoints
627    ///
628    /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
629    pub fn with_ordered(mut self, ordered: bool) -> Self {
630        self.ordered = ordered;
631        self
632    }
633
634    /// Add optional application specific metadata to the message
635    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
636        self.app_metadata = app_metadata.into();
637        self
638    }
639}
640
641impl PollInfo {
642    /// Create a new, empty [`PollInfo`], providing information for a long-running query
643    ///
644    /// # Example:
645    /// ```
646    /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor};
647    /// # use prost_types::Timestamp;
648    /// // Create a new PollInfo
649    /// let poll_info = PollInfo::new()
650    ///   .with_info(FlightInfo::new())
651    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"))
652    ///   .try_with_progress(0.5)
653    ///   .expect("progress should've been valid")
654    ///   .with_expiration_time(
655    ///     "1970-01-01".parse().expect("invalid timestamp")
656    ///   );
657    /// ```
658    pub fn new() -> Self {
659        Self {
660            info: None,
661            flight_descriptor: None,
662            progress: None,
663            expiration_time: None,
664        }
665    }
666
667    /// Add the current available results for the poll call as a [`FlightInfo`]
668    pub fn with_info(mut self, info: FlightInfo) -> Self {
669        self.info = Some(info);
670        self
671    }
672
673    /// Add a [`FlightDescriptor`] that the client should use for the next poll call,
674    /// if the query is not yet complete
675    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
676        self.flight_descriptor = Some(flight_descriptor);
677        self
678    }
679
680    /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will
681    /// return an error
682    pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
683        if !(0.0..=1.0).contains(&progress) {
684            return Err(ArrowError::InvalidArgumentError(format!(
685                "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
686            )));
687        }
688        self.progress = Some(progress);
689        Ok(self)
690    }
691
692    /// Specify expiration time for this request
693    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
694        self.expiration_time = Some(expiration_time);
695        self
696    }
697}
698
699impl<'a> SchemaAsIpc<'a> {
700    /// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions`
701    pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
702        SchemaAsIpc {
703            pair: (schema, options),
704        }
705    }
706}
707
708impl CancelFlightInfoRequest {
709    /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`]
710    /// of the query to cancel.
711    pub fn new(info: FlightInfo) -> Self {
712        Self { info: Some(info) }
713    }
714}
715
716impl CancelFlightInfoResult {
717    /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`].
718    pub fn new(status: CancelStatus) -> Self {
719        Self {
720            status: status as i32,
721        }
722    }
723}
724
725impl RenewFlightEndpointRequest {
726    /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`]
727    /// for which is being requested an extension of its expiration.
728    pub fn new(endpoint: FlightEndpoint) -> Self {
729        Self {
730            endpoint: Some(endpoint),
731        }
732    }
733}
734
735impl Action {
736    /// Create a new Action with type and body
737    pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
738        Self {
739            r#type: action_type.into(),
740            body: body.into(),
741        }
742    }
743}
744
745impl Result {
746    /// Create a new Result with the specified body
747    pub fn new(body: impl Into<Bytes>) -> Self {
748        Self { body: body.into() }
749    }
750}
751
752impl Ticket {
753    /// Create a new `Ticket`
754    ///
755    /// # Example
756    ///
757    /// ```
758    /// # use arrow_flight::Ticket;
759    /// let ticket = Ticket::new("SELECT * from FOO");
760    /// ```
761    pub fn new(ticket: impl Into<Bytes>) -> Self {
762        Self {
763            ticket: ticket.into(),
764        }
765    }
766}
767
768impl FlightEndpoint {
769    /// Create a new, empty `FlightEndpoint` that represents a location
770    /// to retrieve Flight results.
771    ///
772    /// # Example
773    /// ```
774    /// # use arrow_flight::{FlightEndpoint, Ticket};
775    /// #
776    /// // Specify the client should fetch results from this server
777    /// let endpoint = FlightEndpoint::new()
778    ///   .with_ticket(Ticket::new("the ticket"));
779    ///
780    /// // Specify the client should fetch results from either
781    /// // `http://example.com` or `https://example.com`
782    /// let endpoint = FlightEndpoint::new()
783    ///   .with_ticket(Ticket::new("the ticket"))
784    ///   .with_location("http://example.com")
785    ///   .with_location("https://example.com");
786    /// ```
787    pub fn new() -> FlightEndpoint {
788        Default::default()
789    }
790
791    /// Set the [`Ticket`] used to retrieve data from the endpoint
792    pub fn with_ticket(mut self, ticket: Ticket) -> Self {
793        self.ticket = Some(ticket);
794        self
795    }
796
797    /// Add a location `uri` to this endpoint. Note each endpoint can
798    /// have multiple locations.
799    ///
800    /// If no `uri` is specified, the [Flight Spec] says:
801    ///
802    /// ```text
803    /// * If the list is empty, the expectation is that the ticket can only
804    /// * be redeemed on the current service where the ticket was
805    /// * generated.
806    /// ```
807    /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
808    pub fn with_location(mut self, uri: impl Into<String>) -> Self {
809        self.location.push(Location { uri: uri.into() });
810        self
811    }
812
813    /// Specify expiration time for this stream
814    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
815        self.expiration_time = Some(expiration_time);
816        self
817    }
818
819    /// Add optional application specific metadata to the message
820    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
821        self.app_metadata = app_metadata.into();
822        self
823    }
824}
825
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_ipc::MetadataVersion;
    use arrow_schema::{DataType, Field, TimeUnit};

    /// Bytes to print together with the limit handed to `limited_fmt`.
    struct TestVector(Vec<u8>, usize);

    impl fmt::Display for TestVector {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self(bytes, limit) = self;
            limited_fmt(f, bytes, *limit)
        }
    }

    #[test]
    fn it_creates_flight_descriptor_command() {
        let expected_cmd = b"my_command".to_vec();
        let fd = FlightDescriptor::new_cmd(expected_cmd.clone());
        assert_eq!(fd.r#type(), DescriptorType::Cmd);
        assert_eq!(fd.cmd, expected_cmd);
    }

    #[test]
    fn it_accepts_equal_output() {
        // Limit equal to the length: everything is printed.
        let actual = TestVector(vec![91; 10], 10).to_string();
        assert_eq!(actual, format!("{:?}", vec![91; 10]));
    }

    #[test]
    fn it_accepts_short_output() {
        // Input shorter than the limit: everything is printed.
        let actual = TestVector(vec![91; 6], 10).to_string();
        assert_eq!(actual, format!("{:?}", vec![91; 6]));
    }

    #[test]
    fn it_accepts_long_output() {
        // Input longer than the limit: only the first `limit` bytes are printed.
        let actual = TestVector(vec![91; 10], 9).to_string();
        assert_eq!(actual, format!("{:?}", vec![91; 9]));
    }

    #[test]
    fn ser_deser_schema_result() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::UInt32, false),
            Field::new("c4", DataType::Boolean, true),
            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        ]);

        let write_options = [
            // V5 with write_legacy_ipc_format = false
            // this will write the continuation marker
            IpcWriteOptions::default(),
            // V4 with write_legacy_ipc_format = true
            // this will not write the continuation marker
            IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
        ];

        // Round-trip the schema through `SchemaResult` for each set of options.
        for option in &write_options {
            let result = SchemaResult::try_from(SchemaAsIpc::new(&schema, option)).unwrap();
            let des_schema = Schema::try_from(&result).unwrap();
            assert_eq!(schema, des_schema);
        }
    }
}
899        assert_eq!(schema, des_schema);
900    }
901}