arrow_flight/
trailers.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19    pin::Pin,
20    sync::{Arc, Mutex},
21    task::{Context, Poll},
22};
23
24use futures::{ready, FutureExt, Stream, StreamExt};
25use tonic::{metadata::MetadataMap, Status, Streaming};
26
27/// Extract [`LazyTrailers`] from [`Streaming`] [tonic] response.
28///
29/// Note that [`LazyTrailers`] has inner mutability and will only hold actual data after [`ExtractTrailersStream`] is
30/// fully consumed (dropping it is not required though).
31pub fn extract_lazy_trailers<T>(s: Streaming<T>) -> (ExtractTrailersStream<T>, LazyTrailers) {
32    let trailers: SharedTrailers = Default::default();
33    let stream = ExtractTrailersStream {
34        inner: s,
35        trailers: Arc::clone(&trailers),
36    };
37    let lazy_trailers = LazyTrailers { trailers };
38    (stream, lazy_trailers)
39}
40
/// Slot shared between [`ExtractTrailersStream`] (which writes it) and [`LazyTrailers`] (which reads it).
///
/// Holds `None` until trailers have been stored.
type SharedTrailers = Arc<Mutex<Option<MetadataMap>>>;
42
43/// [Stream] that stores the gRPC trailers into [`LazyTrailers`].
44///
45/// See [`extract_lazy_trailers`] for construction.
46#[derive(Debug)]
47pub struct ExtractTrailersStream<T> {
48    inner: Streaming<T>,
49    trailers: SharedTrailers,
50}
51
52impl<T> Stream for ExtractTrailersStream<T> {
53    type Item = Result<T, Status>;
54
55    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56        let res = ready!(self.inner.poll_next_unpin(cx));
57
58        if res.is_none() {
59            // stream exhausted => trailers should available
60            if let Some(trailers) = self
61                .inner
62                .trailers()
63                .now_or_never()
64                .and_then(|res| res.ok())
65                .flatten()
66            {
67                *self.trailers.lock().expect("poisoned") = Some(trailers);
68            }
69        }
70
71        Poll::Ready(res)
72    }
73
74    fn size_hint(&self) -> (usize, Option<usize>) {
75        self.inner.size_hint()
76    }
77}
78
79/// gRPC trailers that are extracted by [`ExtractTrailersStream`].
80///
81/// See [`extract_lazy_trailers`] for construction.
82#[derive(Debug)]
83pub struct LazyTrailers {
84    trailers: SharedTrailers,
85}
86
87impl LazyTrailers {
88    /// gRPC trailers that are known at the end of a stream.
89    pub fn get(&self) -> Option<MetadataMap> {
90        self.trailers.lock().expect("poisoned").clone()
91    }
92}