bacommon.transfer

Functionality related to transferring files/data.

  1# Released under the MIT License. See LICENSE for details.
  2#
  3"""Functionality related to transferring files/data."""
  4
  5from __future__ import annotations
  6
  7import os
  8from pathlib import Path
  9from dataclasses import dataclass
 10from typing import TYPE_CHECKING, Annotated
 11
 12from efro.dataclassio import ioprepped, IOAttrs
 13
 14if TYPE_CHECKING:
 15    pass
 16
 17
@ioprepped
@dataclass
class DirectoryManifestFile:
    """Describes a file in a manifest."""

    # SHA-256 digest of the file's contents as a hex string (this is what
    # DirectoryManifest.create_from_disk stores here). The 'h' passed to
    # IOAttrs is presumably the short key used when serialized -- confirm
    # against efro.dataclassio.
    hash_sha256: Annotated[str, IOAttrs('h')]
    # Size of the file's contents in bytes (presumably serialized as 's').
    size: Annotated[int, IOAttrs('s')]
 25
 26
 27@ioprepped
 28@dataclass
 29class DirectoryManifest:
 30    """Contains a summary of files in a directory."""
 31
 32    files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
 33
 34    # Soft-default added April 2024; can remove eventually once this
 35    # attr is widespread in client.
 36    exists: Annotated[bool, IOAttrs('e', soft_default=True)]
 37
 38    @classmethod
 39    def create_from_disk(cls, path: Path) -> DirectoryManifest:
 40        """Create a manifest from a directory on disk."""
 41        import hashlib
 42        from concurrent.futures import ThreadPoolExecutor
 43
 44        pathstr = str(path)
 45        paths: list[str] = []
 46
 47        exists = path.exists()
 48
 49        if path.is_dir():
 50            # Build the full list of relative paths.
 51            for basename, _dirnames, filenames in os.walk(path):
 52                for filename in filenames:
 53                    fullname = os.path.join(basename, filename)
 54                    assert fullname.startswith(pathstr)
 55                    # Make sure we end up with forward slashes no matter
 56                    # what the os.* stuff above here was using.
 57                    paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix())
 58        elif exists:
 59            # Just return a single file entry if path is not a dir.
 60            paths.append(path.as_posix())
 61
 62        def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]:
 63            sha = hashlib.sha256()
 64            fullfilepath = os.path.join(pathstr, filepath)
 65            if not os.path.isfile(fullfilepath):
 66                raise RuntimeError(f'File not found: "{fullfilepath}".')
 67            with open(fullfilepath, 'rb') as infile:
 68                filebytes = infile.read()
 69                filesize = len(filebytes)
 70                sha.update(filebytes)
 71            return (
 72                filepath,
 73                DirectoryManifestFile(
 74                    hash_sha256=sha.hexdigest(), size=filesize
 75                ),
 76            )
 77
 78        # Now use all procs to hash the files efficiently.
 79        cpus = os.cpu_count()
 80        if cpus is None:
 81            cpus = 4
 82        with ThreadPoolExecutor(max_workers=cpus) as executor:
 83            return cls(
 84                files=dict(executor.map(_get_file_info, paths)), exists=exists
 85            )
 86
 87    def validate(self) -> None:
 88        """Log any odd data in the manifest; for debugging."""
 89        import logging
 90
 91        for fpath, _fentry in self.files.items():
 92            # We want to be dealing in only forward slashes; make sure
 93            # that's the case (wondering if we'll ever see backslashes
 94            # for escape purposes).
 95            if '\\' in fpath:
 96                logging.exception(
 97                    "Found unusual path in manifest: '%s'.", fpath
 98                )
 99                break  # 1 error is enough for now.
100
101    # @classmethod
102    # def get_empty_hash(cls) -> str:
103    #     """Return the hash for an empty file."""
104    #     if cls._empty_hash is None:
105    #         import hashlib
106
107    #         sha = hashlib.sha256()
108    #         cls._empty_hash = sha.hexdigest()
109    #     return cls._empty_hash
@ioprepped
@dataclass
class DirectoryManifestFile:
@ioprepped
@dataclass
class DirectoryManifestFile:
    """Describes a file in a manifest."""

    # SHA-256 digest of the file's contents as a hex string (this is what
    # DirectoryManifest.create_from_disk stores here). The 'h' passed to
    # IOAttrs is presumably the short key used when serialized -- confirm
    # against efro.dataclassio.
    hash_sha256: Annotated[str, IOAttrs('h')]
    # Size of the file's contents in bytes (presumably serialized as 's').
    size: Annotated[int, IOAttrs('s')]

Describes a file in a manifest.

DirectoryManifestFile(hash_sha256: Annotated[str, IOAttrs('h')], size: Annotated[int, IOAttrs('s')])
hash_sha256: Annotated[str, IOAttrs('h')]
size: Annotated[int, IOAttrs('s')]
@ioprepped
@dataclass
class DirectoryManifest:
 28@ioprepped
 29@dataclass
 30class DirectoryManifest:
 31    """Contains a summary of files in a directory."""
 32
 33    files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
 34
 35    # Soft-default added April 2024; can remove eventually once this
 36    # attr is widespread in client.
 37    exists: Annotated[bool, IOAttrs('e', soft_default=True)]
 38
 39    @classmethod
 40    def create_from_disk(cls, path: Path) -> DirectoryManifest:
 41        """Create a manifest from a directory on disk."""
 42        import hashlib
 43        from concurrent.futures import ThreadPoolExecutor
 44
 45        pathstr = str(path)
 46        paths: list[str] = []
 47
 48        exists = path.exists()
 49
 50        if path.is_dir():
 51            # Build the full list of relative paths.
 52            for basename, _dirnames, filenames in os.walk(path):
 53                for filename in filenames:
 54                    fullname = os.path.join(basename, filename)
 55                    assert fullname.startswith(pathstr)
 56                    # Make sure we end up with forward slashes no matter
 57                    # what the os.* stuff above here was using.
 58                    paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix())
 59        elif exists:
 60            # Just return a single file entry if path is not a dir.
 61            paths.append(path.as_posix())
 62
 63        def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]:
 64            sha = hashlib.sha256()
 65            fullfilepath = os.path.join(pathstr, filepath)
 66            if not os.path.isfile(fullfilepath):
 67                raise RuntimeError(f'File not found: "{fullfilepath}".')
 68            with open(fullfilepath, 'rb') as infile:
 69                filebytes = infile.read()
 70                filesize = len(filebytes)
 71                sha.update(filebytes)
 72            return (
 73                filepath,
 74                DirectoryManifestFile(
 75                    hash_sha256=sha.hexdigest(), size=filesize
 76                ),
 77            )
 78
 79        # Now use all procs to hash the files efficiently.
 80        cpus = os.cpu_count()
 81        if cpus is None:
 82            cpus = 4
 83        with ThreadPoolExecutor(max_workers=cpus) as executor:
 84            return cls(
 85                files=dict(executor.map(_get_file_info, paths)), exists=exists
 86            )
 87
 88    def validate(self) -> None:
 89        """Log any odd data in the manifest; for debugging."""
 90        import logging
 91
 92        for fpath, _fentry in self.files.items():
 93            # We want to be dealing in only forward slashes; make sure
 94            # that's the case (wondering if we'll ever see backslashes
 95            # for escape purposes).
 96            if '\\' in fpath:
 97                logging.exception(
 98                    "Found unusual path in manifest: '%s'.", fpath
 99                )
100                break  # 1 error is enough for now.
101
102    # @classmethod
103    # def get_empty_hash(cls) -> str:
104    #     """Return the hash for an empty file."""
105    #     if cls._empty_hash is None:
106    #         import hashlib
107
108    #         sha = hashlib.sha256()
109    #         cls._empty_hash = sha.hexdigest()
110    #     return cls._empty_hash

Contains a summary of files in a directory.

DirectoryManifest(files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')], exists: Annotated[bool, IOAttrs('e', soft_default=True)])
files: Annotated[dict[str, DirectoryManifestFile], IOAttrs('f')]
exists: Annotated[bool, IOAttrs('e', soft_default=True)]
@classmethod
def create_from_disk(cls, path: pathlib.Path) -> DirectoryManifest:
39    @classmethod
40    def create_from_disk(cls, path: Path) -> DirectoryManifest:
41        """Create a manifest from a directory on disk."""
42        import hashlib
43        from concurrent.futures import ThreadPoolExecutor
44
45        pathstr = str(path)
46        paths: list[str] = []
47
48        exists = path.exists()
49
50        if path.is_dir():
51            # Build the full list of relative paths.
52            for basename, _dirnames, filenames in os.walk(path):
53                for filename in filenames:
54                    fullname = os.path.join(basename, filename)
55                    assert fullname.startswith(pathstr)
56                    # Make sure we end up with forward slashes no matter
57                    # what the os.* stuff above here was using.
58                    paths.append(Path(fullname[len(pathstr) + 1 :]).as_posix())
59        elif exists:
60            # Just return a single file entry if path is not a dir.
61            paths.append(path.as_posix())
62
63        def _get_file_info(filepath: str) -> tuple[str, DirectoryManifestFile]:
64            sha = hashlib.sha256()
65            fullfilepath = os.path.join(pathstr, filepath)
66            if not os.path.isfile(fullfilepath):
67                raise RuntimeError(f'File not found: "{fullfilepath}".')
68            with open(fullfilepath, 'rb') as infile:
69                filebytes = infile.read()
70                filesize = len(filebytes)
71                sha.update(filebytes)
72            return (
73                filepath,
74                DirectoryManifestFile(
75                    hash_sha256=sha.hexdigest(), size=filesize
76                ),
77            )
78
79        # Now use all procs to hash the files efficiently.
80        cpus = os.cpu_count()
81        if cpus is None:
82            cpus = 4
83        with ThreadPoolExecutor(max_workers=cpus) as executor:
84            return cls(
85                files=dict(executor.map(_get_file_info, paths)), exists=exists
86            )

Create a manifest from a directory on disk.

def validate(self) -> None:
 88    def validate(self) -> None:
 89        """Log any odd data in the manifest; for debugging."""
 90        import logging
 91
 92        for fpath, _fentry in self.files.items():
 93            # We want to be dealing in only forward slashes; make sure
 94            # that's the case (wondering if we'll ever see backslashes
 95            # for escape purposes).
 96            if '\\' in fpath:
 97                logging.exception(
 98                    "Found unusual path in manifest: '%s'.", fpath
 99                )
100                break  # 1 error is enough for now.

Log any odd data in the manifest; for debugging.