1#![doc(
41 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
42 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
43)]
44#![cfg_attr(docsrs, feature(doc_auto_cfg))]
45#![allow(rustdoc::invalid_html_tags)]
46#![warn(missing_docs)]
47#![allow(unused_crate_dependencies)]
49
50use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
51use arrow_schema::{ArrowError, Schema};
52
53use arrow_ipc::convert::try_schema_from_ipc_buffer;
54use base64::prelude::BASE64_STANDARD;
55use base64::Engine;
56use bytes::Bytes;
57use prost_types::Timestamp;
58use std::{fmt, ops::Deref};
59
60type ArrowResult<T> = std::result::Result<T, ArrowError>;
61
62#[allow(clippy::all)]
63mod gen {
64 #![allow(missing_docs)]
66 include!("arrow.flight.protocol.rs");
67}
68
69pub mod flight_descriptor {
71 use super::gen;
72 pub use gen::flight_descriptor::DescriptorType;
73}
74
75pub mod flight_service_client {
77 use super::gen;
78 pub use gen::flight_service_client::FlightServiceClient;
79}
80
81pub mod flight_service_server {
84 use super::gen;
85 pub use gen::flight_service_server::FlightService;
86 pub use gen::flight_service_server::FlightServiceServer;
87}
88
89pub mod client;
91pub use client::FlightClient;
92
93pub mod decode;
96
97pub mod encode;
100
101pub mod error;
103
104pub use gen::Action;
105pub use gen::ActionType;
106pub use gen::BasicAuth;
107pub use gen::CancelFlightInfoRequest;
108pub use gen::CancelFlightInfoResult;
109pub use gen::CancelStatus;
110pub use gen::Criteria;
111pub use gen::Empty;
112pub use gen::FlightData;
113pub use gen::FlightDescriptor;
114pub use gen::FlightEndpoint;
115pub use gen::FlightInfo;
116pub use gen::HandshakeRequest;
117pub use gen::HandshakeResponse;
118pub use gen::Location;
119pub use gen::PollInfo;
120pub use gen::PutResult;
121pub use gen::RenewFlightEndpointRequest;
122pub use gen::Result;
123pub use gen::SchemaResult;
124pub use gen::Ticket;
125
126mod trailers;
128
129pub mod utils;
130
131#[cfg(feature = "flight-sql-experimental")]
132pub mod sql;
133mod streams;
134
135use flight_descriptor::DescriptorType;
136
137pub struct SchemaAsIpc<'a> {
139 pub pair: (&'a Schema, &'a IpcWriteOptions),
141}
142
143#[derive(Debug)]
146pub struct IpcMessage(pub Bytes);
147
148fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
151 let data_gen = writer::IpcDataGenerator::default();
152 #[allow(deprecated)]
153 let mut dict_tracker =
154 writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
155 data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
156}
157
158fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
159 let encoded_data = flight_schema_as_encoded_data(schema, options);
160 IpcMessage(encoded_data.ipc_message.into())
161}
162
163impl Deref for IpcMessage {
169 type Target = [u8];
170
171 fn deref(&self) -> &Self::Target {
172 &self.0
173 }
174}
175
176impl<'a> Deref for SchemaAsIpc<'a> {
177 type Target = (&'a Schema, &'a IpcWriteOptions);
178
179 fn deref(&self) -> &Self::Target {
180 &self.pair
181 }
182}
183
184fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
188 if value.len() > limit {
189 write!(f, "{:?}", &value[..limit])
190 } else {
191 write!(f, "{:?}", &value)
192 }
193}
194
195impl fmt::Display for FlightData {
196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197 write!(f, "FlightData {{")?;
198 write!(f, " descriptor: ")?;
199 match &self.flight_descriptor {
200 Some(d) => write!(f, "{d}")?,
201 None => write!(f, "None")?,
202 };
203 write!(f, ", header: ")?;
204 limited_fmt(f, &self.data_header, 8)?;
205 write!(f, ", metadata: ")?;
206 limited_fmt(f, &self.app_metadata, 8)?;
207 write!(f, ", body: ")?;
208 limited_fmt(f, &self.data_body, 8)?;
209 write!(f, " }}")
210 }
211}
212
213impl fmt::Display for FlightDescriptor {
214 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
215 write!(f, "FlightDescriptor {{")?;
216 write!(f, " type: ")?;
217 match self.r#type() {
218 DescriptorType::Cmd => {
219 write!(f, "cmd, value: ")?;
220 limited_fmt(f, &self.cmd, 8)?;
221 }
222 DescriptorType::Path => {
223 write!(f, "path: [")?;
224 let mut sep = "";
225 for element in &self.path {
226 write!(f, "{sep}{element}")?;
227 sep = ", ";
228 }
229 write!(f, "]")?;
230 }
231 DescriptorType::Unknown => {
232 write!(f, "unknown")?;
233 }
234 }
235 write!(f, " }}")
236 }
237}
238
239impl fmt::Display for FlightEndpoint {
240 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
241 write!(f, "FlightEndpoint {{")?;
242 write!(f, " ticket: ")?;
243 match &self.ticket {
244 Some(value) => write!(f, "{value}"),
245 None => write!(f, " None"),
246 }?;
247 write!(f, ", location: [")?;
248 let mut sep = "";
249 for location in &self.location {
250 write!(f, "{sep}{location}")?;
251 sep = ", ";
252 }
253 write!(f, "]")?;
254 write!(f, ", expiration_time:")?;
255 match &self.expiration_time {
256 Some(value) => write!(f, " {value}"),
257 None => write!(f, " None"),
258 }?;
259 write!(f, ", app_metadata: ")?;
260 limited_fmt(f, &self.app_metadata, 8)?;
261 write!(f, " }}")
262 }
263}
264
265impl fmt::Display for FlightInfo {
266 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
267 let ipc_message = IpcMessage(self.schema.clone());
268 let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
269 write!(f, "FlightInfo {{")?;
270 write!(f, " schema: {schema}")?;
271 write!(f, ", descriptor:")?;
272 match &self.flight_descriptor {
273 Some(d) => write!(f, " {d}"),
274 None => write!(f, " None"),
275 }?;
276 write!(f, ", endpoint: [")?;
277 let mut sep = "";
278 for endpoint in &self.endpoint {
279 write!(f, "{sep}{endpoint}")?;
280 sep = ", ";
281 }
282 write!(f, "], total_records: {}", self.total_records)?;
283 write!(f, ", total_bytes: {}", self.total_bytes)?;
284 write!(f, ", ordered: {}", self.ordered)?;
285 write!(f, ", app_metadata: ")?;
286 limited_fmt(f, &self.app_metadata, 8)?;
287 write!(f, " }}")
288 }
289}
290
291impl fmt::Display for PollInfo {
292 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
293 write!(f, "PollInfo {{")?;
294 write!(f, " info:")?;
295 match &self.info {
296 Some(value) => write!(f, " {value}"),
297 None => write!(f, " None"),
298 }?;
299 write!(f, ", descriptor:")?;
300 match &self.flight_descriptor {
301 Some(d) => write!(f, " {d}"),
302 None => write!(f, " None"),
303 }?;
304 write!(f, ", progress:")?;
305 match &self.progress {
306 Some(value) => write!(f, " {value}"),
307 None => write!(f, " None"),
308 }?;
309 write!(f, ", expiration_time:")?;
310 match &self.expiration_time {
311 Some(value) => write!(f, " {value}"),
312 None => write!(f, " None"),
313 }?;
314 write!(f, " }}")
315 }
316}
317
318impl fmt::Display for CancelFlightInfoRequest {
319 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
320 write!(f, "CancelFlightInfoRequest {{")?;
321 write!(f, " info: ")?;
322 match &self.info {
323 Some(value) => write!(f, "{value}")?,
324 None => write!(f, "None")?,
325 };
326 write!(f, " }}")
327 }
328}
329
330impl fmt::Display for CancelFlightInfoResult {
331 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
332 write!(f, "CancelFlightInfoResult {{")?;
333 write!(f, " status: {}", self.status().as_str_name())?;
334 write!(f, " }}")
335 }
336}
337
338impl fmt::Display for RenewFlightEndpointRequest {
339 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
340 write!(f, "RenewFlightEndpointRequest {{")?;
341 write!(f, " endpoint: ")?;
342 match &self.endpoint {
343 Some(value) => write!(f, "{value}")?,
344 None => write!(f, "None")?,
345 };
346 write!(f, " }}")
347 }
348}
349
350impl fmt::Display for Location {
351 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
352 write!(f, "Location {{")?;
353 write!(f, " uri: ")?;
354 write!(f, "{}", self.uri)
355 }
356}
357
358impl fmt::Display for Ticket {
359 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
360 write!(f, "Ticket {{")?;
361 write!(f, " ticket: ")?;
362 write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
363 }
364}
365
366impl From<EncodedData> for FlightData {
369 fn from(data: EncodedData) -> Self {
370 FlightData {
371 data_header: data.ipc_message.into(),
372 data_body: data.arrow_data.into(),
373 ..Default::default()
374 }
375 }
376}
377
378impl From<SchemaAsIpc<'_>> for FlightData {
379 fn from(schema_ipc: SchemaAsIpc) -> Self {
380 let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
381 FlightData {
382 data_header: vals,
383 ..Default::default()
384 }
385 }
386}
387
388impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
389 type Error = ArrowError;
390
391 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
392 let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
398 Ok(SchemaResult { schema: vals })
399 }
400}
401
402impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
403 type Error = ArrowError;
404
405 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
406 schema_to_ipc_format(schema_ipc)
407 }
408}
409
410fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
411 let pair = *schema_ipc;
412 let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
413
414 let mut schema = vec![];
415 writer::write_message(&mut schema, encoded_data, pair.1)?;
416 Ok(IpcMessage(schema.into()))
417}
418
419impl TryFrom<&FlightData> for Schema {
420 type Error = ArrowError;
421 fn try_from(data: &FlightData) -> ArrowResult<Self> {
422 convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
423 ArrowError::ParseError(format!(
424 "Unable to convert flight data to Arrow schema: {err}"
425 ))
426 })
427 }
428}
429
430impl TryFrom<FlightInfo> for Schema {
431 type Error = ArrowError;
432
433 fn try_from(value: FlightInfo) -> ArrowResult<Self> {
434 value.try_decode_schema()
435 }
436}
437
438impl TryFrom<IpcMessage> for Schema {
439 type Error = ArrowError;
440
441 fn try_from(value: IpcMessage) -> ArrowResult<Self> {
442 try_schema_from_ipc_buffer(&value)
443 }
444}
445
446impl TryFrom<&SchemaResult> for Schema {
447 type Error = ArrowError;
448 fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
449 try_schema_from_ipc_buffer(&data.schema)
450 }
451}
452
453impl TryFrom<SchemaResult> for Schema {
454 type Error = ArrowError;
455 fn try_from(data: SchemaResult) -> ArrowResult<Self> {
456 (&data).try_into()
457 }
458}
459
460impl FlightData {
463 pub fn new() -> Self {
488 Default::default()
489 }
490
491 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
493 self.flight_descriptor = Some(flight_descriptor);
494 self
495 }
496
497 pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
499 self.data_header = data_header.into();
500 self
501 }
502
503 pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
507 self.data_body = data_body.into();
508 self
509 }
510
511 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
513 self.app_metadata = app_metadata.into();
514 self
515 }
516}
517
518impl FlightDescriptor {
519 pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
523 FlightDescriptor {
524 r#type: DescriptorType::Cmd.into(),
525 cmd: cmd.into(),
526 ..Default::default()
527 }
528 }
529
530 pub fn new_path(path: Vec<String>) -> Self {
534 FlightDescriptor {
535 r#type: DescriptorType::Path.into(),
536 path,
537 ..Default::default()
538 }
539 }
540}
541
542impl FlightInfo {
543 pub fn new() -> FlightInfo {
569 FlightInfo {
570 schema: Bytes::new(),
571 flight_descriptor: None,
572 endpoint: vec![],
573 ordered: false,
574 total_records: -1,
578 total_bytes: -1,
579 app_metadata: Bytes::new(),
580 }
581 }
582
583 pub fn try_decode_schema(self) -> ArrowResult<Schema> {
585 let msg = IpcMessage(self.schema);
586 msg.try_into()
587 }
588
589 pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
596 let options = IpcWriteOptions::default();
597 let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
598 self.schema = schema;
599 Ok(self)
600 }
601
602 pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
604 self.endpoint.push(endpoint);
605 self
606 }
607
608 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
610 self.flight_descriptor = Some(flight_descriptor);
611 self
612 }
613
614 pub fn with_total_records(mut self, total_records: i64) -> Self {
616 self.total_records = total_records;
617 self
618 }
619
620 pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
622 self.total_bytes = total_bytes;
623 self
624 }
625
626 pub fn with_ordered(mut self, ordered: bool) -> Self {
630 self.ordered = ordered;
631 self
632 }
633
634 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
636 self.app_metadata = app_metadata.into();
637 self
638 }
639}
640
641impl PollInfo {
642 pub fn new() -> Self {
659 Self {
660 info: None,
661 flight_descriptor: None,
662 progress: None,
663 expiration_time: None,
664 }
665 }
666
667 pub fn with_info(mut self, info: FlightInfo) -> Self {
669 self.info = Some(info);
670 self
671 }
672
673 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
676 self.flight_descriptor = Some(flight_descriptor);
677 self
678 }
679
680 pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
683 if !(0.0..=1.0).contains(&progress) {
684 return Err(ArrowError::InvalidArgumentError(format!(
685 "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
686 )));
687 }
688 self.progress = Some(progress);
689 Ok(self)
690 }
691
692 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
694 self.expiration_time = Some(expiration_time);
695 self
696 }
697}
698
699impl<'a> SchemaAsIpc<'a> {
700 pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
702 SchemaAsIpc {
703 pair: (schema, options),
704 }
705 }
706}
707
708impl CancelFlightInfoRequest {
709 pub fn new(info: FlightInfo) -> Self {
712 Self { info: Some(info) }
713 }
714}
715
716impl CancelFlightInfoResult {
717 pub fn new(status: CancelStatus) -> Self {
719 Self {
720 status: status as i32,
721 }
722 }
723}
724
725impl RenewFlightEndpointRequest {
726 pub fn new(endpoint: FlightEndpoint) -> Self {
729 Self {
730 endpoint: Some(endpoint),
731 }
732 }
733}
734
735impl Action {
736 pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
738 Self {
739 r#type: action_type.into(),
740 body: body.into(),
741 }
742 }
743}
744
745impl Result {
746 pub fn new(body: impl Into<Bytes>) -> Self {
748 Self { body: body.into() }
749 }
750}
751
752impl Ticket {
753 pub fn new(ticket: impl Into<Bytes>) -> Self {
762 Self {
763 ticket: ticket.into(),
764 }
765 }
766}
767
768impl FlightEndpoint {
769 pub fn new() -> FlightEndpoint {
788 Default::default()
789 }
790
791 pub fn with_ticket(mut self, ticket: Ticket) -> Self {
793 self.ticket = Some(ticket);
794 self
795 }
796
797 pub fn with_location(mut self, uri: impl Into<String>) -> Self {
809 self.location.push(Location { uri: uri.into() });
810 self
811 }
812
813 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
815 self.expiration_time = Some(expiration_time);
816 self
817 }
818
819 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
821 self.app_metadata = app_metadata.into();
822 self
823 }
824}
825
826#[cfg(test)]
827mod tests {
828 use super::*;
829 use arrow_ipc::MetadataVersion;
830 use arrow_schema::{DataType, Field, TimeUnit};
831
832 struct TestVector(Vec<u8>, usize);
833
834 impl fmt::Display for TestVector {
835 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
836 limited_fmt(f, &self.0, self.1)
837 }
838 }
839
840 #[test]
841 fn it_creates_flight_descriptor_command() {
842 let expected_cmd = "my_command".as_bytes();
843 let fd = FlightDescriptor::new_cmd(expected_cmd.to_vec());
844 assert_eq!(fd.r#type(), DescriptorType::Cmd);
845 assert_eq!(fd.cmd, expected_cmd.to_vec());
846 }
847
848 #[test]
849 fn it_accepts_equal_output() {
850 let input = TestVector(vec![91; 10], 10);
851
852 let actual = format!("{input}");
853 let expected = format!("{:?}", vec![91; 10]);
854 assert_eq!(actual, expected);
855 }
856
857 #[test]
858 fn it_accepts_short_output() {
859 let input = TestVector(vec![91; 6], 10);
860
861 let actual = format!("{input}");
862 let expected = format!("{:?}", vec![91; 6]);
863 assert_eq!(actual, expected);
864 }
865
866 #[test]
867 fn it_accepts_long_output() {
868 let input = TestVector(vec![91; 10], 9);
869
870 let actual = format!("{input}");
871 let expected = format!("{:?}", vec![91; 9]);
872 assert_eq!(actual, expected);
873 }
874
875 #[test]
876 fn ser_deser_schema_result() {
877 let schema = Schema::new(vec![
878 Field::new("c1", DataType::Utf8, false),
879 Field::new("c2", DataType::Float64, true),
880 Field::new("c3", DataType::UInt32, false),
881 Field::new("c4", DataType::Boolean, true),
882 Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
883 Field::new("c6", DataType::Time32(TimeUnit::Second), false),
884 ]);
885 let option = IpcWriteOptions::default();
888 let schema_ipc = SchemaAsIpc::new(&schema, &option);
889 let result: SchemaResult = schema_ipc.try_into().unwrap();
890 let des_schema: Schema = (&result).try_into().unwrap();
891 assert_eq!(schema, des_schema);
892
893 let option = IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap();
896 let schema_ipc = SchemaAsIpc::new(&schema, &option);
897 let result: SchemaResult = schema_ipc.try_into().unwrap();
898 let des_schema: Schema = (&result).try_into().unwrap();
899 assert_eq!(schema, des_schema);
900 }
901}