bacommon.transfer
Functionality related to transferring files/data.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Functionality related to transferring files/data.""" 4 5from __future__ import annotations 6 7import os 8from pathlib import Path 9from dataclasses import dataclass 10from typing import TYPE_CHECKING, Annotated 11 12from efro.dataclassio import ioprepped, IOAttrs 13 14if TYPE_CHECKING: 15 pass 16 17 18@ioprepped 19@dataclass 20class DirectoryManifestFile: 21 """Describes metadata and hashes for a file in a manifest.""" 22 23 filehash: Annotated[str, IOAttrs('h')] 24 filesize: Annotated[int, IOAttrs('s')] 25 26 27@ioprepped 28@dataclass 29class DirectoryManifest: 30 """Contains a summary of files in a directory.""" 31 32 files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')] 33 34 _empty_hash: str | None = None 35 36 @classmethod 37 def create_from_disk(cls, path: Path) -> DirectoryManifest: 38 """Create a manifest from a directory on disk.""" 39 import hashlib 40 from concurrent.futures import ThreadPoolExecutor 41 42 pathstr = str(path) 43 paths: list[str] = [] 44 45 if path.is_dir(): 46 # Build the full list of relative paths. 47 for basename, _dirnames, filenames in os.walk(path): 48 for filename in filenames: 49 fullname = os.path.join(basename, filename) 50 assert fullname.startswith(pathstr) 51 # Make sure we end up with forward slashes no matter 52 # what the os.* stuff above here was using. 53 paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix()) 54 elif path.exists(): 55 # Just return a single file entry if path is not a dir. 
56 paths.append(path.as_posix()) 57 58 def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]: 59 sha = hashlib.sha256() 60 fullfilepath = os.path.join(pathstr, filepath) 61 if not os.path.isfile(fullfilepath): 62 raise RuntimeError(f'File not found: "{fullfilepath}".') 63 with open(fullfilepath, 'rb') as infile: 64 filebytes = infile.read() 65 filesize = len(filebytes) 66 sha.update(filebytes) 67 return ( 68 filepath, 69 DirectoryManifestFile( 70 filehash=sha.hexdigest(), filesize=filesize 71 ), 72 ) 73 74 # Now use all procs to hash the files efficiently. 75 cpus = os.cpu_count() 76 if cpus is None: 77 cpus = 4 78 with ThreadPoolExecutor(max_workers=cpus) as executor: 79 return cls(files=dict(executor.map(_get_file_info, paths))) 80 81 def validate(self) -> None: 82 """Log any odd data in the manifest; for debugging.""" 83 import logging 84 85 for fpath, _fentry in self.files.items(): 86 # We want to be dealing in only forward slashes; make sure 87 # that's the case (wondering if we'll ever see backslashes 88 # for escape purposes). 89 if '\\' in fpath: 90 logging.exception( 91 "Found unusual path in manifest: '%s'.", fpath 92 ) 93 break # 1 error is enough for now. 94 95 @classmethod 96 def get_empty_hash(cls) -> str: 97 """Return the hash for an empty file.""" 98 if cls._empty_hash is None: 99 import hashlib 100 101 sha = hashlib.sha256() 102 cls._empty_hash = sha.hexdigest() 103 return cls._empty_hash
@ioprepped
@dataclass
class
DirectoryManifestFile:
@ioprepped
@dataclass
class DirectoryManifestFile:
    """Hash and size record for one file listed in a manifest."""

    # Hash of the file's contents.
    # NOTE(review): 'h' is presumably the short storage key used by
    # dataclassio - confirm against efro.dataclassio.IOAttrs.
    filehash: Annotated[str, IOAttrs('h')]

    # Size of the file.
    # NOTE(review): 's' is presumably the short storage key - confirm.
    filesize: Annotated[int, IOAttrs('s')]
Describes metadata and hashes for a file in a manifest.
@ioprepped
@dataclass
class
DirectoryManifest:
@ioprepped
@dataclass
class DirectoryManifest:
    """Contains a summary of files in a directory."""

    # Maps forward-slash file paths to their hash/size entries.
    files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]

    # Cache used by get_empty_hash(), which stores its result on the
    # class.
    # NOTE(review): this is declared like a regular dataclass field (not
    # a ClassVar), so it presumably also shows up as an __init__
    # argument - confirm that is intended.
    _empty_hash: str | None = None

    @classmethod
    def create_from_disk(cls, path: Path) -> DirectoryManifest:
        """Create a manifest from a directory on disk.

        If ``path`` is a directory, every file found beneath it is
        listed under its forward-slash path relative to ``path``. If
        ``path`` is some other existing filesystem entry, the manifest
        holds a single entry keyed by ``path.as_posix()``. If ``path``
        does not exist, the manifest is empty.

        Raises RuntimeError if a listed entry is not a file (per
        os.path.isfile) by the time it gets hashed.
        """
        import hashlib
        from concurrent.futures import ThreadPoolExecutor

        # (manifest key, path to open) pairs. Keeping the openable path
        # separate from the key means a relative single-file 'path' is
        # opened as-is instead of being joined onto itself.
        entries: list[tuple[str, str]] = []

        if path.is_dir():
            # Build the full list of relative paths.
            for basename, _dirnames, filenames in os.walk(path):
                for filename in filenames:
                    fullname = os.path.join(basename, filename)
                    # as_posix() makes sure we end up with forward
                    # slashes no matter what the os.* calls gave us.
                    entries.append(
                        (
                            Path(fullname).relative_to(path).as_posix(),
                            fullname,
                        )
                    )
        elif path.exists():
            # Just return a single file entry if path is not a dir.
            entries.append((path.as_posix(), str(path)))

        def _get_file_info(
            entry: tuple[str, str],
        ) -> tuple[str, DirectoryManifestFile]:
            key, fullfilepath = entry
            if not os.path.isfile(fullfilepath):
                raise RuntimeError(f'File not found: "{fullfilepath}".')
            sha = hashlib.sha256()
            filesize = 0
            # Hash in chunks so large files are never held fully in
            # memory.
            with open(fullfilepath, 'rb') as infile:
                while chunk := infile.read(1024 * 1024):
                    sha.update(chunk)
                    filesize += len(chunk)
            return (
                key,
                DirectoryManifestFile(
                    filehash=sha.hexdigest(), filesize=filesize
                ),
            )

        # Hash files concurrently, one worker thread per reported cpu
        # (falling back to 4 when the count is unavailable).
        with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
            return cls(files=dict(executor.map(_get_file_info, entries)))

    def validate(self) -> None:
        """Log any odd data in the manifest; for debugging."""
        import logging

        for fpath in self.files:
            # We want to be dealing in only forward slashes; make sure
            # that's the case (wondering if we'll ever see backslashes
            # for escape purposes).
            if '\\' in fpath:
                # Plain error(); exception() is only meaningful inside
                # an exception handler.
                logging.error("Found unusual path in manifest: '%s'.", fpath)
                break  # 1 error is enough for now.

    @classmethod
    def get_empty_hash(cls) -> str:
        """Return the hash for an empty file."""
        if cls._empty_hash is None:
            import hashlib

            # A sha256 object that was never fed data gives the digest
            # of empty input.
            sha = hashlib.sha256()
            cls._empty_hash = sha.hexdigest()
        return cls._empty_hash
Contains a summary of files in a directory.
DirectoryManifest( files: typing.Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')], _empty_hash: str | None = None)
files: typing.Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
@classmethod
def create_from_disk(cls, path: Path) -> DirectoryManifest:
    """Create a manifest from a directory on disk.

    If ``path`` is a directory, every file found beneath it is listed
    under its forward-slash path relative to ``path``. If ``path`` is
    some other existing filesystem entry, the manifest holds a single
    entry keyed by ``path.as_posix()``. If ``path`` does not exist, the
    manifest is empty.

    Raises RuntimeError if a listed entry is not a file (per
    os.path.isfile) by the time it gets hashed.
    """
    import hashlib
    from concurrent.futures import ThreadPoolExecutor

    # (manifest key, path to open) pairs. Keeping the openable path
    # separate from the key means a relative single-file 'path' is
    # opened as-is instead of being joined onto itself.
    entries: list[tuple[str, str]] = []

    if path.is_dir():
        # Build the full list of relative paths.
        for basename, _dirnames, filenames in os.walk(path):
            for filename in filenames:
                fullname = os.path.join(basename, filename)
                # as_posix() makes sure we end up with forward slashes
                # no matter what the os.* calls gave us.
                entries.append(
                    (Path(fullname).relative_to(path).as_posix(), fullname)
                )
    elif path.exists():
        # Just return a single file entry if path is not a dir.
        entries.append((path.as_posix(), str(path)))

    def _get_file_info(
        entry: tuple[str, str],
    ) -> tuple[str, DirectoryManifestFile]:
        key, fullfilepath = entry
        if not os.path.isfile(fullfilepath):
            raise RuntimeError(f'File not found: "{fullfilepath}".')
        sha = hashlib.sha256()
        filesize = 0
        # Hash in chunks so large files are never held fully in memory.
        with open(fullfilepath, 'rb') as infile:
            while chunk := infile.read(1024 * 1024):
                sha.update(chunk)
                filesize += len(chunk)
        return (
            key,
            DirectoryManifestFile(filehash=sha.hexdigest(), filesize=filesize),
        )

    # Hash files concurrently, one worker thread per reported cpu
    # (falling back to 4 when the count is unavailable).
    with ThreadPoolExecutor(max_workers=os.cpu_count() or 4) as executor:
        return cls(files=dict(executor.map(_get_file_info, entries)))
Create a manifest from a directory on disk.
def
validate(self) -> None:
def validate(self) -> None:
    """Log any odd data in the manifest; for debugging.

    Emits a single error-level log message for the first path key in
    ``self.files`` that contains a backslash, then stops checking.
    """
    import logging

    for fpath in self.files:
        # We want to be dealing in only forward slashes; make sure
        # that's the case (wondering if we'll ever see backslashes
        # for escape purposes).
        if '\\' in fpath:
            # Plain error(); exception() is only meaningful inside an
            # exception handler and would otherwise attach empty
            # exception info to the record.
            logging.error("Found unusual path in manifest: '%s'.", fpath)
            break  # 1 error is enough for now.
Log any odd data in the manifest; for debugging.
@classmethod
def
get_empty_hash(cls) -> str:
@classmethod
def get_empty_hash(cls) -> str:
    """Return the hash for an empty file.

    The value is computed on first use and then kept on the class in
    ``_empty_hash`` for later calls.
    """
    cached = cls._empty_hash
    if cached is not None:
        return cached

    import hashlib

    # A sha256 object that was never fed data gives the digest of
    # empty input.
    cached = hashlib.sha256().hexdigest()
    cls._empty_hash = cached
    return cached
Return the hash for an empty file.