arrow_integration_testing/flight_client_scenarios/
auth_basic_proto.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Scenario for testing basic auth.
19
20use crate::{AUTH_PASSWORD, AUTH_USERNAME};
21
22use arrow_flight::{flight_service_client::FlightServiceClient, BasicAuth, HandshakeRequest};
23use futures::{stream, StreamExt};
24use prost::Message;
25use tonic::{metadata::MetadataValue, Request, Status};
26
/// Boxed dynamic error (`Send + Sync + 'static`) used as the default error type in this module.
27type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// `Result` alias whose success type defaults to `()` and whose error type defaults to [`Error`].
28type Result<T = (), E = Error> = std::result::Result<T, E>;
29
/// Flight service client parameterized over a tonic transport channel.
30type Client = FlightServiceClient<tonic::transport::Channel>;
31
32/// Run a scenario that tests basic auth.
33pub async fn run_scenario(host: &str, port: u16) -> Result {
34    let url = format!("http://{host}:{port}");
35    let mut client = FlightServiceClient::connect(url).await?;
36
37    let action = arrow_flight::Action::default();
38
39    let resp = client.do_action(Request::new(action.clone())).await;
40    // This client is unauthenticated and should fail.
41    match resp {
42        Err(e) => {
43            if e.code() != tonic::Code::Unauthenticated {
44                return Err(Box::new(Status::internal(format!(
45                    "Expected UNAUTHENTICATED but got {e:?}"
46                ))));
47            }
48        }
49        Ok(other) => {
50            return Err(Box::new(Status::internal(format!(
51                "Expected UNAUTHENTICATED but got {other:?}"
52            ))));
53        }
54    }
55
56    let token = authenticate(&mut client, AUTH_USERNAME, AUTH_PASSWORD)
57        .await
58        .expect("must respond successfully from handshake");
59
60    let mut request = Request::new(action);
61    let metadata = request.metadata_mut();
62    metadata.insert_bin(
63        "auth-token-bin",
64        MetadataValue::from_bytes(token.as_bytes()),
65    );
66
67    let resp = client.do_action(request).await?;
68    let mut resp = resp.into_inner();
69
70    let r = resp
71        .next()
72        .await
73        .expect("No response received")
74        .expect("Invalid response received");
75
76    let body = std::str::from_utf8(&r.body).unwrap();
77    assert_eq!(body, AUTH_USERNAME);
78
79    Ok(())
80}
81
82async fn authenticate(client: &mut Client, username: &str, password: &str) -> Result<String> {
83    let auth = BasicAuth {
84        username: username.into(),
85        password: password.into(),
86    };
87    let mut payload = vec![];
88    auth.encode(&mut payload)?;
89
90    let req = stream::once(async {
91        HandshakeRequest {
92            payload: payload.into(),
93            ..HandshakeRequest::default()
94        }
95    });
96
97    let rx = client.handshake(Request::new(req)).await?;
98    let mut rx = rx.into_inner();
99
100    let r = rx.next().await.expect("must respond from handshake")?;
101    assert!(rx.next().await.is_none(), "must not respond a second time");
102
103    Ok(std::str::from_utf8(&r.payload).unwrap().into())
104}