Parquet column encryption#

The following example defines a arrow::Table instance, and then writes it to an encrypted Parquet file. Metadata footer and columns are encrypted with different encryption keys.

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "arrow/api.h"
#include "arrow/dataset/file_parquet.h"
#include "arrow/dataset/parquet_encryption_config.h"
#include "arrow/filesystem/localfs.h"
#include "parquet/encryption/crypto_factory.h"
#include "parquet/encryption/test_in_memory_kms.h"

#include <iostream>

namespace fs = arrow::fs;

namespace ds = arrow::dataset;

arrow::Result<std::shared_ptr<arrow::Table>> GetTable() {
  auto int_builder = arrow::Int32Builder();

  std::shared_ptr<arrow::Array> arr_i;
  ARROW_RETURN_NOT_OK(int_builder.AppendValues({1, 3, 5, 7, 1}));
  ARROW_RETURN_NOT_OK(int_builder.Finish(&arr_i));

  auto struct_type = arrow::struct_({{"a", arrow::int32()}, {"b", arrow::int64()}});
  auto pool = arrow::default_memory_pool();
  auto a_builder = std::make_shared<arrow::Int32Builder>();
  auto b_builder = std::make_shared<arrow::Int64Builder>();
  auto struct_builder = arrow::StructBuilder(struct_type, pool, {a_builder, b_builder});

  std::shared_ptr<arrow::Array> arr_struct;
  ARROW_RETURN_NOT_OK(struct_builder.Append());
  ARROW_RETURN_NOT_OK(a_builder->Append(2));
  ARROW_RETURN_NOT_OK(b_builder->Append(20));
  ARROW_RETURN_NOT_OK(struct_builder.Append());
  ARROW_RETURN_NOT_OK(a_builder->Append(4));
  ARROW_RETURN_NOT_OK(b_builder->Append(40));
  ARROW_RETURN_NOT_OK(struct_builder.Append());
  ARROW_RETURN_NOT_OK(a_builder->Append(6));
  ARROW_RETURN_NOT_OK(b_builder->Append(60));
  ARROW_RETURN_NOT_OK(struct_builder.Append());
  ARROW_RETURN_NOT_OK(a_builder->Append(8));
  ARROW_RETURN_NOT_OK(b_builder->Append(80));
  ARROW_RETURN_NOT_OK(struct_builder.Append());
  ARROW_RETURN_NOT_OK(a_builder->Append(10));
  ARROW_RETURN_NOT_OK(b_builder->Append(100));
  ARROW_RETURN_NOT_OK(struct_builder.Finish(&arr_struct));

  auto map_type = arrow::map(arrow::int32(), arrow::utf8());
  auto key_builder = std::make_shared<arrow::Int32Builder>();
  auto item_builder = std::make_shared<arrow::StringBuilder>();
  auto map_builder = arrow::MapBuilder(pool, key_builder, item_builder, map_type);

  std::shared_ptr<arrow::Array> arr_map;
  ARROW_RETURN_NOT_OK(map_builder.Append());
  ARROW_RETURN_NOT_OK(key_builder->AppendValues({2, 4}));
  ARROW_RETURN_NOT_OK(item_builder->AppendValues({"2", "4"}));
  ARROW_RETURN_NOT_OK(map_builder.Append());
  ARROW_RETURN_NOT_OK(key_builder->AppendValues({6}));
  ARROW_RETURN_NOT_OK(item_builder->AppendValues({"6"}));
  ARROW_RETURN_NOT_OK(map_builder.Append());
  ARROW_RETURN_NOT_OK(map_builder.Append());
  ARROW_RETURN_NOT_OK(key_builder->AppendValues({8, 10}));
  ARROW_RETURN_NOT_OK(item_builder->AppendValues({"8", "10"}));
  ARROW_RETURN_NOT_OK(map_builder.Append());
  ARROW_RETURN_NOT_OK(map_builder.Finish(&arr_map));

  auto list_type = arrow::list(arrow::int32());
  auto value_builder = std::make_shared<arrow::Int32Builder>();
  auto list_builder = arrow::ListBuilder(pool, value_builder, list_type);

  std::shared_ptr<arrow::Array> arr_list;
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(value_builder->AppendValues({1, 2, 3}));
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(value_builder->AppendValues({4, 5, 6}));
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(value_builder->AppendValues({7}));
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(value_builder->AppendValues({8}));
  ARROW_RETURN_NOT_OK(list_builder.Append());
  ARROW_RETURN_NOT_OK(list_builder.Finish(&arr_list));

  auto schema = arrow::schema({
      arrow::field("i", arrow::int32()),
      arrow::field("s", struct_type),
      arrow::field("m", map_type),
      arrow::field("l", list_type),
  });

  return arrow::Table::Make(schema, {arr_i, arr_struct, arr_map, arr_list});
}

std::shared_ptr<parquet::encryption::CryptoFactory> GetCryptoFactory() {
  // Configure KMS.
  std::unordered_map<std::string, std::string> key_map;
  key_map.emplace("footerKeyId", "0123456789012345");
  key_map.emplace("columnKeyId", "1234567890123456");

  auto crypto_factory = std::make_shared<parquet::encryption::CryptoFactory>();
  auto kms_client_factory =
      // for testing only, do not use it as an example of KmsClientFactory implementation
      std::make_shared<parquet::encryption::TestOnlyInMemoryKmsClientFactory>(
          /*wrap_locally=*/true, key_map);
  crypto_factory->RegisterKmsClientFactory(std::move(kms_client_factory));
  return crypto_factory;
}

arrow::Status WriteEncryptedFile(const std::string& path_to_file) {
  using arrow::internal::checked_pointer_cast;

  // Get a configured crypto factory and kms connection conf.
  auto crypto_factory = GetCryptoFactory();
  auto kms_connection_config =
      std::make_shared<parquet::encryption::KmsConnectionConfig>();

  // Set write options with encryption configuration.
  auto encryption_config = std::make_shared<parquet::encryption::EncryptionConfiguration>(
      std::string("footerKeyId"));
  encryption_config->column_keys =
      "columnKeyId: i, s.a, s.b, m.key_value.key, m.key_value.value, l.list.element";

  auto parquet_encryption_config = std::make_shared<ds::ParquetEncryptionConfig>();
  // Directly assign shared_ptr objects to ParquetEncryptionConfig members.
  parquet_encryption_config->crypto_factory = crypto_factory;
  parquet_encryption_config->kms_connection_config = kms_connection_config;
  parquet_encryption_config->encryption_config = std::move(encryption_config);

  auto file_format = std::make_shared<ds::ParquetFileFormat>();
  auto parquet_file_write_options = checked_pointer_cast<ds::ParquetFileWriteOptions>(
      file_format->DefaultWriteOptions());
  parquet_file_write_options->parquet_encryption_config =
      std::move(parquet_encryption_config);

  // Write dataset.
  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<arrow::Table> table, GetTable());
  printf("%s", table->ToString().c_str());
  auto dataset = std::make_shared<ds::InMemoryDataset>(table);
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());

  auto file_system = std::make_shared<fs::LocalFileSystem>();
  auto partitioning = std::make_shared<ds::HivePartitioning>(
      arrow::schema({arrow::field("part", arrow::utf8())}));

  ds::FileSystemDatasetWriteOptions write_options;
  write_options.file_write_options = parquet_file_write_options;
  write_options.filesystem = file_system;
  write_options.base_dir = path_to_file;
  write_options.partitioning = partitioning;
  write_options.basename_template = "part{i}.parquet";
  return ds::FileSystemDataset::Write(write_options, std::move(scanner));
}

arrow::Status ReadEncryptedFile(const std::string& path_to_file) {
  // Get a configured crypto factory and kms connection conf
  auto crypto_factory = GetCryptoFactory();
  auto kms_connection_config =
      std::make_shared<parquet::encryption::KmsConnectionConfig>();

  // Create decryption properties.
  auto decryption_config =
      std::make_shared<parquet::encryption::DecryptionConfiguration>();
  auto parquet_decryption_config = std::make_shared<ds::ParquetDecryptionConfig>();
  parquet_decryption_config->crypto_factory = crypto_factory;
  parquet_decryption_config->kms_connection_config = kms_connection_config;
  parquet_decryption_config->decryption_config = std::move(decryption_config);

  // Set scan options.
  auto parquet_scan_options = std::make_shared<ds::ParquetFragmentScanOptions>();
  parquet_scan_options->parquet_decryption_config = std::move(parquet_decryption_config);

  // Get configured Parquet file format
  auto file_format = std::make_shared<ds::ParquetFileFormat>();
  file_format->default_fragment_scan_options = std::move(parquet_scan_options);

  // Get the FileSystem.
  auto file_system = std::make_shared<fs::LocalFileSystem>();

  // Get FileInfo objects for all files under the base directory
  fs::FileSelector selector;
  selector.base_dir = path_to_file;
  selector.recursive = true;

  // Create the dataset
  ds::FileSystemFactoryOptions factory_options;
  ARROW_ASSIGN_OR_RAISE(auto dataset_factory,
                        ds::FileSystemDatasetFactory::Make(file_system, selector,
                                                           file_format, factory_options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, dataset_factory->Finish());
  ARROW_ASSIGN_OR_RAISE(auto scanner_builder, dataset->NewScan());
  ARROW_ASSIGN_OR_RAISE(auto scanner, scanner_builder->Finish());
  ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
  std::cout << "Table size: " << table->num_rows() << "\n";
  return arrow::Status::OK();
}

arrow::Status RunExamples(const std::string& path_to_file) {
  ARROW_RETURN_NOT_OK(WriteEncryptedFile(path_to_file));
  ARROW_RETURN_NOT_OK(ReadEncryptedFile(path_to_file));
  return arrow::Status::OK();
}

int main(int argc, char** argv) {
  if (argc != 2) {
    // Fake success for CI purposes.
    return EXIT_SUCCESS;
  }

  std::string path_to_file = argv[1];
  arrow::Status status = RunExamples(path_to_file);

  if (!status.ok()) {
    std::cerr << "Error occurred: " << status.message() << std::endl;
    return EXIT_FAILURE;
  }
  return EXIT_SUCCESS;
}