# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
import os
import inspect
import posixpath
import sys
import urllib.parse
import warnings
from os.path import join as pjoin
import pyarrow as pa
from pyarrow.util import implements, _stringify_path, _is_path_like, _DEPR_MSG
_FS_DEPR_MSG = _DEPR_MSG.format(
"filesystem.LocalFileSystem", "2.0.0", "fs.LocalFileSystem"
)
class FileSystem:
"""
Abstract filesystem interface.
"""
def cat(self, path):
"""
Return contents of file as a bytes object.
Parameters
----------
path : str
File path to read content from.
Returns
-------
contents : bytes
"""
with self.open(path, 'rb') as f:
return f.read()
def ls(self, path):
"""
Return list of file paths.
Parameters
----------
path : str
Directory to list contents from.
"""
raise NotImplementedError
def delete(self, path, recursive=False):
"""
Delete the indicated file or directory.
Parameters
----------
path : str
Path to delete.
recursive : bool, default False
If True, also delete child paths for directories.
"""
raise NotImplementedError
def disk_usage(self, path):
"""
Compute the number of bytes used by all contents under the indicated path
in the file tree.
Parameters
----------
path : str
Can be a file path or directory.
Returns
-------
usage : int
"""
path = _stringify_path(path)
path_info = self.stat(path)
if path_info['kind'] == 'file':
return path_info['size']
total = 0
for root, directories, files in self.walk(path):
for child_path in files:
abspath = self._path_join(root, child_path)
total += self.stat(abspath)['size']
return total
def _path_join(self, *args):
return self.pathsep.join(args)
def stat(self, path):
"""
Information about a filesystem entry.
Returns
-------
stat : dict
"""
raise NotImplementedError('FileSystem.stat')
def rm(self, path, recursive=False):
"""
Alias for FileSystem.delete.
"""
return self.delete(path, recursive=recursive)
def mv(self, path, new_path):
"""
Alias for FileSystem.rename.
"""
return self.rename(path, new_path)
def rename(self, path, new_path):
"""
Rename file, like UNIX mv command.
Parameters
----------
path : str
Path to alter.
new_path : str
Path to move to.
"""
raise NotImplementedError('FileSystem.rename')
def mkdir(self, path, create_parents=True):
"""
Create a directory.
Parameters
----------
path : str
Path to the directory.
create_parents : bool, default True
If the parent directories don't exist, create them as well.
"""
raise NotImplementedError
def exists(self, path):
"""
Return True if path exists.
Parameters
----------
path : str
Path to check.
"""
raise NotImplementedError
def isdir(self, path):
"""
Return True if path is a directory.
Parameters
----------
path : str
Path to check.
"""
raise NotImplementedError
def isfile(self, path):
"""
Return True if path is a file.
Parameters
----------
path : str
Path to check.
"""
raise NotImplementedError
def _isfilestore(self):
"""
Returns True if this FileSystem is a unix-style file store with
directories.
"""
raise NotImplementedError
def read_parquet(self, path, columns=None, metadata=None, schema=None,
use_threads=True, use_pandas_metadata=False):
"""
Read Parquet data from path in file system. Can read from a single file
or a directory of files.
Parameters
----------
path : str
Single file path or directory
columns : List[str], optional
Subset of columns to read.
metadata : pyarrow.parquet.FileMetaData
Known metadata to validate files against.
schema : pyarrow.parquet.Schema
Known schema to validate files against. Alternative to metadata
argument.
use_threads : bool, default True
Perform multi-threaded column reads.
use_pandas_metadata : bool, default False
If True and file has custom pandas schema metadata, ensure that
index columns are also loaded.
Returns
-------
table : pyarrow.Table
"""
from pyarrow.parquet import ParquetDataset
dataset = ParquetDataset(path, schema=schema, metadata=metadata,
filesystem=self)
return dataset.read(columns=columns, use_threads=use_threads,
use_pandas_metadata=use_pandas_metadata)
def open(self, path, mode='rb'):
"""
Open file for reading or writing.
"""
raise NotImplementedError
@property
def pathsep(self):
return '/'
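# Illustrative sketch (not part of the upstream module): how the abstract
# FileSystem interface above is typically exercised. `fs` is any concrete
# implementation providing ls(), isfile() and open() (and therefore cat());
# `directory` is a hypothetical path on that filesystem.
def _example_filesystem_usage(fs, directory):
    """Read every regular file directly under `directory`, return total bytes."""
    total = 0
    for entry in fs.ls(directory):
        if fs.isfile(entry):
            # cat() is a convenience wrapper around open(path, 'rb').read()
            total += len(fs.cat(entry))
    return total
# read_parquet() on the same interface builds a pyarrow.parquet.ParquetDataset
# with filesystem=self, so the pattern above generalizes to Parquet reads.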
class LocalFileSystem(FileSystem):
_instance = None
def __init__(self):
warnings.warn(_FS_DEPR_MSG, DeprecationWarning, stacklevel=2)
super().__init__()
@classmethod
def _get_instance(cls):
if cls._instance is None:
with warnings.catch_warnings():
warnings.simplefilter("ignore")
cls._instance = LocalFileSystem()
return cls._instance
@classmethod
def get_instance(cls):
warnings.warn(_FS_DEPR_MSG, DeprecationWarning, stacklevel=2)
return cls._get_instance()
@implements(FileSystem.ls)
def ls(self, path):
path = _stringify_path(path)
return sorted(pjoin(path, x) for x in os.listdir(path))
@implements(FileSystem.mkdir)
def mkdir(self, path, create_parents=True):
path = _stringify_path(path)
if create_parents:
os.makedirs(path)
else:
os.mkdir(path)
@implements(FileSystem.isdir)
def isdir(self, path):
path = _stringify_path(path)
return os.path.isdir(path)
@implements(FileSystem.isfile)
def isfile(self, path):
path = _stringify_path(path)
return os.path.isfile(path)
@implements(FileSystem._isfilestore)
def _isfilestore(self):
return True
@implements(FileSystem.exists)
def exists(self, path):
path = _stringify_path(path)
return os.path.exists(path)
@implements(FileSystem.open)
def open(self, path, mode='rb'):
"""
Open file for reading or writing.
"""
path = _stringify_path(path)
return open(path, mode=mode)
@property
def pathsep(self):
return os.path.sep
def walk(self, path):
"""
Directory tree generator, see os.walk.
"""
path = _stringify_path(path)
return os.walk(path)
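# Illustrative sketch (not part of the upstream module): a basic round trip
# with the deprecated LocalFileSystem. _get_instance() is used to avoid the
# DeprecationWarning emitted by the public constructor; the file and directory
# names are hypothetical.
def _example_local_filesystem_roundtrip():
    import tempfile
    fs = LocalFileSystem._get_instance()
    tmpdir = tempfile.mkdtemp()
    fs.mkdir(pjoin(tmpdir, 'nested'))   # create_parents=True -> os.makedirs()
    path = pjoin(tmpdir, 'data.bin')
    with fs.open(path, 'wb') as f:      # plain built-in open() underneath
        f.write(b'hello')
    assert fs.exists(path) and fs.isfile(path)
    assert path in fs.ls(tmpdir)        # ls() returns absolute, sorted paths
    return fs.cat(path)                 # -> b'hello'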
class DaskFileSystem(FileSystem):
"""
Wraps a Dask/fsspec-style filesystem implementation such as s3fs or gcsfs.
"""
def __init__(self, fs):
self.fs = fs
@implements(FileSystem.isdir)
def isdir(self, path):
raise NotImplementedError("Unsupported file system API")
@implements(FileSystem.isfile)
def isfile(self, path):
raise NotImplementedError("Unsupported file system API")
@implements(FileSystem._isfilestore)
def _isfilestore(self):
"""
Object stores like S3 and GCS are based on key lookups, not true
file paths.
"""
return False
@implements(FileSystem.delete)
def delete(self, path, recursive=False):
path = _stringify_path(path)
return self.fs.rm(path, recursive=recursive)
@implements(FileSystem.exists)
def exists(self, path):
path = _stringify_path(path)
return self.fs.exists(path)
@implements(FileSystem.mkdir)
def mkdir(self, path, create_parents=True):
path = _stringify_path(path)
if create_parents:
return self.fs.mkdirs(path)
else:
return self.fs.mkdir(path)
@implements(FileSystem.open)
def open(self, path, mode='rb'):
"""
Open file for reading or writing.
"""
path = _stringify_path(path)
return self.fs.open(path, mode=mode)
def ls(self, path, detail=False):
path = _stringify_path(path)
return self.fs.ls(path, detail=detail)
def walk(self, path):
"""
Directory tree generator, like os.walk.
"""
path = _stringify_path(path)
return self.fs.walk(path)
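# Illustrative sketch (not part of the upstream module): wrapping an
# fsspec-style object in DaskFileSystem. It assumes the optional `fsspec`
# package is installed and uses its in-memory filesystem; the paths below are
# hypothetical.
def _example_dask_filesystem_wrapper():
    import fsspec  # optional dependency, assumed available for this sketch
    wrapped = DaskFileSystem(fsspec.filesystem('memory'))
    wrapped.mkdir('/bucket')                      # delegates to fs.mkdirs()
    with wrapped.open('/bucket/key.txt', 'wb') as f:
        f.write(b'payload')
    assert wrapped.exists('/bucket/key.txt')
    return wrapped.ls('/bucket')                  # delegates to fs.ls()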
class S3FSWrapper(DaskFileSystem):
@implements(FileSystem.isdir)
def isdir(self, path):
path = _sanitize_s3(_stringify_path(path))
try:
contents = self.fs.ls(path)
if len(contents) == 1 and contents[0] == path:
return False
else:
return True
except OSError:
return False
@implements(FileSystem.isfile)
def isfile(self, path):
path = _sanitize_s3(_stringify_path(path))
try:
contents = self.fs.ls(path)
return len(contents) == 1 and contents[0] == path
except OSError:
return False
def walk(self, path, refresh=False):
"""
Directory tree generator, like os.walk.
Generator version of what is in s3fs, which yields a flattened list of
files.
"""
path = _sanitize_s3(_stringify_path(path))
directories = set()
files = set()
for key in list(self.fs._ls(path, refresh=refresh)):
path = key['Key']
if key['StorageClass'] == 'DIRECTORY':
directories.add(path)
elif key['StorageClass'] == 'BUCKET':
pass
else:
files.add(path)
# s3fs creates duplicate 'DIRECTORY' entries
files = sorted([posixpath.split(f)[1] for f in files
if f not in directories])
directories = sorted([posixpath.split(x)[1]
for x in directories])
yield path, directories, files
for directory in directories:
yield from self.walk(directory, refresh=refresh)
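# Illustrative sketch (not part of the upstream module): wrapping an
# s3fs.S3FileSystem so the code above can query it. It assumes the optional
# `s3fs` package is installed and uses a hypothetical bucket path; credentials
# (or anonymous access) are the caller's concern. walk() relies on an older
# s3fs listing format, so only the plain predicates are shown.
def _example_s3fs_wrapper(bucket_path='my-bucket/my-prefix'):
    import s3fs  # optional dependency, assumed available for this sketch
    wrapped = S3FSWrapper(s3fs.S3FileSystem(anon=True))
    # Both predicates tolerate an 's3://' prefix thanks to _sanitize_s3() below.
    return wrapped.isdir('s3://' + bucket_path), wrapped.isfile('s3://' + bucket_path)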
def _sanitize_s3(path):
if path.startswith('s3://'):
return path.replace('s3://', '')
else:
return path
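# For example (illustrative): _sanitize_s3('s3://bucket/key.parquet') returns
# 'bucket/key.parquet', while a path without the 's3://' scheme is returned
# unchanged.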
def _ensure_filesystem(fs):
fs_type = type(fs)
# If the Arrow FileSystem above was subclassed, assume it supports the full
# interface and return it as-is; otherwise try to wrap known third-party
# filesystems.
if not issubclass(fs_type, FileSystem):
for mro in inspect.getmro(fs_type):
if mro.__name__ == 'S3FileSystem':
return S3FSWrapper(fs)
# In case it's a simple LocalFileSystem (e.g. dask's), use the native
# Arrow filesystem instead
elif mro.__name__ == 'LocalFileSystem':
return LocalFileSystem._get_instance()
if "fsspec" in sys.modules:
fsspec = sys.modules["fsspec"]
if isinstance(fs, fsspec.AbstractFileSystem):
# for recent fsspec versions that stop inheriting from
# pyarrow.filesystem.FileSystem, still allow fsspec
# filesystems (which should be compatible with our legacy fs)
return fs
raise OSError('Unrecognized filesystem: {}'.format(fs_type))
else:
return fs
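# Illustrative sketch (not part of the upstream module): what
# _ensure_filesystem() does with the inputs it accepts. Arrow's own
# (deprecated) LocalFileSystem passes through unchanged because it already
# subclasses FileSystem; the s3fs usage assumes that optional package is
# installed.
def _example_ensure_filesystem():
    local = LocalFileSystem._get_instance()
    assert _ensure_filesystem(local) is local
    import s3fs  # optional dependency, assumed available for this sketch
    wrapped = _ensure_filesystem(s3fs.S3FileSystem(anon=True))
    assert isinstance(wrapped, S3FSWrapper)
    return wrapped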
def resolve_filesystem_and_path(where, filesystem=None):
"""
Return a (filesystem, path) tuple deduced from `where`, which can be an
HDFS URI, a local file URI, or a plain filesystem path.
"""
if not _is_path_like(where):
if filesystem is not None:
raise ValueError("filesystem passed but where is file-like, so"
" there is nothing to open with filesystem.")
return filesystem, where
if filesystem is not None:
filesystem = _ensure_filesystem(filesystem)
if isinstance(filesystem, LocalFileSystem):
path = _stringify_path(where)
elif not isinstance(where, str):
raise TypeError(
"Expected string path; path-like objects are only allowed "
"with a local filesystem"
)
else:
path = where
return filesystem, path
path = _stringify_path(where)
parsed_uri = urllib.parse.urlparse(path)
if parsed_uri.scheme == 'hdfs' or parsed_uri.scheme == 'viewfs':
# Input is hdfs URI such as hdfs://host:port/myfile.parquet
netloc_split = parsed_uri.netloc.split(':')
host = netloc_split[0]
if host == '':
host = 'default'
else:
host = parsed_uri.scheme + "://" + host
port = 0
if len(netloc_split) == 2 and netloc_split[1].isnumeric():
port = int(netloc_split[1])
fs = pa.hdfs._connect(host=host, port=port)
fs_path = parsed_uri.path
elif parsed_uri.scheme == 'file':
# Input is local URI such as file:///home/user/myfile.parquet
fs = LocalFileSystem._get_instance()
fs_path = parsed_uri.path
else:
# Input is local path such as /home/user/myfile.parquet
fs = LocalFileSystem._get_instance()
fs_path = path
return fs, fs_path
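# Illustrative sketch (not part of the upstream module): how URIs and plain
# paths are resolved by resolve_filesystem_and_path(). The HDFS branch needs a
# reachable cluster (the namenode host/port mentioned below are hypothetical),
# so only the local cases are exercised.
def _example_resolve_filesystem_and_path():
    # A plain path and a file:// URI both resolve to the local filesystem.
    fs, path = resolve_filesystem_and_path('/home/user/myfile.parquet')
    assert isinstance(fs, LocalFileSystem) and path == '/home/user/myfile.parquet'
    fs, path = resolve_filesystem_and_path('file:///home/user/myfile.parquet')
    assert path == '/home/user/myfile.parquet'
    # 'hdfs://namenode:8020/myfile.parquet' would instead call
    # pa.hdfs._connect(host='hdfs://namenode', port=8020).
    return fs, path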