# Source code for pyarrow.fs

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
FileSystem abstraction to interact with various local and remote filesystems.
"""

from pyarrow.util import _is_path_like, _stringify_path

from pyarrow._fs import (  # noqa
    FileSelector,
    FileType,
    FileInfo,
    FileSystem,
    LocalFileSystem,
    SubTreeFileSystem,
    _MockFileSystem,
    FileSystemHandler,
    PyFileSystem,
    _copy_files,
    _copy_files_selector,
)

# For backward compatibility.
FileStats = FileInfo

# Names of optional filesystem classes whose extension module could not be
# imported. The module-level ``__getattr__`` consults this list so that
# accessing one of these names raises a descriptive ImportError instead of
# a plain AttributeError.
_not_imported = []

try:
    from pyarrow._hdfs import HadoopFileSystem  # noqa
except ImportError:
    # The HDFS extension module is unavailable; remember the missing name.
    _not_imported.append("HadoopFileSystem")

try:
    from pyarrow._s3fs import (  # noqa
        S3FileSystem, S3LogLevel, initialize_s3, finalize_s3)
except ImportError:
    # Only "S3FileSystem" is recorded; the other S3 names (S3LogLevel,
    # initialize_s3, finalize_s3) are not added to the list.
    _not_imported.append("S3FileSystem")
else:
    # Runs at import time, and only when the S3 module imported successfully.
    initialize_s3()


def __getattr__(name):
    """
    Module-level attribute fallback (PEP 562).

    Python calls this only for names that are not found in the module
    namespace through the normal lookup.

    Parameters
    ----------
    name : str
        The attribute name that was requested on the module.

    Raises
    ------
    ImportError
        If `name` is listed in ``_not_imported``, i.e. it is an optional
        class whose extension module failed to import.
    AttributeError
        For every other name.
    """
    if name not in _not_imported:
        raise AttributeError(
            "module 'pyarrow.fs' has no attribute '{0}'".format(name)
        )

    raise ImportError(
        "The pyarrow installation is not built with support for "
        "'{0}'".format(name)
    )


def _filesystem_from_str(uri):
    """
    Instantiate a filesystem from a URI string.

    When the URI carries a (non-empty, normalized) path component, that
    path is used as a prefix: the returned filesystem is a
    SubTreeFileSystem rooted at it.

    Parameters
    ----------
    uri : str
        The filesystem URI, passed to ``FileSystem.from_uri``.

    Returns
    -------
    FileSystem
        The filesystem from the URI, wrapped in a SubTreeFileSystem when
        a path prefix is present.

    Raises
    ------
    ValueError
        If the path component does not point to a directory.
    """
    base_fs, raw_prefix = FileSystem.from_uri(uri)
    subtree_root = base_fs.normalize_path(raw_prefix)
    if not subtree_root:
        # no path component: the bare filesystem is all that is needed
        return base_fs

    # the prefix must be an existing directory to serve as a subtree root
    root_info = base_fs.get_file_info([subtree_root])[0]
    if root_info.type != FileType.Directory:
        raise ValueError(
            "The path component of the filesystem URI must point to a "
            "directory but it has a type: `{}`. The path component "
            "is `{}` and the given filesystem URI is `{}`".format(
                root_info.type.name, root_info.path, uri
            )
        )
    return SubTreeFileSystem(subtree_root, base_fs)


def _ensure_filesystem(
    filesystem, use_mmap=False, allow_legacy_filesystem=False
):
    """
    Coerce `filesystem` into a filesystem object usable by pyarrow.

    Parameters
    ----------
    filesystem : FileSystem, str, fsspec filesystem or legacy filesystem
        - A ``FileSystem`` instance is returned unchanged (`use_mmap` is
          not applied in that case).
        - A string is treated as a filesystem URI.
        - An ``fsspec.AbstractFileSystem`` instance (only recognised when
          fsspec is importable) is wrapped in
          ``PyFileSystem(FSSpecHandler(...))``, except when its class is
          named ``LocalFileSystem``, which maps to the native
          ``LocalFileSystem``.
        - A legacy ``pyarrow.filesystem.LocalFileSystem`` maps to the
          native ``LocalFileSystem``.
    use_mmap : bool, default False
        Forwarded to ``LocalFileSystem`` whenever a native local
        filesystem is constructed here. Not supported together with a
        URI string.
    allow_legacy_filesystem : bool, default False
        If True, any other legacy ``pyarrow.filesystem.FileSystem``
        instance is returned unchanged instead of raising.

    Returns
    -------
    FileSystem
        Or the legacy filesystem object itself when
        `allow_legacy_filesystem` is True.

    Raises
    ------
    ValueError
        If `filesystem` is a string and `use_mmap` is True.
    TypeError
        If `filesystem` is of none of the recognised kinds.
    """
    if isinstance(filesystem, FileSystem):
        return filesystem
    elif isinstance(filesystem, str):
        if use_mmap:
            raise ValueError(
                "Specifying to use memory mapping not supported for "
                "filesystem specified as an URI string"
            )
        return _filesystem_from_str(filesystem)

    # handle fsspec-compatible filesystems; fsspec is an optional
    # dependency, so its absence simply skips this step
    try:
        import fsspec
    except ImportError:
        pass
    else:
        if isinstance(filesystem, fsspec.AbstractFileSystem):
            if type(filesystem).__name__ == 'LocalFileSystem':
                # In case it's a simple LocalFileSystem, use native arrow one
                return LocalFileSystem(use_mmap=use_mmap)
            return PyFileSystem(FSSpecHandler(filesystem))

    # map old filesystems to new ones
    import pyarrow.filesystem as legacyfs

    if isinstance(filesystem, legacyfs.LocalFileSystem):
        return LocalFileSystem(use_mmap=use_mmap)
    # TODO handle HDFS?
    if allow_legacy_filesystem and isinstance(filesystem, legacyfs.FileSystem):
        return filesystem

    raise TypeError(
        "Unrecognized filesystem: {}. `filesystem` argument must be a "
        "FileSystem instance or a valid file system URI".format(
            type(filesystem))
    )


def _resolve_filesystem_and_path(
    path, filesystem=None, allow_legacy_filesystem=False
):
    """
    Resolve a (filesystem, path) pair from `path`, which may be a URI, a
    plain filesystem path, or a non-path (file-like) object.

    Parameters
    ----------
    path : str, path-like or file-like
        The location to resolve.
    filesystem : optional
        Explicit filesystem; anything accepted by ``_ensure_filesystem``.
        When omitted, the filesystem is inferred from `path`.
    allow_legacy_filesystem : bool, default False
        Forwarded to ``_ensure_filesystem``. When True, the path is not
        normalized for an explicitly given filesystem.

    Returns
    -------
    tuple
        ``(filesystem, path)``. For a non-path-like `path` this is
        ``(None, path)``.

    Raises
    ------
    ValueError
        If `filesystem` is given together with a non-path-like `path`, or
        if URI parsing fails for a reason other than an empty scheme.
    TypeError
        If a non-string path is combined with a non-local filesystem.
    """
    if not _is_path_like(path):
        if filesystem is None:
            return filesystem, path
        raise ValueError(
            "'filesystem' passed but the specified path is file-like, so"
            " there is nothing to open with 'filesystem'."
        )

    if filesystem is not None:
        # an explicit filesystem wins; only validate/normalize the path
        explicit_fs = _ensure_filesystem(
            filesystem, allow_legacy_filesystem=allow_legacy_filesystem
        )
        if isinstance(explicit_fs, LocalFileSystem):
            path = _stringify_path(path)
        elif not isinstance(path, str):
            raise TypeError(
                "Expected string path; path-like objects are only allowed "
                "with a local filesystem"
            )
        if not allow_legacy_filesystem:
            path = explicit_fs.normalize_path(path)
        return explicit_fs, path

    path = _stringify_path(path)

    # No filesystem given: first check whether the path exists as a local
    # (relative) file path, and only then try to parse it as a URI.
    local_fs = LocalFileSystem()
    try:
        local_info = local_fs.get_file_info(path)
    except OSError:
        found_locally = False
    else:
        found_locally = (local_info.type != FileType.NotFound)

    if found_locally:
        return local_fs, local_fs.normalize_path(path)

    # The file or directory doesn't exist locally, so assume the path is a
    # URI that describes the filesystem as well.
    try:
        uri_fs, uri_path = FileSystem.from_uri(path)
    except ValueError as exc:
        # Neither a URI nor a locally existing path: assume a local path
        # was meant, so that a nicer "file not found" error surfaces later
        # instead of a more confusing scheme parsing error.
        if "empty scheme" not in str(exc):
            raise
        return local_fs, path
    return uri_fs, uri_path


def copy_files(source, destination,
               source_filesystem=None, destination_filesystem=None,
               *, chunk_size=1024*1024, use_threads=True):
    """
    Copy files between FileSystems.

    This function allows you to recursively copy directories of files
    from one file system to another, such as from S3 to your local
    machine.

    Parameters
    ----------
    source : string
        Source file path or URI to a single file or directory.
        If a directory, files will be copied recursively from this path.
    destination : string
        Destination file path or URI. If `source` is a file,
        `destination` is also interpreted as the destination file (not
        directory). Directories will be created as necessary.
    source_filesystem : FileSystem, optional
        Source filesystem, needs to be specified if `source` is not a
        URI, otherwise inferred.
    destination_filesystem : FileSystem, optional
        Destination filesystem, needs to be specified if `destination`
        is not a URI, otherwise inferred.
    chunk_size : int, default 1MB
        The maximum size of block to read before flushing to the
        destination file. A larger chunk_size will use more memory while
        copying but may help accommodate high latency FileSystems.
    use_threads : bool, default True
        Whether to use multiple threads to accelerate copying.

    Examples
    --------
    Copy an S3 bucket's files to a local directory:

    >>> copy_files("s3://your-bucket-name", "local-directory")

    Using a FileSystem object:

    >>> copy_files("your-bucket-name", "local-directory",
    ...            source_filesystem=S3FileSystem(...))

    """
    src_fs, src_path = _resolve_filesystem_and_path(
        source, source_filesystem
    )
    dst_fs, dst_path = _resolve_filesystem_and_path(
        destination, destination_filesystem
    )

    src_info = src_fs.get_file_info(src_path)
    if src_info.type != FileType.Directory:
        # anything that is not a directory is copied as a single file
        _copy_files(src_fs, src_path,
                    dst_fs, dst_path,
                    chunk_size, use_threads)
        return

    # directory: copy everything underneath it, recursively
    recursive_selector = FileSelector(src_path, recursive=True)
    _copy_files_selector(src_fs, recursive_selector,
                         dst_fs, dst_path,
                         chunk_size, use_threads)


class FSSpecHandler(FileSystemHandler):
    """
    Handler for fsspec-based Python filesystems.

    https://filesystem-spec.readthedocs.io/en/latest/index.html

    Parameters
    ----------
    fs : The FSSpec-compliant filesystem instance.

    Examples
    --------
    >>> PyFileSystem(FSSpecHandler(fsspec_fs))
    """

    def __init__(self, fs):
        self.fs = fs

    def __eq__(self, other):
        # Two handlers are equal when the wrapped filesystems are equal.
        if isinstance(other, FSSpecHandler):
            return self.fs == other.fs
        return NotImplemented

    def __ne__(self, other):
        if isinstance(other, FSSpecHandler):
            return self.fs != other.fs
        return NotImplemented

    def get_type_name(self):
        """Return ``"fsspec+<protocol>"`` for the wrapped filesystem."""
        protocol = self.fs.protocol
        # fsspec declares ``protocol`` as a string or a sequence of
        # aliases (commonly a tuple); use the first alias in that case.
        if isinstance(protocol, (list, tuple)):
            protocol = protocol[0]
        return "fsspec+{0}".format(protocol)

    def normalize_path(self, path):
        """Return `path` unchanged; no normalization is applied."""
        return path

    @staticmethod
    def _create_file_info(path, info):
        """Build a FileInfo for `path` from an fsspec info dict."""
        size = info["size"]
        if info["type"] == "file":
            ftype = FileType.File
        elif info["type"] == "directory":
            ftype = FileType.Directory
            # some fsspec filesystems include a file size for directories
            size = None
        else:
            ftype = FileType.Unknown
        return FileInfo(path, ftype, size=size, mtime=info.get("mtime", None))

    def get_file_info(self, paths):
        """
        Return one FileInfo per entry of `paths`, in order.

        Paths for which ``fs.info`` raises FileNotFoundError are reported
        with ``FileType.NotFound``.
        """
        infos = []
        for path in paths:
            try:
                info = self.fs.info(path)
            except FileNotFoundError:
                infos.append(FileInfo(path, FileType.NotFound))
            else:
                infos.append(self._create_file_info(path, info))
        return infos

    def get_file_info_selector(self, selector):
        """
        Return the FileInfo entries found under ``selector.base_dir``.

        Raises
        ------
        NotADirectoryError
            If the base path exists but is not a directory.
        FileNotFoundError
            If the base path does not exist and
            ``selector.allow_not_found`` is false.
        """
        if not self.fs.isdir(selector.base_dir):
            if self.fs.exists(selector.base_dir):
                raise NotADirectoryError(selector.base_dir)
            if selector.allow_not_found:
                return []
            raise FileNotFoundError(selector.base_dir)

        # maxdepth=None lets fs.find descend without a depth limit
        maxdepth = None if selector.recursive else 1

        selected_files = self.fs.find(
            selector.base_dir, maxdepth=maxdepth, withdirs=True, detail=True
        )
        return [
            self._create_file_info(path, info)
            for path, info in selected_files.items()
        ]

    def create_dir(self, path, recursive):
        """Create directory `path`; an already existing one is not an error."""
        # mkdir also raises FileNotFoundError when base directory is not found
        try:
            self.fs.mkdir(path, create_parents=recursive)
        except FileExistsError:
            pass

    def delete_dir(self, path):
        """Delete `path` recursively."""
        self.fs.rm(path, recursive=True)

    def _delete_dir_contents(self, path):
        # Remove every entry listed directly under `path`, keeping `path`.
        for subpath in self.fs.listdir(path, detail=False):
            if self.fs.isdir(subpath):
                self.fs.rm(subpath, recursive=True)
            elif self.fs.isfile(subpath):
                self.fs.rm(subpath)

    def delete_dir_contents(self, path):
        """
        Delete everything inside `path` but not `path` itself.

        Raises
        ------
        ValueError
            If `path` is empty or consists only of slashes (the root);
            use ``delete_root_dir_contents`` for that.
        """
        if path.strip("/") == "":
            # Build one message string; passing the pieces as separate
            # arguments would make the exception text render as a tuple.
            raise ValueError(
                "delete_dir_contents called on path '{0}'".format(path)
            )
        self._delete_dir_contents(path)

    def delete_root_dir_contents(self):
        """Delete everything inside the root directory ``"/"``."""
        self._delete_dir_contents("/")

    def delete_file(self, path):
        """Delete the file at `path`; raise FileNotFoundError if absent."""
        # fs.rm correctly raises IsADirectoryError when `path` is a directory
        # instead of a file and `recursive` is not set to True
        if not self.fs.exists(path):
            raise FileNotFoundError(path)
        self.fs.rm(path)

    def move(self, src, dest):
        """Move `src` to `dest` (recursively)."""
        self.fs.mv(src, dest, recursive=True)

    def copy_file(self, src, dest):
        """Copy the file `src` to `dest`."""
        # fs.copy correctly raises IsADirectoryError when `src` is a directory
        # instead of a file
        self.fs.copy(src, dest)

    # TODO can we read/pass metadata (e.g. Content-Type) in the methods below?

    def open_input_stream(self, path):
        """Open `path` for reading; raise FileNotFoundError if not a file."""
        from pyarrow import PythonFile

        if not self.fs.isfile(path):
            raise FileNotFoundError(path)

        return PythonFile(self.fs.open(path, mode="rb"), mode="r")

    def open_input_file(self, path):
        """Open `path` for reading; raise FileNotFoundError if not a file."""
        from pyarrow import PythonFile

        if not self.fs.isfile(path):
            raise FileNotFoundError(path)

        return PythonFile(self.fs.open(path, mode="rb"), mode="r")

    def open_output_stream(self, path, metadata):
        """Open `path` for writing (``"wb"``); `metadata` is not used."""
        from pyarrow import PythonFile

        return PythonFile(self.fs.open(path, mode="wb"), mode="w")

    def open_append_stream(self, path, metadata):
        """Open `path` for appending (``"ab"``); `metadata` is not used."""
        from pyarrow import PythonFile

        return PythonFile(self.fs.open(path, mode="ab"), mode="w")