# Licensed to the Apache Software Foundation (ASF) under one# or more contributor license agreements. See the NOTICE file# distributed with this work for additional information# regarding copyright ownership. The ASF licenses this file# to you under the Apache License, Version 2.0 (the# "License"); you may not use this file except in compliance# with the License. You may obtain a copy of the License at## http://www.apache.org/licenses/LICENSE-2.0## Unless required by applicable law or agreed to in writing,# software distributed under the License is distributed on an# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY# KIND, either express or implied. See the License for the# specific language governing permissions and limitations# under the License."""FileSystem abstraction to interact with various local and remote filesystems."""frompyarrow.utilimport_is_path_like,_stringify_pathfrompyarrow._fsimport(# noqaFileSelector,FileType,FileInfo,FileSystem,LocalFileSystem,SubTreeFileSystem,_MockFileSystem,FileSystemHandler,PyFileSystem,_copy_files,_copy_files_selector,)# For backward compatibility.FileStats=FileInfo_not_imported=[]try:frompyarrow._azurefsimportAzureFileSystem# noqaexceptImportError:_not_imported.append("AzureFileSystem")try:frompyarrow._hdfsimportHadoopFileSystem# noqaexceptImportError:_not_imported.append("HadoopFileSystem")try:frompyarrow._gcsfsimportGcsFileSystem# noqaexceptImportError:_not_imported.append("GcsFileSystem")try:frompyarrow._s3fsimport(# noqaAwsDefaultS3RetryStrategy,AwsStandardS3RetryStrategy,S3FileSystem,S3LogLevel,S3RetryStrategy,ensure_s3_initialized,finalize_s3,ensure_s3_finalized,initialize_s3,resolve_s3_region)exceptImportError:_not_imported.append("S3FileSystem")else:# GH-38364: we don't initialize S3 eagerly as that could lead# to crashes at shutdown even when S3 isn't used.# Instead, S3 is initialized lazily using `ensure_s3_initialized`# in assorted 
    # NOTE(review): these two statements are the tail of the ``else:`` suite
    # of the S3 import guard above — S3 finalization is registered only when
    # the S3 bindings imported successfully. S3 itself is initialized lazily
    # (see GH-38364 comment above); here we only ensure clean shutdown.
    import atexit
    atexit.register(ensure_s3_finalized)


def __getattr__(name):
    """Module-level ``__getattr__`` (PEP 562).

    Turns access to a filesystem class whose optional bindings failed to
    import (recorded in ``_not_imported``) into a helpful ImportError
    instead of a generic AttributeError.
    """
    if name in _not_imported:
        raise ImportError(
            "The pyarrow installation is not built with support for "
            "'{0}'".format(name)
        )

    raise AttributeError(
        "module 'pyarrow.fs' has no attribute '{0}'".format(name)
    )


def _filesystem_from_str(uri):
    """Instantiate a FileSystem from a URI string.

    If the URI has a path component, that component is treated as a path
    prefix and the result is wrapped in a SubTreeFileSystem rooted there.

    Raises
    ------
    ValueError
        If the path component exists but is not a directory.
    """
    # instantiate the file system from an uri, if the uri has a path
    # component then it will be treated as a path prefix
    filesystem, prefix = FileSystem.from_uri(uri)
    prefix = filesystem.normalize_path(prefix)
    if prefix:
        # validate that the prefix is pointing to a directory
        prefix_info = filesystem.get_file_info([prefix])[0]
        if prefix_info.type != FileType.Directory:
            raise ValueError(
                "The path component of the filesystem URI must point to a "
                "directory but it has a type: `{}`. The path component "
                "is `{}` and the given filesystem URI is `{}`".format(
                    prefix_info.type.name, prefix_info.path, uri
                )
            )
        filesystem = SubTreeFileSystem(prefix, filesystem)
    return filesystem


def _ensure_filesystem(filesystem, *, use_mmap=False):
    """Coerce *filesystem* into a pyarrow FileSystem instance.

    Accepts an existing FileSystem (returned unchanged), a URI string
    (resolved via ``_filesystem_from_str``), or an fsspec-compatible
    filesystem (wrapped in a PyFileSystem, except that fsspec's own
    LocalFileSystem is replaced by the native Arrow one).

    Raises
    ------
    ValueError
        If ``use_mmap`` is requested together with a URI string.
    TypeError
        If *filesystem* is none of the supported kinds.
    """
    if isinstance(filesystem, FileSystem):
        return filesystem
    elif isinstance(filesystem, str):
        if use_mmap:
            raise ValueError(
                "Specifying to use memory mapping not supported for "
                "filesystem specified as an URI string"
            )
        return _filesystem_from_str(filesystem)

    # handle fsspec-compatible filesystems
    try:
        import fsspec
    except ImportError:
        pass
    else:
        if isinstance(filesystem, fsspec.AbstractFileSystem):
            if type(filesystem).__name__ == 'LocalFileSystem':
                # In case its a simple LocalFileSystem, use native arrow one
                return LocalFileSystem(use_mmap=use_mmap)
            return PyFileSystem(FSSpecHandler(filesystem))

    raise TypeError(
        "Unrecognized filesystem: {}. `filesystem` argument must be a "
        "FileSystem instance or a valid file system URI'".format(
            type(filesystem)
        )
    )


def _resolve_filesystem_and_path(path, filesystem=None, *, memory_map=False):
    """
    Return filesystem/path from path which could be an URI or a plain
    filesystem path.

    When *filesystem* is None, first checks whether *path* exists as a
    local file; only if it does not is *path* parsed as a URI. File-like
    objects (non path-like) are passed through untouched.
    """
    if not _is_path_like(path):
        if filesystem is not None:
            raise ValueError(
                "'filesystem' passed but the specified path is file-like, so"
                " there is nothing to open with 'filesystem'."
            )
        return filesystem, path

    if filesystem is not None:
        filesystem = _ensure_filesystem(filesystem, use_mmap=memory_map)
        if isinstance(filesystem, LocalFileSystem):
            path = _stringify_path(path)
        elif not isinstance(path, str):
            raise TypeError(
                "Expected string path; path-like objects are only allowed "
                "with a local filesystem"
            )
        path = filesystem.normalize_path(path)
        return filesystem, path

    path = _stringify_path(path)

    # if filesystem is not given, try to automatically determine one
    # first check if the file exists as a local (relative) file path
    # if not then try to parse the path as an URI
    filesystem = LocalFileSystem(use_mmap=memory_map)
    try:
        file_info = filesystem.get_file_info(path)
    except ValueError:
        # ValueError means path is likely an URI
        file_info = None
        exists_locally = False
    else:
        exists_locally = (file_info.type != FileType.NotFound)

    # if the file or directory doesn't exists locally, then assume that
    # the path is an URI describing the file system as well
    if not exists_locally:
        try:
            filesystem, path = FileSystem.from_uri(path)
        except ValueError as e:
            # neither an URI nor a locally existing path, so assume that
            # local path was given and propagate a nicer file not found error
            # instead of a more confusing scheme parsing error
            if "empty scheme" not in str(e) \
                    and "Cannot parse URI" not in str(e):
                raise
        else:
            path = filesystem.normalize_path(path)

    return filesystem, path
def copy_files(source, destination,
               source_filesystem=None, destination_filesystem=None,
               *, chunk_size=1024*1024, use_threads=True):
    """
    Copy files between FileSystems.

    This functions allows you to recursively copy directories of files from
    one file system to another, such as from S3 to your local machine.

    Parameters
    ----------
    source : string
        Source file path or URI to a single file or directory.
        If a directory, files will be copied recursively from this path.
    destination : string
        Destination file path or URI. If `source` is a file, `destination`
        is also interpreted as the destination file (not directory).
        Directories will be created as necessary.
    source_filesystem : FileSystem, optional
        Source filesystem, needs to be specified if `source` is not a URI,
        otherwise inferred.
    destination_filesystem : FileSystem, optional
        Destination filesystem, needs to be specified if `destination` is
        not a URI, otherwise inferred.
    chunk_size : int, default 1MB
        The maximum size of block to read before flushing to the
        destination file. A larger chunk_size will use more memory while
        copying but may help accommodate high latency FileSystems.
    use_threads : bool, default True
        Whether to use multiple threads to accelerate copying.

    Examples
    --------
    Inspect an S3 bucket's files:

    >>> s3, path = fs.FileSystem.from_uri(
    ...     "s3://registry.opendata.aws/roda/ndjson/")
    >>> selector = fs.FileSelector(path)
    >>> s3.get_file_info(selector)
    [<FileInfo for 'registry.opendata.aws/roda/ndjson/index.ndjson':...]

    Copy one file from S3 bucket to a local directory:

    >>> fs.copy_files("s3://registry.opendata.aws/roda/ndjson/index.ndjson",
    ...               "file:///{}/index_copy.ndjson".format(local_path))
    >>> fs.LocalFileSystem().get_file_info(str(local_path)+
    ...                                    '/index_copy.ndjson')
    <FileInfo for '.../index_copy.ndjson': type=FileType.File, size=...>

    Copy file using a FileSystem object:

    >>> fs.copy_files("registry.opendata.aws/roda/ndjson/index.ndjson",
    ...               "file:///{}/index_copy.ndjson".format(local_path),
    ...               source_filesystem=fs.S3FileSystem())
    """
    # Resolve both endpoints to (FileSystem, normalized path) pairs; either
    # argument may be a plain path (with explicit filesystem) or a URI.
    src_fs, src_path = _resolve_filesystem_and_path(
        source, source_filesystem)
    dest_fs, dest_path = _resolve_filesystem_and_path(
        destination, destination_filesystem)

    source_info = src_fs.get_file_info(src_path)
    if source_info.type == FileType.Directory:
        # Directory: copy the whole tree recursively.
        tree_selector = FileSelector(src_path, recursive=True)
        _copy_files_selector(src_fs, tree_selector,
                             dest_fs, dest_path,
                             chunk_size, use_threads)
    else:
        # Single file: destination names the target file itself.
        _copy_files(src_fs, src_path,
                    dest_fs, dest_path,
                    chunk_size, use_threads)
@staticmethod
def _create_file_info(path, info):
    """Translate one fsspec ``info`` dict into an Arrow FileInfo.

    Only the ``type``, ``size`` and (optional) ``mtime`` keys are read;
    unknown ``type`` values map to FileType.Unknown.
    """
    type_by_name = {
        "file": FileType.File,
        "directory": FileType.Directory,
    }
    ftype = type_by_name.get(info["type"], FileType.Unknown)
    # some fsspec filesystems include a file size for directories;
    # Arrow reports no size for directories, so drop it.
    size = None if ftype == FileType.Directory else info["size"]
    return FileInfo(path, ftype, size=size, mtime=info.get("mtime", None))
def get_file_info_selector(self, selector):
    """List entries under ``selector.base_dir`` as FileInfo objects.

    Honors ``selector.recursive`` (unbounded vs. single-level listing) and
    ``selector.allow_not_found`` (return [] instead of raising when the
    base directory is missing).

    Raises
    ------
    NotADirectoryError
        If the base path exists but is not a directory.
    FileNotFoundError
        If the base path is absent and ``allow_not_found`` is False.
    """
    base = selector.base_dir
    if not self.fs.isdir(base):
        # Base is not a directory: distinguish "exists as non-directory"
        # from "does not exist at all".
        if self.fs.exists(base):
            raise NotADirectoryError(base)
        if selector.allow_not_found:
            return []
        raise FileNotFoundError(base)

    depth_limit = None if selector.recursive else 1
    found = self.fs.find(base, maxdepth=depth_limit,
                         withdirs=True, detail=True)

    # Some fsspec filesystems include the base directory itself in the
    # listing (see GH-37555); compare with slashes stripped and skip it.
    base_key = base.strip("/")
    results = []
    for entry_path, entry_info in found.items():
        if entry_path.strip("/") == base_key:
            continue
        results.append(self._create_file_info(entry_path, entry_info))
    return results
def create_dir(self, path, recursive):
    """Create a directory at *path*; create parents when *recursive*.

    Creating an already-existing directory is a silent no-op. When
    *recursive* is False and the parent is missing, the underlying
    ``mkdir`` raises FileNotFoundError, which is propagated.
    """
    try:
        self.fs.mkdir(path, create_parents=recursive)
    except FileExistsError:
        # Already present — nothing to do.
        return
def delete_dir_contents(self, path, missing_dir_ok):
    """Delete everything inside *path*, keeping the directory itself.

    Parameters
    ----------
    path : str
        Directory whose contents are removed. Must not be the
        filesystem root.
    missing_dir_ok : bool
        Forwarded to the internal helper; controls whether a missing
        directory is an error.

    Raises
    ------
    ValueError
        If *path* reduces to the filesystem root (empty after
        stripping slashes) — refusing to wipe an entire filesystem.
    """
    if path.strip("/") == "":
        # BUG FIX: the message was previously passed as three separate
        # arguments to ValueError, so str(exc) rendered a tuple instead
        # of a readable message. Build a single formatted string.
        raise ValueError(
            "delete_dir_contents called on path '{}'".format(path))
    self._delete_dir_contents(path, missing_dir_ok)
def delete_file(self, path):
    """Delete the single file at *path*.

    Raises
    ------
    FileNotFoundError
        If nothing exists at *path*.
    """
    # fs.rm itself raises IsADirectoryError when `path` is a directory
    # and `recursive` is not set to True, so only absence needs an
    # explicit check here.
    if not self.fs.exists(path):
        raise FileNotFoundError(path)
    self.fs.rm(path)