arrow_integration_testing/flight_client_scenarios/
auth_basic_proto.rs1use crate::{AUTH_PASSWORD, AUTH_USERNAME};
21
22use arrow_flight::{flight_service_client::FlightServiceClient, BasicAuth, HandshakeRequest};
23use futures::{stream, StreamExt};
24use prost::Message;
25use tonic::{metadata::MetadataValue, Request, Status};
26
/// Boxed dynamic error (`Send + Sync + 'static`) used as the default error type in this file.
27type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// `Result` alias whose success type defaults to `()` and whose error type defaults to `Error`.
28type Result<T = (), E = Error> = std::result::Result<T, E>;
29
/// Flight service client specialised to a tonic transport `Channel`.
30type Client = FlightServiceClient<tonic::transport::Channel>;
31
32pub async fn run_scenario(host: &str, port: u16) -> Result {
34 let url = format!("http://{host}:{port}");
35 let mut client = FlightServiceClient::connect(url).await?;
36
37 let action = arrow_flight::Action::default();
38
39 let resp = client.do_action(Request::new(action.clone())).await;
40 match resp {
42 Err(e) => {
43 if e.code() != tonic::Code::Unauthenticated {
44 return Err(Box::new(Status::internal(format!(
45 "Expected UNAUTHENTICATED but got {e:?}"
46 ))));
47 }
48 }
49 Ok(other) => {
50 return Err(Box::new(Status::internal(format!(
51 "Expected UNAUTHENTICATED but got {other:?}"
52 ))));
53 }
54 }
55
56 let token = authenticate(&mut client, AUTH_USERNAME, AUTH_PASSWORD)
57 .await
58 .expect("must respond successfully from handshake");
59
60 let mut request = Request::new(action);
61 let metadata = request.metadata_mut();
62 metadata.insert_bin(
63 "auth-token-bin",
64 MetadataValue::from_bytes(token.as_bytes()),
65 );
66
67 let resp = client.do_action(request).await?;
68 let mut resp = resp.into_inner();
69
70 let r = resp
71 .next()
72 .await
73 .expect("No response received")
74 .expect("Invalid response received");
75
76 let body = std::str::from_utf8(&r.body).unwrap();
77 assert_eq!(body, AUTH_USERNAME);
78
79 Ok(())
80}
81
82async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result<String> {
83 let auth = BasicAuth {
84 username: username.into(),
85 password: password.into(),
86 };
87 let mut payload = vec![];
88 auth.encode(&mut payload)?;
89
90 let req = stream::once(async {
91 HandshakeRequest {
92 payload: payload.into(),
93 ..HandshakeRequest::default()
94 }
95 });
96
97 let rx = client.handshake(Request::new(req)).await?;
98 let mut rx = rx.into_inner();
99
100 let r = rx.next().await.expect("must respond from handshake")?;
101 assert!(rx.next().await.is_none(), "must not respond a second time");
102
103 Ok(std::str::from_utf8(&r.payload).unwrap().into())
104}