arrow_flight/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A native Rust implementation of [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html)
19//! for exchanging [Arrow](https://arrow.apache.org) data between processes.
20//!
21//! Please see the [arrow-flight crates.io](https://crates.io/crates/arrow-flight)
22//! page for feature flags and more information.
23//!
24//! # Overview
25//!
26//! This crate contains:
27//!
28//! 1. Low level [prost] generated structs
29//!    for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
30//!    [`Location`] and [`Ticket`].
31//!
32//! 2. Low level [tonic] generated [`flight_service_client`] and
33//!    [`flight_service_server`].
34//!
35//! 3. Support for [Flight SQL] in [`sql`]. Requires the
36//!    `flight-sql` feature of this crate to be activated.
37//!
//! 4. The `flight-sql-experimental` feature is deprecated and will be removed in a future release.
39//!
40//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
41
42#![doc(
43    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
44    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
45)]
46#![cfg_attr(docsrs, feature(doc_auto_cfg))]
47#![allow(rustdoc::invalid_html_tags)]
48#![warn(missing_docs)]
49// The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets
50#![allow(unused_crate_dependencies)]
51
52use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
53use arrow_schema::{ArrowError, Schema};
54
55use arrow_ipc::convert::try_schema_from_ipc_buffer;
56use base64::prelude::BASE64_STANDARD;
57use base64::Engine;
58use bytes::Bytes;
59use prost_types::Timestamp;
60use std::{fmt, ops::Deref};
61
/// Crate-local shorthand for a result whose error type is [`ArrowError`].
///
/// Spelled with the full `std::result::Result` path because this module
/// re-exports the generated Flight `Result` message under the name `Result`.
type ArrowResult<T> = std::result::Result<T, ArrowError>;
63
// Auto-generated Flight protocol code, included verbatim from
// `arrow.flight.protocol.rs`. The module is private; the items meant for
// users are re-exported from the crate root and the wrapper modules below.
#[allow(clippy::all)]
mod gen {
    // Since this file is auto-generated, we suppress all warnings
    #![allow(missing_docs)]
    include!("arrow.flight.protocol.rs");
}
70
71/// Defines a `Flight` for generation or retrieval.
72pub mod flight_descriptor {
73    use super::gen;
74    pub use gen::flight_descriptor::DescriptorType;
75}
76
77/// Low Level [tonic] [`FlightServiceClient`](gen::flight_service_client::FlightServiceClient).
78pub mod flight_service_client {
79    use super::gen;
80    pub use gen::flight_service_client::FlightServiceClient;
81}
82
83/// Low Level [tonic] [`FlightServiceServer`](gen::flight_service_server::FlightServiceServer)
84/// and [`FlightService`](gen::flight_service_server::FlightService).
85pub mod flight_service_server {
86    use super::gen;
87    pub use gen::flight_service_server::FlightService;
88    pub use gen::flight_service_server::FlightServiceServer;
89}
90
91/// Mid Level [`FlightClient`]
92pub mod client;
93pub use client::FlightClient;
94
95/// Decoder to create [`RecordBatch`](arrow_array::RecordBatch) streams from [`FlightData`] streams.
96/// See [`FlightRecordBatchStream`](decode::FlightRecordBatchStream).
97pub mod decode;
98
99/// Encoder to create [`FlightData`] streams from [`RecordBatch`](arrow_array::RecordBatch) streams.
100/// See [`FlightDataEncoderBuilder`](encode::FlightDataEncoderBuilder).
101pub mod encode;
102
103/// Common error types
104pub mod error;
105
106pub use gen::Action;
107pub use gen::ActionType;
108pub use gen::BasicAuth;
109pub use gen::CancelFlightInfoRequest;
110pub use gen::CancelFlightInfoResult;
111pub use gen::CancelStatus;
112pub use gen::Criteria;
113pub use gen::Empty;
114pub use gen::FlightData;
115pub use gen::FlightDescriptor;
116pub use gen::FlightEndpoint;
117pub use gen::FlightInfo;
118pub use gen::HandshakeRequest;
119pub use gen::HandshakeResponse;
120pub use gen::Location;
121pub use gen::PollInfo;
122pub use gen::PutResult;
123pub use gen::RenewFlightEndpointRequest;
124pub use gen::Result;
125pub use gen::SchemaResult;
126pub use gen::Ticket;
127
128/// Helper to extract HTTP/gRPC trailers from a tonic stream.
129mod trailers;
130
131pub mod utils;
132
133#[cfg(feature = "flight-sql")]
134pub mod sql;
135mod streams;
136
137use flight_descriptor::DescriptorType;
138
/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions.
///
/// It is the input type of the schema-encoding conversions in this module
/// (into [`FlightData`], [`SchemaResult`] and [`IpcMessage`]) and
/// dereferences to the underlying `(schema, options)` tuple.
pub struct SchemaAsIpc<'a> {
    /// The borrowed schema (`.0`) and the IPC write options used to encode it (`.1`)
    pub pair: (&'a Schema, &'a IpcWriteOptions),
}
144
/// IpcMessage represents a `Schema` in the format expected in
/// `FlightInfo.schema`.
///
/// The wrapped [`Bytes`] hold the encoded message; the type dereferences to `[u8]`.
#[derive(Debug)]
pub struct IpcMessage(pub Bytes);
149
150// Useful conversion functions
151
152fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
153    let data_gen = writer::IpcDataGenerator::default();
154    #[allow(deprecated)]
155    let mut dict_tracker =
156        writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
157    data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
158}
159
160fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
161    let encoded_data = flight_schema_as_encoded_data(schema, options);
162    IpcMessage(encoded_data.ipc_message.into())
163}
164
165// Implement a bunch of useful traits for various conversions, displays,
166// etc...
167
168// Deref
169
170impl Deref for IpcMessage {
171    type Target = [u8];
172
173    fn deref(&self) -> &Self::Target {
174        &self.0
175    }
176}
177
178impl<'a> Deref for SchemaAsIpc<'a> {
179    type Target = (&'a Schema, &'a IpcWriteOptions);
180
181    fn deref(&self) -> &Self::Target {
182        &self.pair
183    }
184}
185
186// Display...
187
/// Writes the `Debug` form of at most the first `limit` bytes of `value`.
///
/// Slices no longer than `limit` are written in full.
fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
    let shown_len = value.len().min(limit);
    let shown = &value[..shown_len];
    write!(f, "{shown:?}")
}
196
197impl fmt::Display for FlightData {
198    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199        write!(f, "FlightData {{")?;
200        write!(f, " descriptor: ")?;
201        match &self.flight_descriptor {
202            Some(d) => write!(f, "{d}")?,
203            None => write!(f, "None")?,
204        };
205        write!(f, ", header: ")?;
206        limited_fmt(f, &self.data_header, 8)?;
207        write!(f, ", metadata: ")?;
208        limited_fmt(f, &self.app_metadata, 8)?;
209        write!(f, ", body: ")?;
210        limited_fmt(f, &self.data_body, 8)?;
211        write!(f, " }}")
212    }
213}
214
215impl fmt::Display for FlightDescriptor {
216    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217        write!(f, "FlightDescriptor {{")?;
218        write!(f, " type: ")?;
219        match self.r#type() {
220            DescriptorType::Cmd => {
221                write!(f, "cmd, value: ")?;
222                limited_fmt(f, &self.cmd, 8)?;
223            }
224            DescriptorType::Path => {
225                write!(f, "path: [")?;
226                let mut sep = "";
227                for element in &self.path {
228                    write!(f, "{sep}{element}")?;
229                    sep = ", ";
230                }
231                write!(f, "]")?;
232            }
233            DescriptorType::Unknown => {
234                write!(f, "unknown")?;
235            }
236        }
237        write!(f, " }}")
238    }
239}
240
241impl fmt::Display for FlightEndpoint {
242    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243        write!(f, "FlightEndpoint {{")?;
244        write!(f, " ticket: ")?;
245        match &self.ticket {
246            Some(value) => write!(f, "{value}"),
247            None => write!(f, " None"),
248        }?;
249        write!(f, ", location: [")?;
250        let mut sep = "";
251        for location in &self.location {
252            write!(f, "{sep}{location}")?;
253            sep = ", ";
254        }
255        write!(f, "]")?;
256        write!(f, ", expiration_time:")?;
257        match &self.expiration_time {
258            Some(value) => write!(f, " {value}"),
259            None => write!(f, " None"),
260        }?;
261        write!(f, ", app_metadata: ")?;
262        limited_fmt(f, &self.app_metadata, 8)?;
263        write!(f, " }}")
264    }
265}
266
267impl fmt::Display for FlightInfo {
268    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269        let ipc_message = IpcMessage(self.schema.clone());
270        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
271        write!(f, "FlightInfo {{")?;
272        write!(f, " schema: {schema}")?;
273        write!(f, ", descriptor:")?;
274        match &self.flight_descriptor {
275            Some(d) => write!(f, " {d}"),
276            None => write!(f, " None"),
277        }?;
278        write!(f, ", endpoint: [")?;
279        let mut sep = "";
280        for endpoint in &self.endpoint {
281            write!(f, "{sep}{endpoint}")?;
282            sep = ", ";
283        }
284        write!(f, "], total_records: {}", self.total_records)?;
285        write!(f, ", total_bytes: {}", self.total_bytes)?;
286        write!(f, ", ordered: {}", self.ordered)?;
287        write!(f, ", app_metadata: ")?;
288        limited_fmt(f, &self.app_metadata, 8)?;
289        write!(f, " }}")
290    }
291}
292
293impl fmt::Display for PollInfo {
294    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295        write!(f, "PollInfo {{")?;
296        write!(f, " info:")?;
297        match &self.info {
298            Some(value) => write!(f, " {value}"),
299            None => write!(f, " None"),
300        }?;
301        write!(f, ", descriptor:")?;
302        match &self.flight_descriptor {
303            Some(d) => write!(f, " {d}"),
304            None => write!(f, " None"),
305        }?;
306        write!(f, ", progress:")?;
307        match &self.progress {
308            Some(value) => write!(f, " {value}"),
309            None => write!(f, " None"),
310        }?;
311        write!(f, ", expiration_time:")?;
312        match &self.expiration_time {
313            Some(value) => write!(f, " {value}"),
314            None => write!(f, " None"),
315        }?;
316        write!(f, " }}")
317    }
318}
319
320impl fmt::Display for CancelFlightInfoRequest {
321    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322        write!(f, "CancelFlightInfoRequest {{")?;
323        write!(f, " info: ")?;
324        match &self.info {
325            Some(value) => write!(f, "{value}")?,
326            None => write!(f, "None")?,
327        };
328        write!(f, " }}")
329    }
330}
331
332impl fmt::Display for CancelFlightInfoResult {
333    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334        write!(f, "CancelFlightInfoResult {{")?;
335        write!(f, " status: {}", self.status().as_str_name())?;
336        write!(f, " }}")
337    }
338}
339
340impl fmt::Display for RenewFlightEndpointRequest {
341    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342        write!(f, "RenewFlightEndpointRequest {{")?;
343        write!(f, " endpoint: ")?;
344        match &self.endpoint {
345            Some(value) => write!(f, "{value}")?,
346            None => write!(f, "None")?,
347        };
348        write!(f, " }}")
349    }
350}
351
352impl fmt::Display for Location {
353    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354        write!(f, "Location {{")?;
355        write!(f, " uri: ")?;
356        write!(f, "{}", self.uri)
357    }
358}
359
360impl fmt::Display for Ticket {
361    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
362        write!(f, "Ticket {{")?;
363        write!(f, " ticket: ")?;
364        write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
365    }
366}
367
368// From...
369
370impl From<EncodedData> for FlightData {
371    fn from(data: EncodedData) -> Self {
372        FlightData {
373            data_header: data.ipc_message.into(),
374            data_body: data.arrow_data.into(),
375            ..Default::default()
376        }
377    }
378}
379
380impl From<SchemaAsIpc<'_>> for FlightData {
381    fn from(schema_ipc: SchemaAsIpc) -> Self {
382        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
383        FlightData {
384            data_header: vals,
385            ..Default::default()
386        }
387    }
388}
389
390impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
391    type Error = ArrowError;
392
393    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
394        // According to the definition from `Flight.proto`
395        // The schema of the dataset in its IPC form:
396        //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
397        //   4 bytes - the byte length of the payload
398        //   a flatbuffer Message whose header is the Schema
399        let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
400        Ok(SchemaResult { schema: vals })
401    }
402}
403
impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
    type Error = ArrowError;

    /// Encodes the schema with its paired write options into an [`IpcMessage`].
    ///
    /// Delegates to `schema_to_ipc_format` and propagates its error unchanged.
    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
        schema_to_ipc_format(schema_ipc)
    }
}
411
412fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
413    let pair = *schema_ipc;
414    let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
415
416    let mut schema = vec![];
417    writer::write_message(&mut schema, encoded_data, pair.1)?;
418    Ok(IpcMessage(schema.into()))
419}
420
421impl TryFrom<&FlightData> for Schema {
422    type Error = ArrowError;
423    fn try_from(data: &FlightData) -> ArrowResult<Self> {
424        convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
425            ArrowError::ParseError(format!(
426                "Unable to convert flight data to Arrow schema: {err}"
427            ))
428        })
429    }
430}
431
impl TryFrom<FlightInfo> for Schema {
    type Error = ArrowError;

    /// Consumes the `FlightInfo` and decodes its schema.
    ///
    /// Equivalent to calling [`FlightInfo::try_decode_schema`].
    fn try_from(value: FlightInfo) -> ArrowResult<Self> {
        value.try_decode_schema()
    }
}
439
440impl TryFrom<IpcMessage> for Schema {
441    type Error = ArrowError;
442
443    fn try_from(value: IpcMessage) -> ArrowResult<Self> {
444        try_schema_from_ipc_buffer(&value)
445    }
446}
447
impl TryFrom<&SchemaResult> for Schema {
    type Error = ArrowError;

    /// Decodes a schema from the IPC-encoded bytes in `SchemaResult.schema`.
    fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
        try_schema_from_ipc_buffer(&data.schema)
    }
}
454
455impl TryFrom<SchemaResult> for Schema {
456    type Error = ArrowError;
457    fn try_from(data: SchemaResult) -> ArrowResult<Self> {
458        (&data).try_into()
459    }
460}
461
462// FlightData, FlightDescriptor, etc..
463
464impl FlightData {
465    /// Create a new [`FlightData`].
466    ///
467    /// # See Also
468    ///
469    /// See [`FlightDataEncoderBuilder`] for a higher level API to
470    /// convert a stream of [`RecordBatch`]es to [`FlightData`]s
471    ///
472    /// # Example:
473    ///
474    /// ```
475    /// # use bytes::Bytes;
476    /// # use arrow_flight::{FlightData, FlightDescriptor};
477    /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
478    /// // Get encoded Arrow IPC data:
479    /// let data_body: Bytes = encode_data();
480    /// // Create the FlightData message
481    /// let flight_data = FlightData::new()
482    ///   .with_descriptor(FlightDescriptor::new_cmd("the command"))
483    ///   .with_app_metadata("My apps metadata")
484    ///   .with_data_body(data_body);
485    /// ```
486    ///
487    /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
488    /// [`RecordBatch`]: arrow_array::RecordBatch
489    pub fn new() -> Self {
490        Default::default()
491    }
492
493    /// Add a [`FlightDescriptor`] describing the data
494    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
495        self.flight_descriptor = Some(flight_descriptor);
496        self
497    }
498
499    /// Add a data header
500    pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
501        self.data_header = data_header.into();
502        self
503    }
504
505    /// Add a data body. See [`IpcDataGenerator`] to create this data.
506    ///
507    /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
508    pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
509        self.data_body = data_body.into();
510        self
511    }
512
513    /// Add optional application specific metadata to the message
514    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
515        self.app_metadata = app_metadata.into();
516        self
517    }
518}
519
520impl FlightDescriptor {
521    /// Create a new opaque command [`CMD`] `FlightDescriptor` to generate a dataset.
522    ///
523    /// [`CMD`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L224-L227
524    pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
525        FlightDescriptor {
526            r#type: DescriptorType::Cmd.into(),
527            cmd: cmd.into(),
528            ..Default::default()
529        }
530    }
531
532    /// Create a new named path [`PATH`] `FlightDescriptor` that identifies a dataset
533    ///
534    /// [`PATH`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L217-L222
535    pub fn new_path(path: Vec<String>) -> Self {
536        FlightDescriptor {
537            r#type: DescriptorType::Path.into(),
538            path,
539            ..Default::default()
540        }
541    }
542}
543
544impl FlightInfo {
545    /// Create a new, empty `FlightInfo`, describing where to fetch flight data
546    ///
547    ///
548    /// # Example:
549    /// ```
550    /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
551    /// # use arrow_schema::{Schema, Field, DataType};
552    /// # fn get_schema() -> Schema {
553    /// #   Schema::new(vec![
554    /// #     Field::new("a", DataType::Utf8, false),
555    /// #   ])
556    /// # }
557    /// #
558    /// // Create a new FlightInfo
559    /// let flight_info = FlightInfo::new()
560    ///   // Encode the Arrow schema
561    ///   .try_with_schema(&get_schema())
562    ///   .expect("encoding failed")
563    ///   .with_endpoint(
564    ///      FlightEndpoint::new()
565    ///        .with_ticket(Ticket::new("ticket contents")
566    ///      )
567    ///    )
568    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
569    /// ```
570    pub fn new() -> FlightInfo {
571        FlightInfo {
572            schema: Bytes::new(),
573            flight_descriptor: None,
574            endpoint: vec![],
575            ordered: false,
576            // Flight says "Set these to -1 if unknown."
577            //
578            // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
579            total_records: -1,
580            total_bytes: -1,
581            app_metadata: Bytes::new(),
582        }
583    }
584
585    /// Try and convert the data in this  `FlightInfo` into a [`Schema`]
586    pub fn try_decode_schema(self) -> ArrowResult<Schema> {
587        let msg = IpcMessage(self.schema);
588        msg.try_into()
589    }
590
591    /// Specify the schema for the response.
592    ///
593    /// Note this takes the arrow [`Schema`] (not the IPC schema) and
594    /// encodes it using the default IPC options.
595    ///
596    /// Returns an error if `schema` can not be encoded into IPC form.
597    pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
598        let options = IpcWriteOptions::default();
599        let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
600        self.schema = schema;
601        Ok(self)
602    }
603
604    /// Add specific a endpoint for fetching the data
605    pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
606        self.endpoint.push(endpoint);
607        self
608    }
609
610    /// Add a [`FlightDescriptor`] describing what this data is
611    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
612        self.flight_descriptor = Some(flight_descriptor);
613        self
614    }
615
616    /// Set the number of records in the result, if known
617    pub fn with_total_records(mut self, total_records: i64) -> Self {
618        self.total_records = total_records;
619        self
620    }
621
622    /// Set the number of bytes in the result, if known
623    pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
624        self.total_bytes = total_bytes;
625        self
626    }
627
628    /// Specify if the response is [ordered] across endpoints
629    ///
630    /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
631    pub fn with_ordered(mut self, ordered: bool) -> Self {
632        self.ordered = ordered;
633        self
634    }
635
636    /// Add optional application specific metadata to the message
637    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
638        self.app_metadata = app_metadata.into();
639        self
640    }
641}
642
643impl PollInfo {
644    /// Create a new, empty [`PollInfo`], providing information for a long-running query
645    ///
646    /// # Example:
647    /// ```
648    /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor};
649    /// # use prost_types::Timestamp;
650    /// // Create a new PollInfo
651    /// let poll_info = PollInfo::new()
652    ///   .with_info(FlightInfo::new())
653    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"))
654    ///   .try_with_progress(0.5)
655    ///   .expect("progress should've been valid")
656    ///   .with_expiration_time(
657    ///     "1970-01-01".parse().expect("invalid timestamp")
658    ///   );
659    /// ```
660    pub fn new() -> Self {
661        Self {
662            info: None,
663            flight_descriptor: None,
664            progress: None,
665            expiration_time: None,
666        }
667    }
668
669    /// Add the current available results for the poll call as a [`FlightInfo`]
670    pub fn with_info(mut self, info: FlightInfo) -> Self {
671        self.info = Some(info);
672        self
673    }
674
675    /// Add a [`FlightDescriptor`] that the client should use for the next poll call,
676    /// if the query is not yet complete
677    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
678        self.flight_descriptor = Some(flight_descriptor);
679        self
680    }
681
682    /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will
683    /// return an error
684    pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
685        if !(0.0..=1.0).contains(&progress) {
686            return Err(ArrowError::InvalidArgumentError(format!(
687                "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
688            )));
689        }
690        self.progress = Some(progress);
691        Ok(self)
692    }
693
694    /// Specify expiration time for this request
695    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
696        self.expiration_time = Some(expiration_time);
697        self
698    }
699}
700
701impl<'a> SchemaAsIpc<'a> {
702    /// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions`
703    pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
704        SchemaAsIpc {
705            pair: (schema, options),
706        }
707    }
708}
709
710impl CancelFlightInfoRequest {
711    /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`]
712    /// of the query to cancel.
713    pub fn new(info: FlightInfo) -> Self {
714        Self { info: Some(info) }
715    }
716}
717
718impl CancelFlightInfoResult {
719    /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`].
720    pub fn new(status: CancelStatus) -> Self {
721        Self {
722            status: status as i32,
723        }
724    }
725}
726
727impl RenewFlightEndpointRequest {
728    /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`]
729    /// for which is being requested an extension of its expiration.
730    pub fn new(endpoint: FlightEndpoint) -> Self {
731        Self {
732            endpoint: Some(endpoint),
733        }
734    }
735}
736
737impl Action {
738    /// Create a new Action with type and body
739    pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
740        Self {
741            r#type: action_type.into(),
742            body: body.into(),
743        }
744    }
745}
746
747impl Result {
748    /// Create a new Result with the specified body
749    pub fn new(body: impl Into<Bytes>) -> Self {
750        Self { body: body.into() }
751    }
752}
753
754impl Ticket {
755    /// Create a new `Ticket`
756    ///
757    /// # Example
758    ///
759    /// ```
760    /// # use arrow_flight::Ticket;
761    /// let ticket = Ticket::new("SELECT * from FOO");
762    /// ```
763    pub fn new(ticket: impl Into<Bytes>) -> Self {
764        Self {
765            ticket: ticket.into(),
766        }
767    }
768}
769
770impl FlightEndpoint {
771    /// Create a new, empty `FlightEndpoint` that represents a location
772    /// to retrieve Flight results.
773    ///
774    /// # Example
775    /// ```
776    /// # use arrow_flight::{FlightEndpoint, Ticket};
777    /// #
778    /// // Specify the client should fetch results from this server
779    /// let endpoint = FlightEndpoint::new()
780    ///   .with_ticket(Ticket::new("the ticket"));
781    ///
782    /// // Specify the client should fetch results from either
783    /// // `http://example.com` or `https://example.com`
784    /// let endpoint = FlightEndpoint::new()
785    ///   .with_ticket(Ticket::new("the ticket"))
786    ///   .with_location("http://example.com")
787    ///   .with_location("https://example.com");
788    /// ```
789    pub fn new() -> FlightEndpoint {
790        Default::default()
791    }
792
793    /// Set the [`Ticket`] used to retrieve data from the endpoint
794    pub fn with_ticket(mut self, ticket: Ticket) -> Self {
795        self.ticket = Some(ticket);
796        self
797    }
798
799    /// Add a location `uri` to this endpoint. Note each endpoint can
800    /// have multiple locations.
801    ///
802    /// If no `uri` is specified, the [Flight Spec] says:
803    ///
804    /// ```text
805    /// * If the list is empty, the expectation is that the ticket can only
806    /// * be redeemed on the current service where the ticket was
807    /// * generated.
808    /// ```
809    /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
810    pub fn with_location(mut self, uri: impl Into<String>) -> Self {
811        self.location.push(Location { uri: uri.into() });
812        self
813    }
814
815    /// Specify expiration time for this stream
816    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
817        self.expiration_time = Some(expiration_time);
818        self
819    }
820
821    /// Add optional application specific metadata to the message
822    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
823        self.app_metadata = app_metadata.into();
824        self
825    }
826}
827
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_ipc::MetadataVersion;
    use arrow_schema::{DataType, Field, TimeUnit};

    /// Bytes plus a limit, displayed through `limited_fmt`.
    struct TestVector(Vec<u8>, usize);

    impl fmt::Display for TestVector {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let Self(bytes, limit) = self;
            limited_fmt(f, bytes, *limit)
        }
    }

    #[test]
    fn it_creates_flight_descriptor_command() {
        let expected_cmd = b"my_command".to_vec();
        let fd = FlightDescriptor::new_cmd(expected_cmd.clone());
        assert_eq!(fd.r#type(), DescriptorType::Cmd);
        assert_eq!(fd.cmd, expected_cmd);
    }

    #[test]
    fn it_accepts_equal_output() {
        // Limit equal to the length: everything is shown
        let actual = TestVector(vec![91; 10], 10).to_string();
        let expected = format!("{:?}", vec![91; 10]);
        assert_eq!(actual, expected);
    }

    #[test]
    fn it_accepts_short_output() {
        // Limit larger than the length: everything is shown
        let actual = TestVector(vec![91; 6], 10).to_string();
        let expected = format!("{:?}", vec![91; 6]);
        assert_eq!(actual, expected);
    }

    #[test]
    fn it_accepts_long_output() {
        // Limit smaller than the length: output is cut at the limit
        let actual = TestVector(vec![91; 10], 9).to_string();
        let expected = format!("{:?}", vec![91; 9]);
        assert_eq!(actual, expected);
    }

    #[test]
    fn ser_deser_schema_result() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::UInt32, false),
            Field::new("c4", DataType::Boolean, true),
            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        ]);

        let all_options = [
            // V5 with write_legacy_ipc_format = false
            // this will write the continuation marker
            IpcWriteOptions::default(),
            // V4 with write_legacy_ipc_format = true
            // this will not write the continuation marker
            IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
        ];

        for options in &all_options {
            let encoded = SchemaResult::try_from(SchemaAsIpc::new(&schema, options)).unwrap();
            let decoded = Schema::try_from(&encoded).unwrap();
            assert_eq!(schema, decoded);
        }
    }

    #[test]
    fn test_dict_schema() {
        // Test for https://github.com/apache/arrow-rs/issues/7058
        let dict_type = || DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8));
        let schema = Schema::new(vec![
            Field::new("a", dict_type(), false),
            Field::new("b", dict_type(), false),
        ]);

        let flight_info = FlightInfo::new().try_with_schema(&schema).unwrap();

        let round_tripped = Schema::try_from(flight_info).unwrap();
        assert_eq!(schema, round_tripped);
    }
}