bacommon.transfer

Functionality related to transferring files/data.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Functionality related to transferring files/data."""
  4
  5from __future__ import annotations
  6
  7import os
  8from pathlib import Path
  9from dataclasses import dataclass
 10from typing import TYPE_CHECKING, Annotated
 11
 12from efro.dataclassio import ioprepped, IOAttrs
 13
 14if TYPE_CHECKING:
 15    pass
 16
 17
 18@ioprepped
 19@dataclass
 20class DirectoryManifestFile:
 21    """Describes metadata and hashes for a file in a manifest."""
 22
 23    filehash: Annotated[str, IOAttrs('h')]
 24    filesize: Annotated[int, IOAttrs('s')]
 25
 26
 27@ioprepped
 28@dataclass
 29class DirectoryManifest:
 30    """Contains a summary of files in a directory."""
 31
 32    files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
 33
 34    _empty_hash: str | None = None
 35
 36    @classmethod
 37    def create_from_disk(cls, path: Path) -> DirectoryManifest:
 38        """Create a manifest from a directory on disk."""
 39        import hashlib
 40        from concurrent.futures import ThreadPoolExecutor
 41
 42        pathstr = str(path)
 43        paths: list[str] = []
 44
 45        if path.is_dir():
 46            # Build the full list of relative paths.
 47            for basename, _dirnames, filenames in os.walk(path):
 48                for filename in filenames:
 49                    fullname = os.path.join(basename, filename)
 50                    assert fullname.startswith(pathstr)
 51                    # Make sure we end up with forward slashes no matter
 52                    # what the os.* stuff above here was using.
 53                    paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix())
 54        elif path.exists():
 55            # Just return a single file entry if path is not a dir.
 56            paths.append(path.as_posix())
 57
 58        def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]:
 59            sha = hashlib.sha256()
 60            fullfilepath = os.path.join(pathstr, filepath)
 61            if not os.path.isfile(fullfilepath):
 62                raise RuntimeError(f'File not found: "{fullfilepath}".')
 63            with open(fullfilepath, 'rb') as infile:
 64                filebytes = infile.read()
 65                filesize = len(filebytes)
 66                sha.update(filebytes)
 67            return (
 68                filepath,
 69                DirectoryManifestFile(
 70                    filehash=sha.hexdigest(), filesize=filesize
 71                ),
 72            )
 73
 74        # Now use all procs to hash the files efficiently.
 75        cpus = os.cpu_count()
 76        if cpus is None:
 77            cpus = 4
 78        with ThreadPoolExecutor(max_workers=cpus) as executor:
 79            return cls(files=dict(executor.map(_get_file_info, paths)))
 80
 81    def validate(self) -> None:
 82        """Log any odd data in the manifest; for debugging."""
 83        import logging
 84
 85        for fpath, _fentry in self.files.items():
 86            # We want to be dealing in only forward slashes; make sure
 87            # that's the case (wondering if we'll ever see backslashes
 88            # for escape purposes).
 89            if '\\' in fpath:
 90                logging.exception(
 91                    "Found unusual path in manifest: '%s'.", fpath
 92                )
 93                break  # 1 error is enough for now.
 94
 95    @classmethod
 96    def get_empty_hash(cls) -> str:
 97        """Return the hash for an empty file."""
 98        if cls._empty_hash is None:
 99            import hashlib
100
101            sha = hashlib.sha256()
102            cls._empty_hash = sha.hexdigest()
103        return cls._empty_hash
@ioprepped
@dataclass
class DirectoryManifestFile:
@ioprepped
@dataclass
class DirectoryManifestFile:
    """Hash and size metadata for one file listed in a manifest."""

    # Hex digest of the file's contents.
    filehash: Annotated[str, IOAttrs('h')]

    # Length of the file's contents in bytes.
    filesize: Annotated[int, IOAttrs('s')]

Describes metadata and hashes for a file in a manifest.

DirectoryManifestFile(filehash: typing.Annotated[str, IOAttrs('h')], filesize: typing.Annotated[int, IOAttrs('s')])
filehash: typing.Annotated[str, IOAttrs('h')]
filesize: typing.Annotated[int, IOAttrs('s')]
@ioprepped
@dataclass
class DirectoryManifest:
 28@ioprepped
 29@dataclass
 30class DirectoryManifest:
 31    """Contains a summary of files in a directory."""
 32
 33    files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
 34
 35    _empty_hash: str | None = None
 36
 37    @classmethod
 38    def create_from_disk(cls, path: Path) -> DirectoryManifest:
 39        """Create a manifest from a directory on disk."""
 40        import hashlib
 41        from concurrent.futures import ThreadPoolExecutor
 42
 43        pathstr = str(path)
 44        paths: list[str] = []
 45
 46        if path.is_dir():
 47            # Build the full list of relative paths.
 48            for basename, _dirnames, filenames in os.walk(path):
 49                for filename in filenames:
 50                    fullname = os.path.join(basename, filename)
 51                    assert fullname.startswith(pathstr)
 52                    # Make sure we end up with forward slashes no matter
 53                    # what the os.* stuff above here was using.
 54                    paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix())
 55        elif path.exists():
 56            # Just return a single file entry if path is not a dir.
 57            paths.append(path.as_posix())
 58
 59        def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]:
 60            sha = hashlib.sha256()
 61            fullfilepath = os.path.join(pathstr, filepath)
 62            if not os.path.isfile(fullfilepath):
 63                raise RuntimeError(f'File not found: "{fullfilepath}".')
 64            with open(fullfilepath, 'rb') as infile:
 65                filebytes = infile.read()
 66                filesize = len(filebytes)
 67                sha.update(filebytes)
 68            return (
 69                filepath,
 70                DirectoryManifestFile(
 71                    filehash=sha.hexdigest(), filesize=filesize
 72                ),
 73            )
 74
 75        # Now use all procs to hash the files efficiently.
 76        cpus = os.cpu_count()
 77        if cpus is None:
 78            cpus = 4
 79        with ThreadPoolExecutor(max_workers=cpus) as executor:
 80            return cls(files=dict(executor.map(_get_file_info, paths)))
 81
 82    def validate(self) -> None:
 83        """Log any odd data in the manifest; for debugging."""
 84        import logging
 85
 86        for fpath, _fentry in self.files.items():
 87            # We want to be dealing in only forward slashes; make sure
 88            # that's the case (wondering if we'll ever see backslashes
 89            # for escape purposes).
 90            if '\\' in fpath:
 91                logging.exception(
 92                    "Found unusual path in manifest: '%s'.", fpath
 93                )
 94                break  # 1 error is enough for now.
 95
 96    @classmethod
 97    def get_empty_hash(cls) -> str:
 98        """Return the hash for an empty file."""
 99        if cls._empty_hash is None:
100            import hashlib
101
102            sha = hashlib.sha256()
103            cls._empty_hash = sha.hexdigest()
104        return cls._empty_hash

Contains a summary of files in a directory.

DirectoryManifest(files: typing.Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')], _empty_hash: str | None = None)
files: typing.Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
@classmethod
def create_from_disk(cls, path: pathlib.Path) -> DirectoryManifest:
37    @classmethod
38    def create_from_disk(cls, path: Path) -> DirectoryManifest:
39        """Create a manifest from a directory on disk."""
40        import hashlib
41        from concurrent.futures import ThreadPoolExecutor
42
43        pathstr = str(path)
44        paths: list[str] = []
45
46        if path.is_dir():
47            # Build the full list of relative paths.
48            for basename, _dirnames, filenames in os.walk(path):
49                for filename in filenames:
50                    fullname = os.path.join(basename, filename)
51                    assert fullname.startswith(pathstr)
52                    # Make sure we end up with forward slashes no matter
53                    # what the os.* stuff above here was using.
54                    paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix())
55        elif path.exists():
56            # Just return a single file entry if path is not a dir.
57            paths.append(path.as_posix())
58
59        def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]:
60            sha = hashlib.sha256()
61            fullfilepath = os.path.join(pathstr, filepath)
62            if not os.path.isfile(fullfilepath):
63                raise RuntimeError(f'File not found: "{fullfilepath}".')
64            with open(fullfilepath, 'rb') as infile:
65                filebytes = infile.read()
66                filesize = len(filebytes)
67                sha.update(filebytes)
68            return (
69                filepath,
70                DirectoryManifestFile(
71                    filehash=sha.hexdigest(), filesize=filesize
72                ),
73            )
74
75        # Now use all procs to hash the files efficiently.
76        cpus = os.cpu_count()
77        if cpus is None:
78            cpus = 4
79        with ThreadPoolExecutor(max_workers=cpus) as executor:
80            return cls(files=dict(executor.map(_get_file_info, paths)))

Create a manifest from a directory on disk.

def validate(self) -> None:
82    def validate(self) -> None:
83        """Log any odd data in the manifest; for debugging."""
84        import logging
85
86        for fpath, _fentry in self.files.items():
87            # We want to be dealing in only forward slashes; make sure
88            # that's the case (wondering if we'll ever see backslashes
89            # for escape purposes).
90            if '\\' in fpath:
91                logging.exception(
92                    "Found unusual path in manifest: '%s'.", fpath
93                )
94                break  # 1 error is enough for now.

Log any odd data in the manifest; for debugging.

@classmethod
def get_empty_hash(cls) -> str:
 96    @classmethod
 97    def get_empty_hash(cls) -> str:
 98        """Return the hash for an empty file."""
 99        if cls._empty_hash is None:
100            import hashlib
101
102            sha = hashlib.sha256()
103            cls._empty_hash = sha.hexdigest()
104        return cls._empty_hash

Return the hash for an empty file.