1#![doc(
41 html_logo_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_white-bg.svg",
42 html_favicon_url = "https://arrow.apache.org/img/arrow-logo_chevrons_black-txt_transparent-bg.svg"
43)]
44#![cfg_attr(docsrs, feature(doc_cfg))]
45#![allow(rustdoc::invalid_html_tags)]
46#![warn(missing_docs)]
47#![allow(unused_crate_dependencies)]
49
50use arrow_ipc::{convert, writer, writer::EncodedData, writer::IpcWriteOptions};
51use arrow_schema::{ArrowError, Schema};
52
53use arrow_ipc::convert::try_schema_from_ipc_buffer;
54use base64::Engine;
55use base64::prelude::BASE64_STANDARD;
56use bytes::Bytes;
57use prost_types::Timestamp;
58use std::{fmt, ops::Deref};
59
60type ArrowResult<T> = std::result::Result<T, ArrowError>;
61
62#[allow(clippy::all)]
63mod r#gen {
64 #![allow(missing_docs)]
66 include!("arrow.flight.protocol.rs");
67}
68
69pub mod flight_descriptor {
71 use super::r#gen;
72 pub use r#gen::flight_descriptor::DescriptorType;
73}
74
75pub mod flight_service_client {
77 use super::r#gen;
78 pub use r#gen::flight_service_client::FlightServiceClient;
79}
80
81pub mod flight_service_server {
84 use super::r#gen;
85 pub use r#gen::flight_service_server::FlightService;
86 pub use r#gen::flight_service_server::FlightServiceServer;
87}
88
89pub mod client;
91pub use client::FlightClient;
92
93pub mod decode;
96
97pub mod encode;
100
101pub mod error;
103
104pub use r#gen::Action;
105pub use r#gen::ActionType;
106pub use r#gen::BasicAuth;
107pub use r#gen::CancelFlightInfoRequest;
108pub use r#gen::CancelFlightInfoResult;
109pub use r#gen::CancelStatus;
110pub use r#gen::Criteria;
111pub use r#gen::Empty;
112pub use r#gen::FlightData;
113pub use r#gen::FlightDescriptor;
114pub use r#gen::FlightEndpoint;
115pub use r#gen::FlightInfo;
116pub use r#gen::HandshakeRequest;
117pub use r#gen::HandshakeResponse;
118pub use r#gen::Location;
119pub use r#gen::PollInfo;
120pub use r#gen::PutResult;
121pub use r#gen::RenewFlightEndpointRequest;
122pub use r#gen::Result;
123pub use r#gen::SchemaResult;
124pub use r#gen::Ticket;
125
126mod trailers;
128
129pub mod utils;
130
131#[cfg(feature = "flight-sql")]
132pub mod sql;
133mod streams;
134
135use flight_descriptor::DescriptorType;
136
137pub struct SchemaAsIpc<'a> {
139 pub pair: (&'a Schema, &'a IpcWriteOptions),
141}
142
143#[derive(Debug)]
146pub struct IpcMessage(pub Bytes);
147
148fn flight_schema_as_encoded_data(arrow_schema: &Schema, options: &IpcWriteOptions) -> EncodedData {
151 let data_gen = writer::IpcDataGenerator::default();
152 let mut dict_tracker = writer::DictionaryTracker::new(false);
153 data_gen.schema_to_bytes_with_dictionary_tracker(arrow_schema, &mut dict_tracker, options)
154}
155
156fn flight_schema_as_flatbuffer(schema: &Schema, options: &IpcWriteOptions) -> IpcMessage {
157 let encoded_data = flight_schema_as_encoded_data(schema, options);
158 IpcMessage(encoded_data.ipc_message.into())
159}
160
161impl Deref for IpcMessage {
167 type Target = [u8];
168
169 fn deref(&self) -> &Self::Target {
170 &self.0
171 }
172}
173
174impl<'a> Deref for SchemaAsIpc<'a> {
175 type Target = (&'a Schema, &'a IpcWriteOptions);
176
177 fn deref(&self) -> &Self::Target {
178 &self.pair
179 }
180}
181
182fn limited_fmt(f: &mut fmt::Formatter<'_>, value: &[u8], limit: usize) -> fmt::Result {
186 if value.len() > limit {
187 write!(f, "{:?}", &value[..limit])
188 } else {
189 write!(f, "{:?}", &value)
190 }
191}
192
193impl fmt::Display for FlightData {
194 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
195 write!(f, "FlightData {{")?;
196 write!(f, " descriptor: ")?;
197 match &self.flight_descriptor {
198 Some(d) => write!(f, "{d}")?,
199 None => write!(f, "None")?,
200 };
201 write!(f, ", header: ")?;
202 limited_fmt(f, &self.data_header, 8)?;
203 write!(f, ", metadata: ")?;
204 limited_fmt(f, &self.app_metadata, 8)?;
205 write!(f, ", body: ")?;
206 limited_fmt(f, &self.data_body, 8)?;
207 write!(f, " }}")
208 }
209}
210
211impl fmt::Display for FlightDescriptor {
212 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
213 write!(f, "FlightDescriptor {{")?;
214 write!(f, " type: ")?;
215 match self.r#type() {
216 DescriptorType::Cmd => {
217 write!(f, "cmd, value: ")?;
218 limited_fmt(f, &self.cmd, 8)?;
219 }
220 DescriptorType::Path => {
221 write!(f, "path: [")?;
222 let mut sep = "";
223 for element in &self.path {
224 write!(f, "{sep}{element}")?;
225 sep = ", ";
226 }
227 write!(f, "]")?;
228 }
229 DescriptorType::Unknown => {
230 write!(f, "unknown")?;
231 }
232 }
233 write!(f, " }}")
234 }
235}
236
237impl fmt::Display for FlightEndpoint {
238 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
239 write!(f, "FlightEndpoint {{")?;
240 write!(f, " ticket: ")?;
241 match &self.ticket {
242 Some(value) => write!(f, "{value}"),
243 None => write!(f, " None"),
244 }?;
245 write!(f, ", location: [")?;
246 let mut sep = "";
247 for location in &self.location {
248 write!(f, "{sep}{location}")?;
249 sep = ", ";
250 }
251 write!(f, "]")?;
252 write!(f, ", expiration_time:")?;
253 match &self.expiration_time {
254 Some(value) => write!(f, " {value}"),
255 None => write!(f, " None"),
256 }?;
257 write!(f, ", app_metadata: ")?;
258 limited_fmt(f, &self.app_metadata, 8)?;
259 write!(f, " }}")
260 }
261}
262
263impl fmt::Display for FlightInfo {
264 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
265 let ipc_message = IpcMessage(self.schema.clone());
266 let schema: Schema = ipc_message.try_into().map_err(|_err| fmt::Error)?;
267 write!(f, "FlightInfo {{")?;
268 write!(f, " schema: {schema}")?;
269 write!(f, ", descriptor:")?;
270 match &self.flight_descriptor {
271 Some(d) => write!(f, " {d}"),
272 None => write!(f, " None"),
273 }?;
274 write!(f, ", endpoint: [")?;
275 let mut sep = "";
276 for endpoint in &self.endpoint {
277 write!(f, "{sep}{endpoint}")?;
278 sep = ", ";
279 }
280 write!(f, "], total_records: {}", self.total_records)?;
281 write!(f, ", total_bytes: {}", self.total_bytes)?;
282 write!(f, ", ordered: {}", self.ordered)?;
283 write!(f, ", app_metadata: ")?;
284 limited_fmt(f, &self.app_metadata, 8)?;
285 write!(f, " }}")
286 }
287}
288
289impl fmt::Display for PollInfo {
290 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
291 write!(f, "PollInfo {{")?;
292 write!(f, " info:")?;
293 match &self.info {
294 Some(value) => write!(f, " {value}"),
295 None => write!(f, " None"),
296 }?;
297 write!(f, ", descriptor:")?;
298 match &self.flight_descriptor {
299 Some(d) => write!(f, " {d}"),
300 None => write!(f, " None"),
301 }?;
302 write!(f, ", progress:")?;
303 match &self.progress {
304 Some(value) => write!(f, " {value}"),
305 None => write!(f, " None"),
306 }?;
307 write!(f, ", expiration_time:")?;
308 match &self.expiration_time {
309 Some(value) => write!(f, " {value}"),
310 None => write!(f, " None"),
311 }?;
312 write!(f, " }}")
313 }
314}
315
316impl fmt::Display for CancelFlightInfoRequest {
317 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
318 write!(f, "CancelFlightInfoRequest {{")?;
319 write!(f, " info: ")?;
320 match &self.info {
321 Some(value) => write!(f, "{value}")?,
322 None => write!(f, "None")?,
323 };
324 write!(f, " }}")
325 }
326}
327
328impl fmt::Display for CancelFlightInfoResult {
329 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
330 write!(f, "CancelFlightInfoResult {{")?;
331 write!(f, " status: {}", self.status().as_str_name())?;
332 write!(f, " }}")
333 }
334}
335
336impl fmt::Display for RenewFlightEndpointRequest {
337 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
338 write!(f, "RenewFlightEndpointRequest {{")?;
339 write!(f, " endpoint: ")?;
340 match &self.endpoint {
341 Some(value) => write!(f, "{value}")?,
342 None => write!(f, "None")?,
343 };
344 write!(f, " }}")
345 }
346}
347
348impl fmt::Display for Location {
349 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
350 write!(f, "Location {{")?;
351 write!(f, " uri: ")?;
352 write!(f, "{}", self.uri)
353 }
354}
355
356impl fmt::Display for Ticket {
357 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
358 write!(f, "Ticket {{")?;
359 write!(f, " ticket: ")?;
360 write!(f, "{}", BASE64_STANDARD.encode(&self.ticket))
361 }
362}
363
364impl From<EncodedData> for FlightData {
367 fn from(data: EncodedData) -> Self {
368 FlightData {
369 data_header: data.ipc_message.into(),
370 data_body: data.arrow_data.into(),
371 ..Default::default()
372 }
373 }
374}
375
376impl From<SchemaAsIpc<'_>> for FlightData {
377 fn from(schema_ipc: SchemaAsIpc) -> Self {
378 let IpcMessage(vals) = flight_schema_as_flatbuffer(schema_ipc.0, schema_ipc.1);
379 FlightData {
380 data_header: vals,
381 ..Default::default()
382 }
383 }
384}
385
386impl TryFrom<SchemaAsIpc<'_>> for SchemaResult {
387 type Error = ArrowError;
388
389 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
390 let IpcMessage(vals) = schema_to_ipc_format(schema_ipc)?;
396 Ok(SchemaResult { schema: vals })
397 }
398}
399
400impl TryFrom<SchemaAsIpc<'_>> for IpcMessage {
401 type Error = ArrowError;
402
403 fn try_from(schema_ipc: SchemaAsIpc) -> ArrowResult<Self> {
404 schema_to_ipc_format(schema_ipc)
405 }
406}
407
408fn schema_to_ipc_format(schema_ipc: SchemaAsIpc) -> ArrowResult<IpcMessage> {
409 let pair = *schema_ipc;
410 let encoded_data = flight_schema_as_encoded_data(pair.0, pair.1);
411
412 let mut schema = vec![];
413 writer::write_message(&mut schema, encoded_data, pair.1)?;
414 Ok(IpcMessage(schema.into()))
415}
416
417impl TryFrom<&FlightData> for Schema {
418 type Error = ArrowError;
419 fn try_from(data: &FlightData) -> ArrowResult<Self> {
420 convert::try_schema_from_flatbuffer_bytes(&data.data_header[..]).map_err(|err| {
421 ArrowError::ParseError(format!(
422 "Unable to convert flight data to Arrow schema: {err}"
423 ))
424 })
425 }
426}
427
428impl TryFrom<FlightInfo> for Schema {
429 type Error = ArrowError;
430
431 fn try_from(value: FlightInfo) -> ArrowResult<Self> {
432 value.try_decode_schema()
433 }
434}
435
436impl TryFrom<IpcMessage> for Schema {
437 type Error = ArrowError;
438
439 fn try_from(value: IpcMessage) -> ArrowResult<Self> {
440 try_schema_from_ipc_buffer(&value)
441 }
442}
443
444impl TryFrom<&SchemaResult> for Schema {
445 type Error = ArrowError;
446 fn try_from(data: &SchemaResult) -> ArrowResult<Self> {
447 try_schema_from_ipc_buffer(&data.schema)
448 }
449}
450
451impl TryFrom<SchemaResult> for Schema {
452 type Error = ArrowError;
453 fn try_from(data: SchemaResult) -> ArrowResult<Self> {
454 (&data).try_into()
455 }
456}
457
458impl FlightData {
461 pub fn new() -> Self {
486 Default::default()
487 }
488
489 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
491 self.flight_descriptor = Some(flight_descriptor);
492 self
493 }
494
495 pub fn with_data_header(mut self, data_header: impl Into<Bytes>) -> Self {
497 self.data_header = data_header.into();
498 self
499 }
500
501 pub fn with_data_body(mut self, data_body: impl Into<Bytes>) -> Self {
505 self.data_body = data_body.into();
506 self
507 }
508
509 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
511 self.app_metadata = app_metadata.into();
512 self
513 }
514}
515
516impl FlightDescriptor {
517 pub fn new_cmd(cmd: impl Into<Bytes>) -> Self {
521 FlightDescriptor {
522 r#type: DescriptorType::Cmd.into(),
523 cmd: cmd.into(),
524 ..Default::default()
525 }
526 }
527
528 pub fn new_path(path: Vec<String>) -> Self {
532 FlightDescriptor {
533 r#type: DescriptorType::Path.into(),
534 path,
535 ..Default::default()
536 }
537 }
538}
539
540impl FlightInfo {
541 pub fn new() -> FlightInfo {
567 FlightInfo {
568 schema: Bytes::new(),
569 flight_descriptor: None,
570 endpoint: vec![],
571 ordered: false,
572 total_records: -1,
576 total_bytes: -1,
577 app_metadata: Bytes::new(),
578 }
579 }
580
581 pub fn try_decode_schema(self) -> ArrowResult<Schema> {
583 let msg = IpcMessage(self.schema);
584 msg.try_into()
585 }
586
587 pub fn try_with_schema(mut self, schema: &Schema) -> ArrowResult<Self> {
594 let options = IpcWriteOptions::default();
595 let IpcMessage(schema) = SchemaAsIpc::new(schema, &options).try_into()?;
596 self.schema = schema;
597 Ok(self)
598 }
599
600 pub fn with_endpoint(mut self, endpoint: FlightEndpoint) -> Self {
602 self.endpoint.push(endpoint);
603 self
604 }
605
606 pub fn with_endpoints(mut self, endpoints: Vec<FlightEndpoint>) -> Self {
608 self.endpoint = endpoints;
609 self
610 }
611
612 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
614 self.flight_descriptor = Some(flight_descriptor);
615 self
616 }
617
618 pub fn with_total_records(mut self, total_records: i64) -> Self {
620 self.total_records = total_records;
621 self
622 }
623
624 pub fn with_total_bytes(mut self, total_bytes: i64) -> Self {
626 self.total_bytes = total_bytes;
627 self
628 }
629
630 pub fn with_ordered(mut self, ordered: bool) -> Self {
634 self.ordered = ordered;
635 self
636 }
637
638 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
640 self.app_metadata = app_metadata.into();
641 self
642 }
643}
644
645impl PollInfo {
646 pub fn new() -> Self {
663 Self {
664 info: None,
665 flight_descriptor: None,
666 progress: None,
667 expiration_time: None,
668 }
669 }
670
671 pub fn with_info(mut self, info: FlightInfo) -> Self {
673 self.info = Some(info);
674 self
675 }
676
677 pub fn with_descriptor(mut self, flight_descriptor: FlightDescriptor) -> Self {
680 self.flight_descriptor = Some(flight_descriptor);
681 self
682 }
683
684 pub fn try_with_progress(mut self, progress: f64) -> ArrowResult<Self> {
687 if !(0.0..=1.0).contains(&progress) {
688 return Err(ArrowError::InvalidArgumentError(format!(
689 "PollInfo progress must be in the range [0.0, 1.0], got {progress}"
690 )));
691 }
692 self.progress = Some(progress);
693 Ok(self)
694 }
695
696 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
698 self.expiration_time = Some(expiration_time);
699 self
700 }
701}
702
703impl<'a> SchemaAsIpc<'a> {
704 pub fn new(schema: &'a Schema, options: &'a IpcWriteOptions) -> Self {
706 SchemaAsIpc {
707 pair: (schema, options),
708 }
709 }
710}
711
712impl CancelFlightInfoRequest {
713 pub fn new(info: FlightInfo) -> Self {
716 Self { info: Some(info) }
717 }
718}
719
720impl CancelFlightInfoResult {
721 pub fn new(status: CancelStatus) -> Self {
723 Self {
724 status: status as i32,
725 }
726 }
727}
728
729impl RenewFlightEndpointRequest {
730 pub fn new(endpoint: FlightEndpoint) -> Self {
733 Self {
734 endpoint: Some(endpoint),
735 }
736 }
737}
738
739impl Action {
740 pub fn new(action_type: impl Into<String>, body: impl Into<Bytes>) -> Self {
742 Self {
743 r#type: action_type.into(),
744 body: body.into(),
745 }
746 }
747}
748
749impl Result {
750 pub fn new(body: impl Into<Bytes>) -> Self {
752 Self { body: body.into() }
753 }
754}
755
756impl Ticket {
757 pub fn new(ticket: impl Into<Bytes>) -> Self {
766 Self {
767 ticket: ticket.into(),
768 }
769 }
770}
771
772impl FlightEndpoint {
773 pub fn new() -> FlightEndpoint {
792 Default::default()
793 }
794
795 pub fn with_ticket(mut self, ticket: Ticket) -> Self {
797 self.ticket = Some(ticket);
798 self
799 }
800
801 pub fn with_location(mut self, uri: impl Into<String>) -> Self {
813 self.location.push(Location { uri: uri.into() });
814 self
815 }
816
817 pub fn with_expiration_time(mut self, expiration_time: Timestamp) -> Self {
819 self.expiration_time = Some(expiration_time);
820 self
821 }
822
823 pub fn with_app_metadata(mut self, app_metadata: impl Into<Bytes>) -> Self {
825 self.app_metadata = app_metadata.into();
826 self
827 }
828}
829
830#[cfg(test)]
831mod tests {
832 use super::*;
833 use arrow_ipc::MetadataVersion;
834 use arrow_schema::{DataType, Field, TimeUnit};
835
836 struct TestVector(Vec<u8>, usize);
837
838 impl fmt::Display for TestVector {
839 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
840 limited_fmt(f, &self.0, self.1)
841 }
842 }
843
844 #[test]
845 fn it_creates_flight_descriptor_command() {
846 let expected_cmd = "my_command".as_bytes();
847 let fd = FlightDescriptor::new_cmd(expected_cmd.to_vec());
848 assert_eq!(fd.r#type(), DescriptorType::Cmd);
849 assert_eq!(fd.cmd, expected_cmd.to_vec());
850 }
851
852 #[test]
853 fn it_accepts_equal_output() {
854 let input = TestVector(vec![91; 10], 10);
855
856 let actual = format!("{input}");
857 let expected = format!("{:?}", vec![91; 10]);
858 assert_eq!(actual, expected);
859 }
860
861 #[test]
862 fn it_accepts_short_output() {
863 let input = TestVector(vec![91; 6], 10);
864
865 let actual = format!("{input}");
866 let expected = format!("{:?}", vec![91; 6]);
867 assert_eq!(actual, expected);
868 }
869
870 #[test]
871 fn it_accepts_long_output() {
872 let input = TestVector(vec![91; 10], 9);
873
874 let actual = format!("{input}");
875 let expected = format!("{:?}", vec![91; 9]);
876 assert_eq!(actual, expected);
877 }
878
879 #[test]
880 fn ser_deser_schema_result() {
881 let schema = Schema::new(vec![
882 Field::new("c1", DataType::Utf8, false),
883 Field::new("c2", DataType::Float64, true),
884 Field::new("c3", DataType::UInt32, false),
885 Field::new("c4", DataType::Boolean, true),
886 Field::new("c5", DataType::Timestamp(TimeUnit::Millisecond, None), true),
887 Field::new("c6", DataType::Time32(TimeUnit::Second), false),
888 ]);
889 let option = IpcWriteOptions::default();
892 let schema_ipc = SchemaAsIpc::new(&schema, &option);
893 let result: SchemaResult = schema_ipc.try_into().unwrap();
894 let des_schema: Schema = (&result).try_into().unwrap();
895 assert_eq!(schema, des_schema);
896
897 let option = IpcWriteOptions::try_new(8, true, MetadataVersion::V4).unwrap();
900 let schema_ipc = SchemaAsIpc::new(&schema, &option);
901 let result: SchemaResult = schema_ipc.try_into().unwrap();
902 let des_schema: Schema = (&result).try_into().unwrap();
903 assert_eq!(schema, des_schema);
904 }
905
906 #[test]
907 fn test_dict_schema() {
908 let schema = Schema::new(vec![
910 Field::new(
911 "a",
912 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
913 false,
914 ),
915 Field::new(
916 "b",
917 DataType::Dictionary(Box::new(DataType::UInt16), Box::new(DataType::Utf8)),
918 false,
919 ),
920 ]);
921
922 let flight_info = FlightInfo::new().try_with_schema(&schema).unwrap();
923
924 let new_schema = Schema::try_from(flight_info).unwrap();
925 assert_eq!(schema, new_schema);
926 }
927}