arrow_flight/
lib.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! A native Rust implementation of [Apache Arrow Flight](https://arrow.apache.org/docs/format/Flight.html)
19//! for exchanging [Arrow](https://arrow.apache.org) data between processes.
20//!
21//! Please see the [arrow-flight crates.io](https://crates.io/crates/arrow-flight)
22//! page for feature flags and more information.
23//!
24//! # Overview
25//!
26//! This crate contains:
27//!
28//! 1. Low level [prost] generated structs
29//!    for Flight gRPC protobuf messages, such as [`FlightData`], [`FlightInfo`],
30//!    [`Location`] and [`Ticket`].
31//!
32//! 2. Low level [tonic] generated [`flight_service_client`] and
33//!    [`flight_service_server`].
34//!
35//! 3. Support for [Flight SQL] in [`sql`]. Requires the
36//!    `flight-sql` feature of this crate to be activated.
37//!
38//! [Flight SQL]: https://arrow.apache.org/docs/format/FlightSql.html
39
40#![doc(
41    html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
42    html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
43)]
44#![cfg_attr(docsrs, feature(doc_cfg))]
45#![allow(rustdoc::invalid_html_tags)]
46#![warn(missing_docs)]
47// The unused_crate_dependencies lint does not work well for crates defining additional examples/bin targets
48#![allow(unused_crate_dependencies)]
49
50use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
51use arrow_schema::{ArrowError, Schema};
52
53use arrow_ipc::convert::try_schema_from_ipc_buffer;
54use base64::Engine;
55use base64::prelude::BASE64_STANDARD;
56use bytes::Bytes;
57use prost_types::Timestamp;
58use std::{fmt, ops::Deref};
59
60type ArrowResult<T> = std::result::Result<T, ArrowError>;
61
62#[allow(clippy::all)]
63mod r#gen {
64    // Since this file is auto-generated, we suppress all warnings
65    #![allow(missing_docs)]
66    include!("arrow.flight.protocol.rs");
67}
68
69/// Defines a `Flight` for generation or retrieval.
70pub mod flight_descriptor {
71    use super::r#gen;
72    pub use r#gen::flight_descriptor::DescriptorType;
73}
74
75/// Low Level [tonic] [`FlightServiceClient`](gen::flight_service_client::FlightServiceClient).
76pub mod flight_service_client {
77    use super::r#gen;
78    pub use r#gen::flight_service_client::FlightServiceClient;
79}
80
81/// Low Level [tonic] [`FlightServiceServer`](gen::flight_service_server::FlightServiceServer)
82/// and [`FlightService`](gen::flight_service_server::FlightService).
83pub mod flight_service_server {
84    use super::r#gen;
85    pub use r#gen::flight_service_server::FlightService;
86    pub use r#gen::flight_service_server::FlightServiceServer;
87}
88
89/// Mid Level [`FlightClient`]
90pub mod client;
91pub use client::FlightClient;
92
93/// Decoder to create [`RecordBatch`](arrow_array::RecordBatch) streams from [`FlightData`] streams.
94/// See [`FlightRecordBatchStream`](decode::FlightRecordBatchStream).
95pub mod decode;
96
97/// Encoder to create [`FlightData`] streams from [`RecordBatch`](arrow_array::RecordBatch) streams.
98/// See [`FlightDataEncoderBuilder`](encode::FlightDataEncoderBuilder).
99pub mod encode;
100
101/// Common error types
102pub mod error;
103
104pub use r#gen::Action;
105pub use r#gen::ActionType;
106pub use r#gen::BasicAuth;
107pub use r#gen::CancelFlightInfoRequest;
108pub use r#gen::CancelFlightInfoResult;
109pub use r#gen::CancelStatus;
110pub use r#gen::Criteria;
111pub use r#gen::Empty;
112pub use r#gen::FlightData;
113pub use r#gen::FlightDescriptor;
114pub use r#gen::FlightEndpoint;
115pub use r#gen::FlightInfo;
116pub use r#gen::HandshakeRequest;
117pub use r#gen::HandshakeResponse;
118pub use r#gen::Location;
119pub use r#gen::PollInfo;
120pub use r#gen::PutResult;
121pub use r#gen::RenewFlightEndpointRequest;
122pub use r#gen::Result;
123pub use r#gen::SchemaResult;
124pub use r#gen::Ticket;
125
126/// Helper to extract HTTP/gRPC trailers from a tonic stream.
127mod trailers;
128
129pub mod utils;
130
131#[cfg(feature = "flight-sql")]
132pub mod sql;
133mod streams;
134
135use flight_descriptor::DescriptorType;
136
137/// SchemaAsIpc represents a pairing of a `Schema` with IpcWriteOptions
138pub struct SchemaAsIpc<'a> {
139    /// Data type representing a schema and its IPC write options
140    pub pair: (&'a Schema, &'a IpcWriteOptions),
141}
142
143/// IpcMessage represents a `Schema` in the format expected in
144/// `FlightInfo.schema`
145#[derive(Debug)]
146pub struct IpcMessage(pub Bytes);
147
148// Useful conversion functions
149
150fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
151    let data_gen = writer::IpcDataGenerator::default();
152    let mut dict_tracker = writer::DictionaryTracker::new(false);
153    data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
154}
155
156fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
157    let encoded_data = flight_schema_as_encoded_data(schema, options);
158    IpcMessage(encoded_data.ipc_message.into())
159}
160
161// Implement a bunch of useful traits for various conversions, displays,
162// etc...
163
164// Deref
165
166impl Deref for IpcMessage {
167    type Target = [u8];
168
169    fn deref(&self) -> &Self::Target {
170        &self.0
171    }
172}
173
174impl<'a> Deref for SchemaAsIpc<'a> {
175    type Target = (&'a Schema, &'a IpcWriteOptions);
176
177    fn deref(&self) -> &Self::Target {
178        &self.pair
179    }
180}
181
182// Display...
183
184/// Limits the output of value to limit...
185fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
186    if value.len() > limit {
187        write!(f, "{:?}", &value[..limit])
188    } else {
189        write!(f, "{:?}", &value)
190    }
191}
192
193impl fmt::Display for FlightData {
194    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195        write!(f, "FlightData {{")?;
196        write!(f, " descriptor: ")?;
197        match &self.flight_descriptor {
198            Some(d) => write!(f, "{d}")?,
199            None => write!(f, "None")?,
200        };
201        write!(f, ", header: ")?;
202        limited_fmt(f, &self.data_header, 8)?;
203        write!(f, ", metadata: ")?;
204        limited_fmt(f, &self.app_metadata, 8)?;
205        write!(f, ", body: ")?;
206        limited_fmt(f, &self.data_body, 8)?;
207        write!(f, " }}")
208    }
209}
210
211impl fmt::Display for FlightDescriptor {
212    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213        write!(f, "FlightDescriptor {{")?;
214        write!(f, " type: ")?;
215        match self.r#type() {
216            DescriptorType::Cmd => {
217                write!(f, "cmd, value: ")?;
218                limited_fmt(f, &self.cmd, 8)?;
219            }
220            DescriptorType::Path => {
221                write!(f, "path: [")?;
222                let mut sep = "";
223                for element in &self.path {
224                    write!(f, "{sep}{element}")?;
225                    sep = ", ";
226                }
227                write!(f, "]")?;
228            }
229            DescriptorType::Unknown => {
230                write!(f, "unknown")?;
231            }
232        }
233        write!(f, " }}")
234    }
235}
236
237impl fmt::Display for FlightEndpoint {
238    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239        write!(f, "FlightEndpoint {{")?;
240        write!(f, " ticket: ")?;
241        match &self.ticket {
242            Some(value) => write!(f, "{value}"),
243            None => write!(f, " None"),
244        }?;
245        write!(f, ", location: [")?;
246        let mut sep = "";
247        for location in &self.location {
248            write!(f, "{sep}{location}")?;
249            sep = ", ";
250        }
251        write!(f, "]")?;
252        write!(f, ", expiration_time:")?;
253        match &self.expiration_time {
254            Some(value) => write!(f, " {value}"),
255            None => write!(f, " None"),
256        }?;
257        write!(f, ", app_metadata: ")?;
258        limited_fmt(f, &self.app_metadata, 8)?;
259        write!(f, " }}")
260    }
261}
262
263impl fmt::Display for FlightInfo {
264    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265        let ipc_message = IpcMessage(self.schema.clone());
266        let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
267        write!(f, "FlightInfo {{")?;
268        write!(f, " schema: {schema}")?;
269        write!(f, ", descriptor:")?;
270        match &self.flight_descriptor {
271            Some(d) => write!(f, " {d}"),
272            None => write!(f, " None"),
273        }?;
274        write!(f, ", endpoint: [")?;
275        let mut sep = "";
276        for endpoint in &self.endpoint {
277            write!(f, "{sep}{endpoint}")?;
278            sep = ", ";
279        }
280        write!(f, "], total_records: {}", self.total_records)?;
281        write!(f, ", total_bytes: {}", self.total_bytes)?;
282        write!(f, ", ordered: {}", self.ordered)?;
283        write!(f, ", app_metadata: ")?;
284        limited_fmt(f, &self.app_metadata, 8)?;
285        write!(f, " }}")
286    }
287}
288
289impl fmt::Display for PollInfo {
290    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291        write!(f, "PollInfo {{")?;
292        write!(f, " info:")?;
293        match &self.info {
294            Some(value) => write!(f, " {value}"),
295            None => write!(f, " None"),
296        }?;
297        write!(f, ", descriptor:")?;
298        match &self.flight_descriptor {
299            Some(d) => write!(f, " {d}"),
300            None => write!(f, " None"),
301        }?;
302        write!(f, ", progress:")?;
303        match &self.progress {
304            Some(value) => write!(f, " {value}"),
305            None => write!(f, " None"),
306        }?;
307        write!(f, ", expiration_time:")?;
308        match &self.expiration_time {
309            Some(value) => write!(f, " {value}"),
310            None => write!(f, " None"),
311        }?;
312        write!(f, " }}")
313    }
314}
315
316impl fmt::Display for CancelFlightInfoRequest {
317    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
318        write!(f, "CancelFlightInfoRequest {{")?;
319        write!(f, " info: ")?;
320        match &self.info {
321            Some(value) => write!(f, "{value}")?,
322            None => write!(f, "None")?,
323        };
324        write!(f, " }}")
325    }
326}
327
328impl fmt::Display for CancelFlightInfoResult {
329    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330        write!(f, "CancelFlightInfoResult {{")?;
331        write!(f, " status: {}", self.status().as_str_name())?;
332        write!(f, " }}")
333    }
334}
335
336impl fmt::Display for RenewFlightEndpointRequest {
337    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338        write!(f, "RenewFlightEndpointRequest {{")?;
339        write!(f, " endpoint: ")?;
340        match &self.endpoint {
341            Some(value) => write!(f, "{value}")?,
342            None => write!(f, "None")?,
343        };
344        write!(f, " }}")
345    }
346}
347
348impl fmt::Display for Location {
349    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350        write!(f, "Location {{")?;
351        write!(f, " uri: ")?;
352        write!(f, "{}", self.uri)
353    }
354}
355
356impl fmt::Display for Ticket {
357    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358        write!(f, "Ticket {{")?;
359        write!(f, " ticket: ")?;
360        write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
361    }
362}
363
364// From...
365
366impl From<EncodedData> for FlightData {
367    fn from(data: EncodedData) -> Self {
368        FlightData {
369            data_header: data.ipc_message.into(),
370            data_body: data.arrow_data.into(),
371            ..Default::default()
372        }
373    }
374}
375
376impl From<SchemaAsIpc<'_>> for FlightData {
377    fn from(schema_ipc: SchemaAsIpc) -> Self {
378        let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
379        FlightData {
380            data_header: vals,
381            ..Default::default()
382        }
383    }
384}
385
386impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
387    type Error = ArrowError;
388
389    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
390        // According to the definition from `Flight.proto`
391        // The schema of the dataset in its IPC form:
392        //   4 bytes - an optional IPC_CONTINUATION_TOKEN prefix
393        //   4 bytes - the byte length of the payload
394        //   a flatbuffer Message whose header is the Schema
395        let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
396        Ok(SchemaResult { schema: vals })
397    }
398}
399
400impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
401    type Error = ArrowError;
402
403    fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
404        schema_to_ipc_format(schema_ipc)
405    }
406}
407
408fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
409    let pair = *schema_ipc;
410    let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
411
412    let mut schema = vec![];
413    writer::write_message(&mut schema, encoded_data, pair.1)?;
414    Ok(IpcMessage(schema.into()))
415}
416
417impl TryFrom<&FlightData> for Schema {
418    type Error = ArrowError;
419    fn try_from(data: &FlightData) -> ArrowResult<Self> {
420        convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
421            ArrowError::ParseError(format!(
422                "Unable to convert flight data to Arrow schema: {err}"
423            ))
424        })
425    }
426}
427
428impl TryFrom<FlightInfo> for Schema {
429    type Error = ArrowError;
430
431    fn try_from(value: FlightInfo) -> ArrowResult<Self> {
432        value.try_decode_schema()
433    }
434}
435
436impl TryFrom<IpcMessage> for Schema {
437    type Error = ArrowError;
438
439    fn try_from(value: IpcMessage) -> ArrowResult<Self> {
440        try_schema_from_ipc_buffer(&value)
441    }
442}
443
444impl TryFrom<&SchemaResult> for Schema {
445    type Error = ArrowError;
446    fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
447        try_schema_from_ipc_buffer(&data.schema)
448    }
449}
450
451impl TryFrom<SchemaResult> for Schema {
452    type Error = ArrowError;
453    fn try_from(data: SchemaResult) -> ArrowResult<Self> {
454        (&data).try_into()
455    }
456}
457
458// FlightData, FlightDescriptor, etc..
459
460impl FlightData {
461    /// Create a new [`FlightData`].
462    ///
463    /// # See Also
464    ///
465    /// See [`FlightDataEncoderBuilder`] for a higher level API to
466    /// convert a stream of [`RecordBatch`]es to [`FlightData`]s
467    ///
468    /// # Example:
469    ///
470    /// ```
471    /// # use bytes::Bytes;
472    /// # use arrow_flight::{FlightData, FlightDescriptor};
473    /// # fn encode_data() -> Bytes { Bytes::new() } // dummy data
474    /// // Get encoded Arrow IPC data:
475    /// let data_body: Bytes = encode_data();
476    /// // Create the FlightData message
477    /// let flight_data = FlightData::new()
478    ///   .with_descriptor(FlightDescriptor::new_cmd("the command"))
479    ///   .with_app_metadata("My apps metadata")
480    ///   .with_data_body(data_body);
481    /// ```
482    ///
483    /// [`FlightDataEncoderBuilder`]: crate::encode::FlightDataEncoderBuilder
484    /// [`RecordBatch`]: arrow_array::RecordBatch
485    pub fn new() -> Self {
486        Default::default()
487    }
488
489    /// Add a [`FlightDescriptor`] describing the data
490    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
491        self.flight_descriptor = Some(flight_descriptor);
492        self
493    }
494
495    /// Add a data header
496    pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
497        self.data_header = data_header.into();
498        self
499    }
500
501    /// Add a data body. See [`IpcDataGenerator`] to create this data.
502    ///
503    /// [`IpcDataGenerator`]: arrow_ipc::writer::IpcDataGenerator
504    pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
505        self.data_body = data_body.into();
506        self
507    }
508
509    /// Add optional application specific metadata to the message
510    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
511        self.app_metadata = app_metadata.into();
512        self
513    }
514}
515
516impl FlightDescriptor {
517    /// Create a new opaque command [`CMD`] `FlightDescriptor` to generate a dataset.
518    ///
519    /// [`CMD`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L224-L227
520    pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
521        FlightDescriptor {
522            r#type: DescriptorType::Cmd.into(),
523            cmd: cmd.into(),
524            ..Default::default()
525        }
526    }
527
528    /// Create a new named path [`PATH`] `FlightDescriptor` that identifies a dataset
529    ///
530    /// [`PATH`]: https://github.com/apache/arrow/blob/6bd31f37ae66bd35594b077cb2f830be57e08acd/format/Flight.proto#L217-L222
531    pub fn new_path(path: Vec<String>) -> Self {
532        FlightDescriptor {
533            r#type: DescriptorType::Path.into(),
534            path,
535            ..Default::default()
536        }
537    }
538}
539
540impl FlightInfo {
541    /// Create a new, empty `FlightInfo`, describing where to fetch flight data
542    ///
543    ///
544    /// # Example:
545    /// ```
546    /// # use arrow_flight::{FlightInfo, Ticket, FlightDescriptor, FlightEndpoint};
547    /// # use arrow_schema::{Schema, Field, DataType};
548    /// # fn get_schema() -> Schema {
549    /// #   Schema::new(vec![
550    /// #     Field::new("a", DataType::Utf8, false),
551    /// #   ])
552    /// # }
553    /// #
554    /// // Create a new FlightInfo
555    /// let flight_info = FlightInfo::new()
556    ///   // Encode the Arrow schema
557    ///   .try_with_schema(&get_schema())
558    ///   .expect("encoding failed")
559    ///   .with_endpoint(
560    ///      FlightEndpoint::new()
561    ///        .with_ticket(Ticket::new("ticket contents")
562    ///      )
563    ///    )
564    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"));
565    /// ```
566    pub fn new() -> FlightInfo {
567        FlightInfo {
568            schema: Bytes::new(),
569            flight_descriptor: None,
570            endpoint: vec![],
571            ordered: false,
572            // Flight says "Set these to -1 if unknown."
573            //
574            // https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L287-L289
575            total_records: -1,
576            total_bytes: -1,
577            app_metadata: Bytes::new(),
578        }
579    }
580
581    /// Try and convert the data in this  `FlightInfo` into a [`Schema`]
582    pub fn try_decode_schema(self) -> ArrowResult<Schema> {
583        let msg = IpcMessage(self.schema);
584        msg.try_into()
585    }
586
587    /// Specify the schema for the response.
588    ///
589    /// Note this takes the arrow [`Schema`] (not the IPC schema) and
590    /// encodes it using the default IPC options.
591    ///
592    /// Returns an error if `schema` can not be encoded into IPC form.
593    pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
594        let options = IpcWriteOptions::default();
595        let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
596        self.schema = schema;
597        Ok(self)
598    }
599
600    /// Add specific a endpoint for fetching the data
601    pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
602        self.endpoint.push(endpoint);
603        self
604    }
605
606    /// Add endpoints for fetching all data
607    pub fn with_endpoints(mut self, endpoints: Vec<FlightEndpoint>) -> Self {
608        self.endpoint = endpoints;
609        self
610    }
611
612    /// Add a [`FlightDescriptor`] describing what this data is
613    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
614        self.flight_descriptor = Some(flight_descriptor);
615        self
616    }
617
618    /// Set the number of records in the result, if known
619    pub fn with_total_records(mut self, total_records: i64) -> Self {
620        self.total_records = total_records;
621        self
622    }
623
624    /// Set the number of bytes in the result, if known
625    pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
626        self.total_bytes = total_bytes;
627        self
628    }
629
630    /// Specify if the response is [ordered] across endpoints
631    ///
632    /// [ordered]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L269-L275
633    pub fn with_ordered(mut self, ordered: bool) -> Self {
634        self.ordered = ordered;
635        self
636    }
637
638    /// Add optional application specific metadata to the message
639    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
640        self.app_metadata = app_metadata.into();
641        self
642    }
643}
644
645impl PollInfo {
646    /// Create a new, empty [`PollInfo`], providing information for a long-running query
647    ///
648    /// # Example:
649    /// ```
650    /// # use arrow_flight::{FlightInfo, PollInfo, FlightDescriptor};
651    /// # use prost_types::Timestamp;
652    /// // Create a new PollInfo
653    /// let poll_info = PollInfo::new()
654    ///   .with_info(FlightInfo::new())
655    ///   .with_descriptor(FlightDescriptor::new_cmd("RUN QUERY"))
656    ///   .try_with_progress(0.5)
657    ///   .expect("progress should've been valid")
658    ///   .with_expiration_time(
659    ///     "1970-01-01".parse().expect("invalid timestamp")
660    ///   );
661    /// ```
662    pub fn new() -> Self {
663        Self {
664            info: None,
665            flight_descriptor: None,
666            progress: None,
667            expiration_time: None,
668        }
669    }
670
671    /// Add the current available results for the poll call as a [`FlightInfo`]
672    pub fn with_info(mut self, info: FlightInfo) -> Self {
673        self.info = Some(info);
674        self
675    }
676
677    /// Add a [`FlightDescriptor`] that the client should use for the next poll call,
678    /// if the query is not yet complete
679    pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
680        self.flight_descriptor = Some(flight_descriptor);
681        self
682    }
683
684    /// Set the query progress if known. Must be in the range [0.0, 1.0] else this will
685    /// return an error
686    pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
687        if !(0.0..=1.0).contains(&progress) {
688            return Err(ArrowError::InvalidArgumentError(format!(
689                "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
690            )));
691        }
692        self.progress = Some(progress);
693        Ok(self)
694    }
695
696    /// Specify expiration time for this request
697    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
698        self.expiration_time = Some(expiration_time);
699        self
700    }
701}
702
703impl<'a> SchemaAsIpc<'a> {
704    /// Create a new `SchemaAsIpc` from a `Schema` and `IpcWriteOptions`
705    pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
706        SchemaAsIpc {
707            pair: (schema, options),
708        }
709    }
710}
711
712impl CancelFlightInfoRequest {
713    /// Create a new [`CancelFlightInfoRequest`], providing the [`FlightInfo`]
714    /// of the query to cancel.
715    pub fn new(info: FlightInfo) -> Self {
716        Self { info: Some(info) }
717    }
718}
719
720impl CancelFlightInfoResult {
721    /// Create a new [`CancelFlightInfoResult`] from the provided [`CancelStatus`].
722    pub fn new(status: CancelStatus) -> Self {
723        Self {
724            status: status as i32,
725        }
726    }
727}
728
729impl RenewFlightEndpointRequest {
730    /// Create a new [`RenewFlightEndpointRequest`], providing the [`FlightEndpoint`]
731    /// for which is being requested an extension of its expiration.
732    pub fn new(endpoint: FlightEndpoint) -> Self {
733        Self {
734            endpoint: Some(endpoint),
735        }
736    }
737}
738
739impl Action {
740    /// Create a new Action with type and body
741    pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
742        Self {
743            r#type: action_type.into(),
744            body: body.into(),
745        }
746    }
747}
748
749impl Result {
750    /// Create a new Result with the specified body
751    pub fn new(body: impl Into<Bytes>) -> Self {
752        Self { body: body.into() }
753    }
754}
755
756impl Ticket {
757    /// Create a new `Ticket`
758    ///
759    /// # Example
760    ///
761    /// ```
762    /// # use arrow_flight::Ticket;
763    /// let ticket = Ticket::new("SELECT * from FOO");
764    /// ```
765    pub fn new(ticket: impl Into<Bytes>) -> Self {
766        Self {
767            ticket: ticket.into(),
768        }
769    }
770}
771
772impl FlightEndpoint {
773    /// Create a new, empty `FlightEndpoint` that represents a location
774    /// to retrieve Flight results.
775    ///
776    /// # Example
777    /// ```
778    /// # use arrow_flight::{FlightEndpoint, Ticket};
779    /// #
780    /// // Specify the client should fetch results from this server
781    /// let endpoint = FlightEndpoint::new()
782    ///   .with_ticket(Ticket::new("the ticket"));
783    ///
784    /// // Specify the client should fetch results from either
785    /// // `http://example.com` or `https://example.com`
786    /// let endpoint = FlightEndpoint::new()
787    ///   .with_ticket(Ticket::new("the ticket"))
788    ///   .with_location("http://example.com")
789    ///   .with_location("https://example.com");
790    /// ```
791    pub fn new() -> FlightEndpoint {
792        Default::default()
793    }
794
795    /// Set the [`Ticket`] used to retrieve data from the endpoint
796    pub fn with_ticket(mut self, ticket: Ticket) -> Self {
797        self.ticket = Some(ticket);
798        self
799    }
800
801    /// Add a location `uri` to this endpoint. Note each endpoint can
802    /// have multiple locations.
803    ///
804    /// If no `uri` is specified, the [Flight Spec] says:
805    ///
806    /// ```text
807    /// * If the list is empty, the expectation is that the ticket can only
808    /// * be redeemed on the current service where the ticket was
809    /// * generated.
810    /// ```
811    /// [Flight Spec]: https://github.com/apache/arrow-rs/blob/17ca4d51d0490f9c65f5adde144f677dbc8300e7/format/Flight.proto#L307C2-L312
812    pub fn with_location(mut self, uri: impl Into<String>) -> Self {
813        self.location.push(Location { uri: uri.into() });
814        self
815    }
816
817    /// Specify expiration time for this stream
818    pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
819        self.expiration_time = Some(expiration_time);
820        self
821    }
822
823    /// Add optional application specific metadata to the message
824    pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
825        self.app_metadata = app_metadata.into();
826        self
827    }
828}
829
830#[cfg(test)]
831mod tests {
832    use super::*;
833    use arrow_ipc::MetadataVersion;
834    use arrow_schema::{DataType, Field, TimeUnit};
835
836    struct TestVector(Vec<u8>, usize);
837
838    impl fmt::Display for TestVector {
839        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840            limited_fmt(f, &self.0, self.1)
841        }
842    }
843
844    #[test]
845    fn it_creates_flight_descriptor_command() {
846        let expected_cmd = "my_command".as_bytes();
847        let fd = FlightDescriptor::new_cmd(expected_cmd.to_vec());
848        assert_eq!(fd.r#type(), DescriptorType::Cmd);
849        assert_eq!(fd.cmd, expected_cmd.to_vec());
850    }
851
852    #[test]
853    fn it_accepts_equal_output() {
854        let input = TestVector(vec![91; 10], 10);
855
856        let actual = format!("{input}");
857        let expected = format!("{:?}", vec![91; 10]);
858        assert_eq!(actual, expected);
859    }
860
861    #[test]
862    fn it_accepts_short_output() {
863        let input = TestVector(vec![91; 6], 10);
864
865        let actual = format!("{input}");
866        let expected = format!("{:?}", vec![91; 6]);
867        assert_eq!(actual, expected);
868    }
869
870    #[test]
871    fn it_accepts_long_output() {
872        let input = TestVector(vec![91; 10], 9);
873
874        let actual = format!("{input}");
875        let expected = format!("{:?}", vec![91; 9]);
876        assert_eq!(actual, expected);
877    }
878
879    #[test]
880    fn ser_deser_schema_result() {
881        let schema = Schema::new(vec![
882            Field::new("c1", DataType::Utf8, false),
883            Field::new("c2", DataType::Float64, true),
884            Field::new("c3", DataType::UInt32, false),
885            Field::new("c4", DataType::Boolean, true),
886            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
887            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
888        ]);
889        // V5 with write_legacy_ipc_format = false
890        // this will write the continuation marker
891        let option = IpcWriteOptions::default();
892        let schema_ipc = SchemaAsIpc::new(&schema, &option);
893        let result: SchemaResult = schema_ipc.try_into().unwrap();
894        let des_schema: Schema = (&result).try_into().unwrap();
895        assert_eq!(schema, des_schema);
896
897        // V4 with write_legacy_ipc_format = true
898        // this will not write the continuation marker
899        let option = IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap();
900        let schema_ipc = SchemaAsIpc::new(&schema, &option);
901        let result: SchemaResult = schema_ipc.try_into().unwrap();
902        let des_schema: Schema = (&result).try_into().unwrap();
903        assert_eq!(schema, des_schema);
904    }
905
906    #[test]
907    fn test_dict_schema() {
908        // Test for https://github.com/apache/arrow-rs/issues/7058
909        let schema = Schema::new(vec![
910            Field::new(
911                "a",
912                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
913                false,
914            ),
915            Field::new(
916                "b",
917                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
918                false,
919            ),
920        ]);
921
922        let flight_info = FlightInfo::new().try_with_schema(&schema).unwrap();
923
924        let new_schema = Schema::try_from(flight_info).unwrap();
925        assert_eq!(schema, new_schema);
926    }
927}