arrow_flight/trailers.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::{
19 pin::Pin,
20 sync::{Arc, Mutex},
21 task::{Context, Poll},
22};
23
24use futures::{ready, FutureExt, Stream, StreamExt};
25use tonic::{metadata::MetadataMap, Status, Streaming};
26
27/// Extract [`LazyTrailers`] from [`Streaming`] [tonic] response.
28///
29/// Note that [`LazyTrailers`] has inner mutability and will only hold actual data after [`ExtractTrailersStream`] is
30/// fully consumed (dropping it is not required though).
31pub fn extract_lazy_trailers<T>(s: Streaming<T>) -> (ExtractTrailersStream<T>, LazyTrailers) {
32 let trailers: SharedTrailers = Default::default();
33 let stream = ExtractTrailersStream {
34 inner: s,
35 trailers: Arc::clone(&trailers),
36 };
37 let lazy_trailers = LazyTrailers { trailers };
38 (stream, lazy_trailers)
39}
40
41type SharedTrailers = Arc<Mutex<Option<MetadataMap>>>;
42
43/// [Stream] that stores the gRPC trailers into [`LazyTrailers`].
44///
45/// See [`extract_lazy_trailers`] for construction.
46#[derive(Debug)]
47pub struct ExtractTrailersStream<T> {
48 inner: Streaming<T>,
49 trailers: SharedTrailers,
50}
51
52impl<T> Stream for ExtractTrailersStream<T> {
53 type Item = Result<T, Status>;
54
55 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
56 let res = ready!(self.inner.poll_next_unpin(cx));
57
58 if res.is_none() {
59 // stream exhausted => trailers should available
60 if let Some(trailers) = self
61 .inner
62 .trailers()
63 .now_or_never()
64 .and_then(|res| res.ok())
65 .flatten()
66 {
67 *self.trailers.lock().expect("poisoned") = Some(trailers);
68 }
69 }
70
71 Poll::Ready(res)
72 }
73
74 fn size_hint(&self) -> (usize, Option<usize>) {
75 self.inner.size_hint()
76 }
77}
78
79/// gRPC trailers that are extracted by [`ExtractTrailersStream`].
80///
81/// See [`extract_lazy_trailers`] for construction.
82#[derive(Debug)]
83pub struct LazyTrailers {
84 trailers: SharedTrailers,
85}
86
87impl LazyTrailers {
88 /// gRPC trailers that are known at the end of a stream.
89 pub fn get(&self) -> Option<MetadataMap> {
90 self.trailers.lock().expect("poisoned").clone()
91 }
92}