.. Licensed to the Apache Software Foundation (ASF) under one
.. or more contributor license agreements. See the NOTICE file
.. distributed with this work for additional information
.. regarding copyright ownership. The ASF licenses this file
.. to you under the Apache License, Version 2.0 (the
.. "License"); you may not use this file except in compliance
.. with the License. You may obtain a copy of the License at
..
..   http://www.apache.org/licenses/LICENSE-2.0
..
.. Unless required by applicable law or agreed to in writing,
.. software distributed under the License is distributed on an
.. "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
.. KIND, either express or implied. See the License for the
.. specific language governing permissions and limitations
.. under the License.

============
Arrow Flight
============

Recipes related to leveraging the Arrow Flight protocol.

.. contents::

Simple Parquet storage service with Arrow Flight
================================================

Suppose you want to implement a service that can store, send and receive
Parquet files using the Arrow Flight protocol. ``pyarrow`` provides an
implementation framework in :mod:`pyarrow.flight`, in particular through the
:class:`pyarrow.flight.FlightServerBase` class.

.. testcode::

    import pathlib

    import pyarrow as pa
    import pyarrow.flight
    import pyarrow.parquet


    class FlightServer(pa.flight.FlightServerBase):

        def __init__(self, location="grpc://0.0.0.0:8815",
                     repo=pathlib.Path("./datasets"), **kwargs):
            super(FlightServer, self).__init__(location, **kwargs)
            self._location = location
            self._repo = repo

        def _make_flight_info(self, dataset):
            dataset_path = self._repo / dataset
            schema = pa.parquet.read_schema(dataset_path)
            metadata = pa.parquet.read_metadata(dataset_path)
            descriptor = pa.flight.FlightDescriptor.for_path(
                dataset.encode('utf-8')
            )
            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
            return pyarrow.flight.FlightInfo(schema,
                                             descriptor,
                                             endpoints,
                                             metadata.num_rows,
                                             metadata.serialized_size)

        def list_flights(self, context, criteria):
            for dataset in self._repo.iterdir():
                yield self._make_flight_info(dataset.name)

        def get_flight_info(self, context, descriptor):
            return self._make_flight_info(descriptor.path[0].decode('utf-8'))

        def do_put(self, context, descriptor, reader, writer):
            dataset = descriptor.path[0].decode('utf-8')
            dataset_path = self._repo / dataset
            data_table = reader.read_all()
            pa.parquet.write_table(data_table, dataset_path)

        def do_get(self, context, ticket):
            dataset = ticket.ticket.decode('utf-8')
            dataset_path = self._repo / dataset
            return pa.flight.RecordBatchStream(pa.parquet.read_table(dataset_path))

        def list_actions(self, context):
            return [
                ("drop_dataset", "Delete a dataset."),
            ]

        def do_action(self, context, action):
            if action.type == "drop_dataset":
                self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
            else:
                raise NotImplementedError

        def do_drop_dataset(self, dataset):
            dataset_path = self._repo / dataset
            dataset_path.unlink()

The example server exposes
:meth:`pyarrow.flight.FlightServerBase.list_flights`, the method in charge of
returning the list of data streams available for fetching.
Likewise, :meth:`pyarrow.flight.FlightServerBase.get_flight_info` provides
the information regarding a single specific data stream.

Then we expose :meth:`pyarrow.flight.FlightServerBase.do_get`, which is in
charge of actually fetching the exposed data streams and sending them to the
client.

Being able to list and download data streams would be of little use if we
didn't also expose a way to create them; this is the responsibility of
:meth:`pyarrow.flight.FlightServerBase.do_put`, which is in charge of
receiving new data from the client and dealing with it (in this case, saving
it into a Parquet file).

These are the most common Arrow Flight requests. If we need to add more
functionality, we can do so using custom actions.

In the previous example a ``drop_dataset`` custom action is added. All custom
actions are executed through the
:meth:`pyarrow.flight.FlightServerBase.do_action` method, thus it's up to the
server subclass to dispatch them properly. In this case we invoke the
``do_drop_dataset`` method when ``action.type`` is the one we expect.

Our server can then be started with
:meth:`pyarrow.flight.FlightServerBase.serve`:

.. code-block:: python

    if __name__ == '__main__':
        server = FlightServer()
        server._repo.mkdir(exist_ok=True)
        server.serve()

.. testcode::
    :hide:

    # Start a real server in the background and wait for it to be available.
    # The previous code block only shows the user how to start it.
    import tempfile
    repo = tempfile.TemporaryDirectory(prefix="arrow-cookbook-flight")
    server = FlightServer(repo=pathlib.Path(repo.name))
    pa.flight.connect("grpc://0.0.0.0:8815").wait_for_available()

Once the server is started, we can build a client to perform requests to it:

.. testcode::

    import pyarrow as pa
    import pyarrow.flight

    client = pa.flight.connect("grpc://0.0.0.0:8815")

We can create a new table and upload it so that it gets stored in a new
Parquet file:

.. testcode::

    # Upload a new dataset
    data_table = pa.table(
        [["Mario", "Luigi", "Peach"]],
        names=["Character"]
    )
    upload_descriptor = pa.flight.FlightDescriptor.for_path("uploaded.parquet")
    writer, _ = client.do_put(upload_descriptor, data_table.schema)
    writer.write_table(data_table)
    writer.close()

Once uploaded, we should be able to retrieve the metadata for our newly
uploaded table:

.. testcode::

    # Retrieve metadata of newly uploaded dataset
    flight = client.get_flight_info(upload_descriptor)
    descriptor = flight.descriptor
    print("Path:", descriptor.path[0].decode('utf-8'),
          "Rows:", flight.total_records,
          "Size:", flight.total_bytes)
    print("=== Schema ===")
    print(flight.schema)
    print("==============")

.. testoutput::

    Path: uploaded.parquet Rows: 3 Size: ...
    === Schema ===
    Character: string
    ==============

And we can fetch the content of the dataset:

.. testcode::

    # Read content of the dataset
    reader = client.do_get(flight.endpoints[0].ticket)
    read_table = reader.read_all()
    print(read_table.to_pandas().head())

.. testoutput::

      Character
    0     Mario
    1     Luigi
    2     Peach

Once we are finished, we can invoke our custom action to delete the newly
uploaded dataset:

.. testcode::

    # Drop the newly uploaded dataset
    client.do_action(pa.flight.Action("drop_dataset",
                                      "uploaded.parquet".encode('utf-8')))

.. testcode::
    :hide:

    # Deal with a bug in do_action, see ARROW-14255
    # can be removed once 6.0.0 is released.
    try:
        list(client.do_action(pa.flight.Action("drop_dataset",
                                               "uploaded.parquet".encode('utf-8'))))
    except:
        pass

To confirm our dataset was deleted, we might list all Parquet files that are
currently stored by the server:

.. testcode::

    # List existing datasets.
    for flight in client.list_flights():
        descriptor = flight.descriptor
        print("Path:", descriptor.path[0].decode('utf-8'),
              "Rows:", flight.total_records,
              "Size:", flight.total_bytes)
        print("=== Schema ===")
        print(flight.schema)
        print("==============")
        print("")

.. testcode::
    :hide:

    # Shutdown the server
    server.shutdown()
    repo.cleanup()

Streaming Parquet Storage Service
=================================

We can improve the Parquet storage service and avoid holding entire datasets
in memory by streaming data. Flight readers and writers, like others in
PyArrow, can be iterated through, so let's update the server from before to
take advantage of this:

.. testcode::

    import pathlib

    import pyarrow as pa
    import pyarrow.flight
    import pyarrow.parquet


    class FlightServer(pa.flight.FlightServerBase):

        def __init__(self, location="grpc://0.0.0.0:8815",
                     repo=pathlib.Path("./datasets"), **kwargs):
            super(FlightServer, self).__init__(location, **kwargs)
            self._location = location
            self._repo = repo

        def _make_flight_info(self, dataset):
            dataset_path = self._repo / dataset
            schema = pa.parquet.read_schema(dataset_path)
            metadata = pa.parquet.read_metadata(dataset_path)
            descriptor = pa.flight.FlightDescriptor.for_path(
                dataset.encode('utf-8')
            )
            endpoints = [pa.flight.FlightEndpoint(dataset, [self._location])]
            return pyarrow.flight.FlightInfo(schema,
                                             descriptor,
                                             endpoints,
                                             metadata.num_rows,
                                             metadata.serialized_size)

        def list_flights(self, context, criteria):
            for dataset in self._repo.iterdir():
                yield self._make_flight_info(dataset.name)

        def get_flight_info(self, context, descriptor):
            return self._make_flight_info(descriptor.path[0].decode('utf-8'))

        def do_put(self, context, descriptor, reader, writer):
            dataset = descriptor.path[0].decode('utf-8')
            dataset_path = self._repo / dataset
            # Read the uploaded data and write to Parquet incrementally
            with dataset_path.open("wb") as sink:
                with pa.parquet.ParquetWriter(sink, reader.schema) as writer:
                    for chunk in reader:
                        writer.write_table(pa.Table.from_batches([chunk.data]))

        def do_get(self, context, ticket):
            dataset = ticket.ticket.decode('utf-8')
            # Stream data from a file
            dataset_path = self._repo / dataset
            reader = pa.parquet.ParquetFile(dataset_path)
            return pa.flight.GeneratorStream(
                reader.schema_arrow, reader.iter_batches())

        def list_actions(self, context):
            return [
                ("drop_dataset", "Delete a dataset."),
            ]

        def do_action(self, context, action):
            if action.type == "drop_dataset":
                self.do_drop_dataset(action.body.to_pybytes().decode('utf-8'))
            else:
                raise NotImplementedError

        def do_drop_dataset(self, dataset):
            dataset_path = self._repo / dataset
            dataset_path.unlink()

First, we've modified :meth:`pyarrow.flight.FlightServerBase.do_put`. Instead
of reading all the uploaded data into a :class:`pyarrow.Table` before
writing, we iterate through each batch as it comes and add it to a Parquet
file.

Then, we've modified :meth:`pyarrow.flight.FlightServerBase.do_get` to stream
data to the client. This uses :class:`pyarrow.flight.GeneratorStream`, which
takes a schema and any iterable or iterator. Flight then iterates through and
sends each record batch to the client, allowing us to handle even large
Parquet files that don't fit into memory.

While ``GeneratorStream`` has the advantage that it can stream data, it means
Flight must call back into Python for each record batch to send. In contrast,
``RecordBatchStream`` requires that all data is in memory up front, but once
created, all data transfer is handled purely in C++, without needing to call
Python code.

Let's give the server a spin. As before, we'll start the server:

.. code-block:: python

    if __name__ == '__main__':
        server = FlightServer()
        server._repo.mkdir(exist_ok=True)
        server.serve()

.. testcode::
    :hide:

    # Start a real server in the background and wait for it to be available.
    # The previous code block only shows the user how to start it.
    import tempfile
    repo = tempfile.TemporaryDirectory(prefix="arrow-cookbook-flight")
    server = FlightServer(repo=pathlib.Path(repo.name))
    pa.flight.connect("grpc://0.0.0.0:8815").wait_for_available()

We create a client, and this time, we'll write batches to the writer, as if
we had a stream of data instead of a table in memory:

.. testcode::

    import pyarrow as pa
    import pyarrow.flight

    client = pa.flight.connect("grpc://0.0.0.0:8815")

    # Upload a new dataset
    NUM_BATCHES = 1024
    ROWS_PER_BATCH = 4096
    upload_descriptor = pa.flight.FlightDescriptor.for_path("streamed.parquet")
    batch = pa.record_batch([
        pa.array(range(ROWS_PER_BATCH)),
    ], names=["ints"])
    writer, _ = client.do_put(upload_descriptor, batch.schema)
    with writer:
        for _ in range(NUM_BATCHES):
            writer.write_batch(batch)

As before, we can then read it back. Again, we'll read each batch from the
stream as it arrives, instead of reading them all into a table:

.. testcode::

    # Read content of the dataset
    flight = client.get_flight_info(upload_descriptor)
    reader = client.do_get(flight.endpoints[0].ticket)
    total_rows = 0
    for chunk in reader:
        total_rows += chunk.data.num_rows
    print("Got", total_rows, "rows total, expected", NUM_BATCHES * ROWS_PER_BATCH)

.. testoutput::

    Got 4194304 rows total, expected 4194304

.. testcode::
    :hide:

    # Shutdown the server
    server.shutdown()
    repo.cleanup()

Authentication with user/password
=================================

Often, services need a way to authenticate the user and identify who they
are. Flight provides several ways to implement authentication; the simplest
uses a user-password scheme. At startup, the client authenticates itself with
the server using a username and password. The server returns an authorization
token to include on future requests.

.. warning::

    Authentication should only be used over a secure encrypted channel, i.e.
    TLS should be enabled.

.. note::

    While the scheme is described as "`(HTTP) basic authentication`_", it
    does not actually implement HTTP authentication (RFC 7235) per se.

While Flight provides some interfaces to implement such a scheme, the server
must provide the actual implementation, as demonstrated below. **The
implementation here is not secure and is provided as a minimal example
only.**

.. testcode::

    import base64
    import secrets

    import pyarrow as pa
    import pyarrow.flight


    class EchoServer(pa.flight.FlightServerBase):
        """A simple server that just echoes any requests from DoAction."""

        def do_action(self, context, action):
            return [action.type.encode("utf-8"), action.body]


    class BasicAuthServerMiddlewareFactory(pa.flight.ServerMiddlewareFactory):
        """
        Middleware that implements username-password authentication.

        Parameters
        ----------
        creds: Dict[str, str]
            A dictionary of username-password values to accept.
        """

        def __init__(self, creds):
            self.creds = creds
            # Map generated bearer tokens to users
            self.tokens = {}

        def start_call(self, info, headers):
            """Validate credentials at the start of every call."""
            # Search for the authentication header (case-insensitive)
            auth_header = None
            for header in headers:
                if header.lower() == "authorization":
                    auth_header = headers[header][0]
                    break

            if not auth_header:
                raise pa.flight.FlightUnauthenticatedError("No credentials supplied")

            # The header has the structure "AuthType TokenValue", e.g.
            # "Basic <encoded username+password>" or "Bearer <random token>".
            auth_type, _, value = auth_header.partition(" ")

            if auth_type == "Basic":
                # Initial "login". The user provided a username/password
                # combination encoded in the same way as HTTP Basic Auth.
                decoded = base64.b64decode(value).decode("utf-8")
                username, _, password = decoded.partition(':')
                if not password or password != self.creds.get(username):
                    raise pa.flight.FlightUnauthenticatedError(
                        "Unknown user or invalid password")
                # Generate a secret, random bearer token for future calls.
                token = secrets.token_urlsafe(32)
                self.tokens[token] = username
                return BasicAuthServerMiddleware(token)
            elif auth_type == "Bearer":
                # An actual call. Validate the bearer token.
                username = self.tokens.get(value)
                if username is None:
                    raise pa.flight.FlightUnauthenticatedError("Invalid token")
                return BasicAuthServerMiddleware(value)

            raise pa.flight.FlightUnauthenticatedError("No credentials supplied")


    class BasicAuthServerMiddleware(pa.flight.ServerMiddleware):
        """Middleware that implements username-password authentication."""

        def __init__(self, token):
            self.token = token

        def sending_headers(self):
            """Return the authentication token to the client."""
            return {"authorization": f"Bearer {self.token}"}


    class NoOpAuthHandler(pa.flight.ServerAuthHandler):
        """
        A handler that implements username-password authentication.

        This is required only so that the server will respond to the internal
        Handshake RPC call, which the client calls when
        authenticate_basic_token is called. Otherwise, it should be a no-op
        as the actual authentication is implemented in middleware.
        """

        def authenticate(self, outgoing, incoming):
            pass

        def is_valid(self, token):
            return ""

We can then start the server:

.. code-block:: python

    if __name__ == '__main__':
        server = EchoServer(
            auth_handler=NoOpAuthHandler(),
            location="grpc://0.0.0.0:8816",
            middleware={
                "basic": BasicAuthServerMiddlewareFactory({
                    "test": "password",
                })
            },
        )
        server.serve()

.. testcode::
    :hide:

    # Start a real server in the background and wait for it to be available.
    # The previous code block only shows the user how to start it.
    import threading
    server = EchoServer(
        auth_handler=NoOpAuthHandler(),
        location="grpc://0.0.0.0:8816",
        middleware={
            "basic": BasicAuthServerMiddlewareFactory({
                "test": "password",
            })
        },
    )
    t = threading.Thread(target=server.serve)
    t.start()

Then, we can make a client and log in:

.. testcode::

    import pyarrow as pa
    import pyarrow.flight

    client = pa.flight.connect("grpc://0.0.0.0:8816")
    token_pair = client.authenticate_basic_token(b'test', b'password')
    print(token_pair)

.. testoutput::

    (b'authorization', b'Bearer ...')

For future calls, we include the authentication token with the call:

.. testcode::

    action = pa.flight.Action("echo", b"Hello, world!")
    options = pa.flight.FlightCallOptions(headers=[token_pair])
    for response in client.do_action(action=action, options=options):
        print(response.body.to_pybytes())

.. testoutput::

    b'echo'
    b'Hello, world!'

If we fail to do so, we get an authentication error:

.. testcode::

    try:
        list(client.do_action(action=action))
    except pa.flight.FlightUnauthenticatedError as e:
        print("Unauthenticated:", e)
    else:
        raise RuntimeError("Expected call to fail")

.. testoutput::

    Unauthenticated: No credentials supplied. Detail: Unauthenticated

Or if we use the wrong credentials on login, we also get an error:

.. testcode::

    try:
        client.authenticate_basic_token(b'invalid', b'password')
    except pa.flight.FlightUnauthenticatedError as e:
        print("Unauthenticated:", e)
    else:
        raise RuntimeError("Expected call to fail")

.. testoutput::

    Unauthenticated: Unknown user or invalid password. Detail: Unauthenticated

.. testcode::
    :hide:

    # Shutdown the server
    server.shutdown()

.. _(HTTP) basic authentication: https://developer.mozilla.org/en-US/docs/Web/HTTP/Authentication#basic_authentication_scheme

Securing connections with TLS
=============================

Following on from the previous scenario, where traffic to the server is
managed via a username and password, HTTPS (more specifically TLS)
communication adds an additional layer of security by encrypting messages
between the client and server. This is achieved using certificates. During
development, the easiest approach is to develop with self-signed
certificates. At startup, the server loads the public and private key, and
the client authenticates the server with the TLS root certificate.

.. note::

    In production environments it is recommended to make use of a
    certificate signed by a certificate authority.
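
For local development, one way to produce such a certificate and key pair
with ``openssl`` is sketched below. The file names ``cert0.pem`` and
``cert0.key`` are arbitrary choices for this example, and exact flags may
vary between openssl versions.

```shell
# Create a self-signed certificate (cert0.pem) and private key (cert0.key)
# for "localhost", valid for one year, without a passphrase (-nodes).
openssl req -x509 -newkey rsa:4096 -sha256 -days 365 -nodes \
    -keyout cert0.key -out cert0.pem -subj "/CN=localhost"
```

The resulting files can be passed to the server and client shown in the
following steps.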
**Step 1 - Generating the Self-Signed Certificate**

Generate a self-signed certificate by using ``dotnet`` on `Windows`_, or
`openssl`_ on Linux or macOS. Alternatively, the self-signed certificate from
the `Arrow testing data repository`_ can be used. Depending on the file
generated, you may need to convert it to the ``.crt`` and ``.key`` files
required by the Arrow server. One way to achieve this is with openssl; see
this `IBM article`_ for more information.

**Step 2 - Running a server with TLS enabled**

The code below is a minimal working example of an Arrow server used to
receive data with TLS.

.. testcode::

    import argparse

    import pyarrow
    import pyarrow.flight


    class FlightServer(pyarrow.flight.FlightServerBase):

        def __init__(self, host="localhost", location=None,
                     tls_certificates=None, verify_client=False,
                     root_certificates=None, auth_handler=None):
            super(FlightServer, self).__init__(
                location, auth_handler, tls_certificates, verify_client,
                root_certificates)
            self.flights = {}

        @classmethod
        def descriptor_to_key(self, descriptor):
            return (descriptor.descriptor_type.value, descriptor.command,
                    tuple(descriptor.path or tuple()))

        def do_put(self, context, descriptor, reader, writer):
            key = FlightServer.descriptor_to_key(descriptor)
            print(key)
            self.flights[key] = reader.read_all()
            print(self.flights[key])


    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument("--tls", nargs=2, default=None,
                            metavar=('CERTFILE', 'KEYFILE'))
        args = parser.parse_args()
        tls_certificates = []

        scheme = "grpc+tls"
        host = "localhost"
        port = "5005"

        with open(args.tls[0], "rb") as cert_file:
            tls_cert_chain = cert_file.read()
        with open(args.tls[1], "rb") as key_file:
            tls_private_key = key_file.read()
        tls_certificates.append((tls_cert_chain, tls_private_key))

        location = "{}://{}:{}".format(scheme, host, port)

        server = FlightServer(host, location,
                              tls_certificates=tls_certificates)
        print("Serving on", location)
        server.serve()


    if __name__ == '__main__':
        main()

Running the server, you should see
``Serving on grpc+tls://localhost:5005``.

**Step 3 - Securely Connecting to the Server**

Suppose we want to connect to the server and push some data to it. The
following code securely sends information to the server using TLS
encryption.

.. testcode::

    import argparse

    import pandas as pd
    import pyarrow
    import pyarrow.flight


    # Assumes the incoming data object is a pandas DataFrame
    def push_to_server(name, data, client):
        object_to_send = pyarrow.Table.from_pandas(data)
        writer, _ = client.do_put(
            pyarrow.flight.FlightDescriptor.for_path(name),
            object_to_send.schema)
        writer.write_table(object_to_send)
        writer.close()


    def main():
        parser = argparse.ArgumentParser()
        parser.add_argument('--tls-roots', default=None,
                            help='Path to trusted TLS certificate(s)')
        parser.add_argument('--host', default="localhost",
                            help='Host endpoint')
        parser.add_argument('--port', default=5005, help='Host port')
        args = parser.parse_args()
        kwargs = {}

        with open(args.tls_roots, "rb") as root_certs:
            kwargs["tls_root_certs"] = root_certs.read()

        client = pyarrow.flight.FlightClient(
            f"grpc+tls://{args.host}:{args.port}", **kwargs)
        data = {'Animal': ['Dog', 'Cat', 'Mouse'],
                'Size': ['Big', 'Small', 'Tiny']}
        df = pd.DataFrame(data, columns=['Animal', 'Size'])
        push_to_server("AnimalData", df, client)


    if __name__ == '__main__':
        try:
            main()
        except Exception as e:
            print(e)

Running the client script, you should see the server print out information
about the data it just received.

.. _IBM article: https://www.ibm.com/docs/en/arl/9.7?topic=certification-extracting-certificate-keys-from-pfx-file
.. _Windows: https://docs.microsoft.com/en-us/dotnet/core/additional-tools/self-signed-certificates-guide
.. _Arrow testing data repository: https://github.com/apache/arrow-testing/tree/master/data/flight
.. _openssl: https://www.ibm.com/docs/en/api-connect/2018.x?topic=overview-generating-self-signed-certificate-using-openssl

Propagating OpenTelemetry Traces
================================

Distributed tracing with OpenTelemetry_ allows collecting call-level
performance measurements across a Flight service. In order to correlate
spans across a Flight client and server, trace context must be passed
between the two. It can be passed manually, through headers in
:class:`pyarrow.flight.FlightCallOptions`, or it can be propagated
automatically using middleware. This example shows how to accomplish trace
propagation through middleware.

The client middleware needs to inject the trace context into the call
headers. The server middleware needs to extract the trace context from the
headers and pass the context into a new span. Optionally, the client
middleware can also create a new span to time the client-side call.

.. _OpenTelemetry: https://opentelemetry.io/docs/instrumentation/python/getting-started/

**Step 1: define the client middleware:**

.. testcode::

    import pyarrow.flight as flight

    from opentelemetry import trace
    from opentelemetry.propagate import inject
    from opentelemetry.trace.status import StatusCode


    class ClientTracingMiddlewareFactory(flight.ClientMiddlewareFactory):

        def __init__(self):
            self._tracer = trace.get_tracer(__name__)

        def start_call(self, info):
            span = self._tracer.start_span(f"client.{info.method}")
            return ClientTracingMiddleware(span)


    class ClientTracingMiddleware(flight.ClientMiddleware):

        def __init__(self, span):
            self._span = span

        def sending_headers(self):
            ctx = trace.set_span_in_context(self._span)
            carrier = {}
            inject(carrier=carrier, context=ctx)
            return carrier

        def call_completed(self, exception):
            if exception:
                self._span.record_exception(exception)
                self._span.set_status(StatusCode.ERROR)
                print(exception)
            else:
                self._span.set_status(StatusCode.OK)
            self._span.end()

**Step 2: define the server middleware:**

.. testcode::

    import pyarrow.flight as flight

    from opentelemetry import trace
    from opentelemetry.propagate import extract
    from opentelemetry.trace.status import StatusCode


    class ServerTracingMiddlewareFactory(flight.ServerMiddlewareFactory):

        def __init__(self):
            self._tracer = trace.get_tracer(__name__)

        def start_call(self, info, headers):
            context = extract(headers)
            span = self._tracer.start_span(f"server.{info.method}",
                                           context=context)
            return ServerTracingMiddleware(span)


    class ServerTracingMiddleware(flight.ServerMiddleware):

        def __init__(self, span):
            self._span = span

        def call_completed(self, exception):
            if exception:
                self._span.record_exception(exception)
                self._span.set_status(StatusCode.ERROR)
                print(exception)
            else:
                self._span.set_status(StatusCode.OK)
            self._span.end()

**Step 3: configure the trace exporter, processor, and provider:**

Both the server and client will need to be configured with the OpenTelemetry
SDK to record spans and export them somewhere. For the sake of the example,
we'll collect the spans into a Python list, but this is normally where you
would set them up to be exported to some service like `Jaeger`_. See other
examples of exporters at `OpenTelemetry Exporters`_.

As part of this, you will need to define the resource where spans are
running. At a minimum this is the service name, but it could include other
information like a hostname, process id, service version, and operating
system.

.. _Jaeger: https://www.jaegertracing.io/
.. _`OpenTelemetry Exporters`: https://opentelemetry.io/docs/instrumentation/python/exporters/

.. testcode::

    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import SimpleSpanProcessor
    from opentelemetry.sdk.resources import SERVICE_NAME, Resource
    from opentelemetry.sdk.trace.export import SpanExporter, SpanExportResult


    class TestSpanExporter(SpanExporter):

        def __init__(self):
            self.spans = []

        def export(self, spans):
            self.spans.extend(spans)
            return SpanExportResult.SUCCESS


    def configure_tracing():
        # Service name is required for most backends,
        # and although it's not necessary for console export,
        # it's good to set service name anyways.
        resource = Resource(attributes={
            SERVICE_NAME: "my-service"
        })
        exporter = TestSpanExporter()
        provider = TracerProvider(resource=resource)
        processor = SimpleSpanProcessor(exporter)
        provider.add_span_processor(processor)
        trace.set_tracer_provider(provider)
        return exporter

**Step 4: add the middleware to the server:**

We can now use the middleware in our ``EchoServer`` from earlier.

.. code-block:: python

    if __name__ == '__main__':
        exporter = configure_tracing()
        server = EchoServer(
            location="grpc://0.0.0.0:8816",
            middleware={
                "tracing": ServerTracingMiddlewareFactory()
            },
        )
        server.serve()

.. testcode::
    :hide:

    # Start a real server in the background and wait for it to be available.
    # The previous code block only shows the user how to start it.
    import threading
    exporter = configure_tracing()
    server = EchoServer(
        location="grpc://0.0.0.0:8816",
        middleware={
            "tracing": ServerTracingMiddlewareFactory()
        },
    )
    t = threading.Thread(target=server.serve)
    t.start()

**Step 5: add the middleware to the client:**

.. testcode::

    client = pa.flight.connect(
        "grpc://0.0.0.0:8816",
        middleware=[ClientTracingMiddlewareFactory()],
    )

**Step 6: use the client within active spans:**

When we make a call with our client within an OpenTelemetry span, our client
middleware will create a child span for the client-side Flight call and then
propagate the span context to the server.
Our server middleware will pick up that trace context and create another
child span.

.. testcode::

    from opentelemetry import trace

    # The client would normally also need to configure tracing, but for this
    # example the client and server are running in the same Python process.
    # exporter = configure_tracing()

    tracer = trace.get_tracer(__name__)
    with tracer.start_as_current_span("hello_world") as span:
        action = pa.flight.Action("echo", b"Hello, world!")
        # Call list() on do_action to drain all results.
        list(client.do_action(action=action))

    print(f"There are {len(exporter.spans)} spans.")
    print(f"The span names are:\n {list(span.name for span in exporter.spans)}.")
    print(f"The span status codes are:\n "
          f"{list(span.status.status_code for span in exporter.spans)}.")

.. testoutput::

    There are 3 spans.
    The span names are:
     ['server.FlightMethod.DO_ACTION', 'client.FlightMethod.DO_ACTION', 'hello_world'].
    The span status codes are:
     [<StatusCode.OK: 1>, <StatusCode.OK: 1>, <StatusCode.OK: 1>].

As expected, we have three spans: one in our client code, one in the client
middleware, and one in the server middleware.

.. testcode::
    :hide:

    # Shutdown the server
    server.shutdown()