1#![doc(
43 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
44 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
45)]
46#![cfg_attr(docsrs, feature(doc_auto_cfg))]
47#![allow(rustdoc::invalid_html_tags)]
48#![warn(missing_docs)]
49#![allow(unused_crate_dependencies)]
51
52use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
53use arrow_schema::{ArrowError, Schema};
54
55use arrow_ipc::convert::try_schema_from_ipc_buffer;
56use base64::prelude::BASE64_STANDARD;
57use base64::Engine;
58use bytes::Bytes;
59use prost_types::Timestamp;
60use std::{fmt, ops::Deref};
61
/// Crate-local shorthand for a `std::result::Result` whose error type is [`ArrowError`].
///
/// Spelled with the full `std::result::Result` path because this file also
/// re-exports a generated type named `Result`.
type ArrowResult<T> = std::result::Result<T, ArrowError>;
63
// Private module wrapping the contents of `arrow.flight.protocol.rs`.
// NOTE(review): presumably prost/tonic-generated protocol code — confirm against the build setup.
// All clippy lints and the crate-wide `missing_docs` warning are switched off for it.
#[allow(clippy::all)]
mod gen {
    #![allow(missing_docs)]
    // Textually includes the file at compile time; its items become members of `gen`.
    include!("arrow.flight.protocol.rs");
}
70
/// Public re-export of [`DescriptorType`](flight_descriptor::DescriptorType) from the
/// private `gen::flight_descriptor` module.
pub mod flight_descriptor {
    use super::gen;
    pub use gen::flight_descriptor::DescriptorType;
}
76
/// Public re-export of `FlightServiceClient` from the private
/// `gen::flight_service_client` module.
pub mod flight_service_client {
    use super::gen;
    pub use gen::flight_service_client::FlightServiceClient;
}
82
/// Public re-exports of `FlightService` and `FlightServiceServer` from the private
/// `gen::flight_service_server` module.
pub mod flight_service_server {
    use super::gen;
    pub use gen::flight_service_server::FlightService;
    pub use gen::flight_service_server::FlightServiceServer;
}
90
/// Client module; its [`FlightClient`] type is re-exported at the crate root below.
pub mod client;
pub use client::FlightClient;

/// NOTE(review): module body lives in another file; presumably decoding of Flight
/// messages — confirm.
pub mod decode;

/// NOTE(review): module body lives in another file; presumably encoding of Flight
/// messages — confirm.
pub mod encode;

/// NOTE(review): module body lives in another file; presumably the crate's error
/// types — confirm.
pub mod error;
105
106pub use gen::Action;
107pub use gen::ActionType;
108pub use gen::BasicAuth;
109pub use gen::CancelFlightInfoRequest;
110pub use gen::CancelFlightInfoResult;
111pub use gen::CancelStatus;
112pub use gen::Criteria;
113pub use gen::Empty;
114pub use gen::FlightData;
115pub use gen::FlightDescriptor;
116pub use gen::FlightEndpoint;
117pub use gen::FlightInfo;
118pub use gen::HandshakeRequest;
119pub use gen::HandshakeResponse;
120pub use gen::Location;
121pub use gen::PollInfo;
122pub use gen::PutResult;
123pub use gen::RenewFlightEndpointRequest;
124pub use gen::Result;
125pub use gen::SchemaResult;
126pub use gen::Ticket;
127
// Private helper module; body is defined in another file.
mod trailers;

/// NOTE(review): module body lives in another file; presumably shared helper
/// functions — confirm.
pub mod utils;

/// Only compiled when the `flight-sql` cargo feature is enabled.
/// NOTE(review): presumably the Flight SQL layer — confirm.
#[cfg(feature = "flight-sql")]
pub mod sql;
// Private helper module; body is defined in another file.
mod streams;
136
137use flight_descriptor::DescriptorType;
138
/// A borrowed [`Schema`] paired with the [`IpcWriteOptions`] to encode it with.
///
/// This is the input type of the schema conversions in this file
/// (into [`FlightData`], [`SchemaResult`] and [`IpcMessage`]).
pub struct SchemaAsIpc<'a> {
    /// The `(schema, options)` pair; also reachable through `Deref`.
    pub pair: (&'a Schema, &'a IpcWriteOptions),
}
144
/// Newtype around the [`Bytes`] of an encoded IPC message.
///
/// Produced from a [`SchemaAsIpc`] and convertible back into a [`Schema`] by the
/// `TryFrom` impls in this file; dereferences to the raw byte slice.
#[derive(Debug)]
pub struct IpcMessage(pub Bytes);
149
150fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
153 let data_gen = writer::IpcDataGenerator::default();
154 #[allow(deprecated)]
155 let mut dict_tracker =
156 writer::DictionaryTracker::new_with_preserve_dict_id(false, options.preserve_dict_id());
157 data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
158}
159
160fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
161 let encoded_data = flight_schema_as_encoded_data(schema, options);
162 IpcMessage(encoded_data.ipc_message.into())
163}
164
165impl Deref for IpcMessage {
171 type Target = [u8];
172
173 fn deref(&self) -> &Self::Target {
174 &self.0
175 }
176}
177
178impl<'a> Deref for SchemaAsIpc<'a> {
179 type Target = (&'a Schema, &'a IpcWriteOptions);
180
181 fn deref(&self) -> &Self::Target {
182 &self.pair
183 }
184}
185
/// Writes the `Debug` rendering of at most the first `limit` bytes of `value`.
///
/// Longer inputs are cut off silently (no ellipsis is written); inputs of
/// `limit` bytes or fewer are written in full.
fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
    // `get(..limit)` is `None` exactly when `limit` exceeds the length, in which
    // case the whole slice is shown.
    let shown = value.get(..limit).unwrap_or(value);
    write!(f, "{shown:?}")
}
196
197impl fmt::Display for FlightData {
198 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
199 write!(f, "FlightData {{")?;
200 write!(f, " descriptor: ")?;
201 match &self.flight_descriptor {
202 Some(d) => write!(f, "{d}")?,
203 None => write!(f, "None")?,
204 };
205 write!(f, ", header: ")?;
206 limited_fmt(f, &self.data_header, 8)?;
207 write!(f, ", metadata: ")?;
208 limited_fmt(f, &self.app_metadata, 8)?;
209 write!(f, ", body: ")?;
210 limited_fmt(f, &self.data_body, 8)?;
211 write!(f, " }}")
212 }
213}
214
215impl fmt::Display for FlightDescriptor {
216 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
217 write!(f, "FlightDescriptor {{")?;
218 write!(f, " type: ")?;
219 match self.r#type() {
220 DescriptorType::Cmd => {
221 write!(f, "cmd, value: ")?;
222 limited_fmt(f, &self.cmd, 8)?;
223 }
224 DescriptorType::Path => {
225 write!(f, "path: [")?;
226 let mut sep = "";
227 for element in &self.path {
228 write!(f, "{sep}{element}")?;
229 sep = ", ";
230 }
231 write!(f, "]")?;
232 }
233 DescriptorType::Unknown => {
234 write!(f, "unknown")?;
235 }
236 }
237 write!(f, " }}")
238 }
239}
240
241impl fmt::Display for FlightEndpoint {
242 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
243 write!(f, "FlightEndpoint {{")?;
244 write!(f, " ticket: ")?;
245 match &self.ticket {
246 Some(value) => write!(f, "{value}"),
247 None => write!(f, " None"),
248 }?;
249 write!(f, ", location: [")?;
250 let mut sep = "";
251 for location in &self.location {
252 write!(f, "{sep}{location}")?;
253 sep = ", ";
254 }
255 write!(f, "]")?;
256 write!(f, ", expiration_time:")?;
257 match &self.expiration_time {
258 Some(value) => write!(f, " {value}"),
259 None => write!(f, " None"),
260 }?;
261 write!(f, ", app_metadata: ")?;
262 limited_fmt(f, &self.app_metadata, 8)?;
263 write!(f, " }}")
264 }
265}
266
267impl fmt::Display for FlightInfo {
268 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
269 let ipc_message = IpcMessage(self.schema.clone());
270 let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
271 write!(f, "FlightInfo {{")?;
272 write!(f, " schema: {schema}")?;
273 write!(f, ", descriptor:")?;
274 match &self.flight_descriptor {
275 Some(d) => write!(f, " {d}"),
276 None => write!(f, " None"),
277 }?;
278 write!(f, ", endpoint: [")?;
279 let mut sep = "";
280 for endpoint in &self.endpoint {
281 write!(f, "{sep}{endpoint}")?;
282 sep = ", ";
283 }
284 write!(f, "], total_records: {}", self.total_records)?;
285 write!(f, ", total_bytes: {}", self.total_bytes)?;
286 write!(f, ", ordered: {}", self.ordered)?;
287 write!(f, ", app_metadata: ")?;
288 limited_fmt(f, &self.app_metadata, 8)?;
289 write!(f, " }}")
290 }
291}
292
293impl fmt::Display for PollInfo {
294 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
295 write!(f, "PollInfo {{")?;
296 write!(f, " info:")?;
297 match &self.info {
298 Some(value) => write!(f, " {value}"),
299 None => write!(f, " None"),
300 }?;
301 write!(f, ", descriptor:")?;
302 match &self.flight_descriptor {
303 Some(d) => write!(f, " {d}"),
304 None => write!(f, " None"),
305 }?;
306 write!(f, ", progress:")?;
307 match &self.progress {
308 Some(value) => write!(f, " {value}"),
309 None => write!(f, " None"),
310 }?;
311 write!(f, ", expiration_time:")?;
312 match &self.expiration_time {
313 Some(value) => write!(f, " {value}"),
314 None => write!(f, " None"),
315 }?;
316 write!(f, " }}")
317 }
318}
319
320impl fmt::Display for CancelFlightInfoRequest {
321 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
322 write!(f, "CancelFlightInfoRequest {{")?;
323 write!(f, " info: ")?;
324 match &self.info {
325 Some(value) => write!(f, "{value}")?,
326 None => write!(f, "None")?,
327 };
328 write!(f, " }}")
329 }
330}
331
332impl fmt::Display for CancelFlightInfoResult {
333 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
334 write!(f, "CancelFlightInfoResult {{")?;
335 write!(f, " status: {}", self.status().as_str_name())?;
336 write!(f, " }}")
337 }
338}
339
340impl fmt::Display for RenewFlightEndpointRequest {
341 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
342 write!(f, "RenewFlightEndpointRequest {{")?;
343 write!(f, " endpoint: ")?;
344 match &self.endpoint {
345 Some(value) => write!(f, "{value}")?,
346 None => write!(f, "None")?,
347 };
348 write!(f, " }}")
349 }
350}
351
352impl fmt::Display for Location {
353 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
354 write!(f, "Location {{")?;
355 write!(f, " uri: ")?;
356 write!(f, "{}", self.uri)
357 }
358}
359
360impl fmt::Display for Ticket {
361 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
362 write!(f, "Ticket {{")?;
363 write!(f, " ticket: ")?;
364 write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
365 }
366}
367
368impl From<EncodedData> for FlightData {
371 fn from(data: EncodedData) -> Self {
372 FlightData {
373 data_header: data.ipc_message.into(),
374 data_body: data.arrow_data.into(),
375 ..Default::default()
376 }
377 }
378}
379
380impl From<SchemaAsIpc<'_>> for FlightData {
381 fn from(schema_ipc: SchemaAsIpc) -> Self {
382 let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
383 FlightData {
384 data_header: vals,
385 ..Default::default()
386 }
387 }
388}
389
390impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
391 type Error = ArrowError;
392
393 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
394 let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
400 Ok(SchemaResult { schema: vals })
401 }
402}
403
404impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
405 type Error = ArrowError;
406
407 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
408 schema_to_ipc_format(schema_ipc)
409 }
410}
411
412fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
413 let pair = *schema_ipc;
414 let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
415
416 let mut schema = vec![];
417 writer::write_message(&mut schema, encoded_data, pair.1)?;
418 Ok(IpcMessage(schema.into()))
419}
420
421impl TryFrom<&FlightData> for Schema {
422 type Error = ArrowError;
423 fn try_from(data: &FlightData) -> ArrowResult<Self> {
424 convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
425 ArrowError::ParseError(format!(
426 "Unable to convert flight data to Arrow schema: {err}"
427 ))
428 })
429 }
430}
431
432impl TryFrom<FlightInfo> for Schema {
433 type Error = ArrowError;
434
435 fn try_from(value: FlightInfo) -> ArrowResult<Self> {
436 value.try_decode_schema()
437 }
438}
439
440impl TryFrom<IpcMessage> for Schema {
441 type Error = ArrowError;
442
443 fn try_from(value: IpcMessage) -> ArrowResult<Self> {
444 try_schema_from_ipc_buffer(&value)
445 }
446}
447
448impl TryFrom<&SchemaResult> for Schema {
449 type Error = ArrowError;
450 fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
451 try_schema_from_ipc_buffer(&data.schema)
452 }
453}
454
455impl TryFrom<SchemaResult> for Schema {
456 type Error = ArrowError;
457 fn try_from(data: SchemaResult) -> ArrowResult<Self> {
458 (&data).try_into()
459 }
460}
461
462impl FlightData {
465 pub fn new() -> Self {
490 Default::default()
491 }
492
493 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
495 self.flight_descriptor = Some(flight_descriptor);
496 self
497 }
498
499 pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
501 self.data_header = data_header.into();
502 self
503 }
504
505 pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
509 self.data_body = data_body.into();
510 self
511 }
512
513 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
515 self.app_metadata = app_metadata.into();
516 self
517 }
518}
519
520impl FlightDescriptor {
521 pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
525 FlightDescriptor {
526 r#type: DescriptorType::Cmd.into(),
527 cmd: cmd.into(),
528 ..Default::default()
529 }
530 }
531
532 pub fn new_path(path: Vec<String>) -> Self {
536 FlightDescriptor {
537 r#type: DescriptorType::Path.into(),
538 path,
539 ..Default::default()
540 }
541 }
542}
543
544impl FlightInfo {
545 pub fn new() -> FlightInfo {
571 FlightInfo {
572 schema: Bytes::new(),
573 flight_descriptor: None,
574 endpoint: vec![],
575 ordered: false,
576 total_records: -1,
580 total_bytes: -1,
581 app_metadata: Bytes::new(),
582 }
583 }
584
585 pub fn try_decode_schema(self) -> ArrowResult<Schema> {
587 let msg = IpcMessage(self.schema);
588 msg.try_into()
589 }
590
591 pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
598 let options = IpcWriteOptions::default();
599 let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
600 self.schema = schema;
601 Ok(self)
602 }
603
604 pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
606 self.endpoint.push(endpoint);
607 self
608 }
609
610 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
612 self.flight_descriptor = Some(flight_descriptor);
613 self
614 }
615
616 pub fn with_total_records(mut self, total_records: i64) -> Self {
618 self.total_records = total_records;
619 self
620 }
621
622 pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
624 self.total_bytes = total_bytes;
625 self
626 }
627
628 pub fn with_ordered(mut self, ordered: bool) -> Self {
632 self.ordered = ordered;
633 self
634 }
635
636 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
638 self.app_metadata = app_metadata.into();
639 self
640 }
641}
642
643impl PollInfo {
644 pub fn new() -> Self {
661 Self {
662 info: None,
663 flight_descriptor: None,
664 progress: None,
665 expiration_time: None,
666 }
667 }
668
669 pub fn with_info(mut self, info: FlightInfo) -> Self {
671 self.info = Some(info);
672 self
673 }
674
675 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
678 self.flight_descriptor = Some(flight_descriptor);
679 self
680 }
681
682 pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
685 if !(0.0..=1.0).contains(&progress) {
686 return Err(ArrowError::InvalidArgumentError(format!(
687 "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
688 )));
689 }
690 self.progress = Some(progress);
691 Ok(self)
692 }
693
694 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
696 self.expiration_time = Some(expiration_time);
697 self
698 }
699}
700
701impl<'a> SchemaAsIpc<'a> {
702 pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
704 SchemaAsIpc {
705 pair: (schema, options),
706 }
707 }
708}
709
710impl CancelFlightInfoRequest {
711 pub fn new(info: FlightInfo) -> Self {
714 Self { info: Some(info) }
715 }
716}
717
718impl CancelFlightInfoResult {
719 pub fn new(status: CancelStatus) -> Self {
721 Self {
722 status: status as i32,
723 }
724 }
725}
726
727impl RenewFlightEndpointRequest {
728 pub fn new(endpoint: FlightEndpoint) -> Self {
731 Self {
732 endpoint: Some(endpoint),
733 }
734 }
735}
736
737impl Action {
738 pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
740 Self {
741 r#type: action_type.into(),
742 body: body.into(),
743 }
744 }
745}
746
747impl Result {
748 pub fn new(body: impl Into<Bytes>) -> Self {
750 Self { body: body.into() }
751 }
752}
753
754impl Ticket {
755 pub fn new(ticket: impl Into<Bytes>) -> Self {
764 Self {
765 ticket: ticket.into(),
766 }
767 }
768}
769
770impl FlightEndpoint {
771 pub fn new() -> FlightEndpoint {
790 Default::default()
791 }
792
793 pub fn with_ticket(mut self, ticket: Ticket) -> Self {
795 self.ticket = Some(ticket);
796 self
797 }
798
799 pub fn with_location(mut self, uri: impl Into<String>) -> Self {
811 self.location.push(Location { uri: uri.into() });
812 self
813 }
814
815 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
817 self.expiration_time = Some(expiration_time);
818 self
819 }
820
821 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
823 self.app_metadata = app_metadata.into();
824 self
825 }
826}
827
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_ipc::MetadataVersion;
    use arrow_schema::{DataType, Field, TimeUnit};

    /// Bytes plus a display limit, so `limited_fmt` can be driven through `format!`.
    struct TestVector(Vec<u8>, usize);

    impl fmt::Display for TestVector {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            let TestVector(bytes, limit) = self;
            limited_fmt(f, bytes, *limit)
        }
    }

    /// Renders `len` copies of the byte 91 through `limited_fmt` with `limit`.
    fn rendered(len: usize, limit: usize) -> String {
        TestVector(vec![91; len], limit).to_string()
    }

    /// The `Debug` rendering of `len` copies of the byte 91.
    fn debug_of(len: usize) -> String {
        format!("{:?}", vec![91u8; len])
    }

    #[test]
    fn it_creates_flight_descriptor_command() {
        let expected_cmd = b"my_command";
        let fd = FlightDescriptor::new_cmd(expected_cmd.to_vec());
        assert_eq!(fd.r#type(), DescriptorType::Cmd);
        assert_eq!(fd.cmd, expected_cmd.to_vec());
    }

    #[test]
    fn it_accepts_equal_output() {
        // Limit equal to the length: nothing is cut off.
        assert_eq!(rendered(10, 10), debug_of(10));
    }

    #[test]
    fn it_accepts_short_output() {
        // Input shorter than the limit: printed in full.
        assert_eq!(rendered(6, 10), debug_of(6));
    }

    #[test]
    fn it_accepts_long_output() {
        // Input longer than the limit: only the first `limit` bytes are printed.
        assert_eq!(rendered(10, 9), debug_of(9));
    }

    #[test]
    fn ser_deser_schema_result() {
        let schema = Schema::new(vec![
            Field::new("c1", DataType::Utf8, false),
            Field::new("c2", DataType::Float64, true),
            Field::new("c3", DataType::UInt32, false),
            Field::new("c4", DataType::Boolean, true),
            Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
            Field::new("c6", DataType::Time32(TimeUnit::Second), false),
        ]);

        // Round-trip once with the default options and once with explicitly
        // constructed V4-metadata options.
        let options = [
            IpcWriteOptions::default(),
            IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap(),
        ];
        for option in &options {
            let result = SchemaResult::try_from(SchemaAsIpc::new(&schema, option)).unwrap();
            let des_schema = Schema::try_from(&result).unwrap();
            assert_eq!(schema, des_schema);
        }
    }

    #[test]
    fn test_dict_schema() {
        // Two non-nullable dictionary fields with identical key/value types.
        let dict_field = |name: &str| {
            Field::new(
                name,
                DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
                false,
            )
        };
        let schema = Schema::new(vec![dict_field("a"), dict_field("b")]);

        let flight_info = FlightInfo::new().try_with_schema(&schema).unwrap();

        let new_schema = Schema::try_from(flight_info).unwrap();
        assert_eq!(schema, new_schema);
    }
}