efro.message

Functionality for sending and responding to messages. Supports static typing for message types and possible return types.

 1# Released under the MIT License. See LICENSE for details.
 2#
 3"""Functionality for sending and responding to messages.
 4Supports static typing for message types and possible return types.
 5"""
 6
 7from __future__ import annotations
 8
 9from efro.util import set_canonical_module_names
10from efro.message._protocol import MessageProtocol
11from efro.message._sender import MessageSender, BoundMessageSender
12from efro.message._receiver import MessageReceiver, BoundMessageReceiver
13from efro.message._module import create_sender_module, create_receiver_module
14from efro.message._message import (
15    Message,
16    Response,
17    SysResponse,
18    EmptySysResponse,
19    ErrorSysResponse,
20    StringResponse,
21    BoolResponse,
22    UnregisteredMessageIDError,
23)
24
25__all__ = [
26    'Message',
27    'Response',
28    'SysResponse',
29    'EmptySysResponse',
30    'ErrorSysResponse',
31    'StringResponse',
32    'BoolResponse',
33    'MessageProtocol',
34    'MessageSender',
35    'BoundMessageSender',
36    'MessageReceiver',
37    'BoundMessageReceiver',
38    'create_sender_module',
39    'create_receiver_module',
40    'UnregisteredMessageIDError',
41]
42
43# Have these things present themselves cleanly as 'thismodule.SomeClass'
44# instead of 'thismodule._internalmodule.SomeClass'
45set_canonical_module_names(globals())
class Message:
24class Message:
25    """Base class for messages."""
26
27    @classmethod
28    def get_response_types(cls) -> list[type[Response] | None]:
29        """Return all Response types this Message can return when sent.
30
31        The default implementation specifies a None return type.
32        """
33        return [None]

Base class for messages.

@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
27    @classmethod
28    def get_response_types(cls) -> list[type[Response] | None]:
29        """Return all Response types this Message can return when sent.
30
31        The default implementation specifies a None return type.
32        """
33        return [None]

Return all Response types this Message can return when sent.

The default implementation specifies a None return type.

class Response:
36class Response:
37    """Base class for responses to messages."""

Base class for responses to messages.

class SysResponse:
40class SysResponse:
41    """Base class for system-responses to messages.
42
43    These are only sent/handled by the messaging system itself;
44    users of the api never see them.
45    """
46
47    def set_local_exception(self, exc: Exception) -> None:
48        """Attach a local exception to facilitate better logging/handling.
49
50        Be aware that this data does not get serialized and only
51        exists on the local object.
52        """
53        setattr(self, '_sr_local_exception', exc)
54
55    def get_local_exception(self) -> Exception | None:
56        """Fetch a local attached exception."""
57        value = getattr(self, '_sr_local_exception', None)
58        assert isinstance(value, Exception | None)
59        return value

Base class for system-responses to messages.

These are only sent/handled by the messaging system itself; users of the api never see them.

def set_local_exception(self, exc: Exception) -> None:
47    def set_local_exception(self, exc: Exception) -> None:
48        """Attach a local exception to facilitate better logging/handling.
49
50        Be aware that this data does not get serialized and only
51        exists on the local object.
52        """
53        setattr(self, '_sr_local_exception', exc)

Attach a local exception to facilitate better logging/handling.

Be aware that this data does not get serialized and only exists on the local object.

def get_local_exception(self) -> Exception | None:
55    def get_local_exception(self) -> Exception | None:
56        """Fetch a local attached exception."""
57        value = getattr(self, '_sr_local_exception', None)
58        assert isinstance(value, Exception | None)
59        return value

Fetch a local attached exception.

@ioprepped
@dataclass
class EmptySysResponse(efro.message.SysResponse):
86@ioprepped
87@dataclass
88class EmptySysResponse(SysResponse):
89    """The response equivalent of None."""

The response equivalent of None.

@ioprepped
@dataclass
class ErrorSysResponse(efro.message.SysResponse):
65@ioprepped
66@dataclass
67class ErrorSysResponse(SysResponse):
68    """SysResponse saying some error has occurred for the send.
69
70    This generally results in an Exception being raised for the caller.
71    """
72
73    class ErrorType(Enum):
74        """Type of error that occurred while sending a message."""
75
76        REMOTE = 0
77        REMOTE_CLEAN = 1
78        LOCAL = 2
79        COMMUNICATION = 3
80        REMOTE_COMMUNICATION = 4
81
82    error_message: Annotated[str, IOAttrs('m')]
83    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE

SysResponse saying some error has occurred for the send.

This generally results in an Exception being raised for the caller.

ErrorSysResponse( error_message: Annotated[str, <efro.dataclassio.IOAttrs object>], error_type: Annotated[ErrorSysResponse.ErrorType, <efro.dataclassio.IOAttrs object>] = <ErrorType.REMOTE: 0>)
error_message: Annotated[str, <efro.dataclassio.IOAttrs object at 0x12ba0adb0>]
error_type: Annotated[ErrorSysResponse.ErrorType, <efro.dataclassio.IOAttrs object at 0x12ba0afc0>] = <ErrorType.REMOTE: 0>
class ErrorSysResponse.ErrorType(enum.Enum):
73    class ErrorType(Enum):
74        """Type of error that occurred while sending a message."""
75
76        REMOTE = 0
77        REMOTE_CLEAN = 1
78        LOCAL = 2
79        COMMUNICATION = 3
80        REMOTE_COMMUNICATION = 4

Type of error that occurred while sending a message.

REMOTE = <ErrorType.REMOTE: 0>
REMOTE_CLEAN = <ErrorType.REMOTE_CLEAN: 1>
LOCAL = <ErrorType.LOCAL: 2>
COMMUNICATION = <ErrorType.COMMUNICATION: 3>
REMOTE_COMMUNICATION = <ErrorType.REMOTE_COMMUNICATION: 4>
@ioprepped
@dataclass
class StringResponse(efro.message.Response):
104@ioprepped
105@dataclass
106class StringResponse(Response):
107    """A simple string value response."""
108
109    value: Annotated[str, IOAttrs('v')]

A simple string value response.

StringResponse(value: Annotated[str, <efro.dataclassio.IOAttrs object>])
value: Annotated[str, <efro.dataclassio.IOAttrs object at 0x12ba946b0>]
@ioprepped
@dataclass
class BoolResponse(efro.message.Response):
 96@ioprepped
 97@dataclass
 98class BoolResponse(Response):
 99    """A simple bool value response."""
100
101    value: Annotated[bool, IOAttrs('v')]

A simple bool value response.

BoolResponse(value: Annotated[bool, <efro.dataclassio.IOAttrs object>])
value: Annotated[bool, <efro.dataclassio.IOAttrs object at 0x12ba94fb0>]
class MessageProtocol:
 33class MessageProtocol:
 34    """Wrangles a set of message types, formats, and response types.
 35    Both endpoints must be using a compatible Protocol for communication
 36    to succeed. To maintain Protocol compatibility between revisions,
 37    all message types must retain the same id, message attr storage
 38    names must not change, newly added attrs must have default values,
 39    etc.
 40    """
 41
 42    def __init__(
 43        self,
 44        message_types: dict[int, type[Message]],
 45        response_types: dict[int, type[Response]],
 46        *,
 47        forward_communication_errors: bool = False,
 48        forward_clean_errors: bool = False,
 49        remote_errors_include_stack_traces: bool = False,
 50        log_errors_on_receiver: bool = True,
 51        log_response_decode_errors: bool = True,
 52    ) -> None:
 53        """Create a protocol with a given configuration.
 54
 55        If 'forward_communication_errors' is True,
 56        efro.error.CommunicationErrors raised on the receiver end will
 57        result in a matching error raised back on the sender. This can
 58        be useful if the receiver will be in some way forwarding
 59        messages along and the sender doesn't need to know where
 60        communication breakdowns occurred; only that they did.
 61
 62        If 'forward_clean_errors' is True, efro.error.CleanError
 63        exceptions raised on the receiver end will result in a matching
 64        CleanError raised back on the sender.
 65
 66        When an exception is not covered by the optional forwarding
 67        mechanisms above, it will come across as efro.error.RemoteError
 68        and the exception will be logged on the receiver end - at least
 69        by default (see details below).
 70
 71        If 'remote_errors_include_stack_traces' is True, stringified
 72        stack traces will be returned with efro.error.RemoteError
 73        exceptions. This is useful for debugging but should only be
 74        enabled in cases where the sender is trusted to see internal
 75        details of the receiver.
 76
 77        By default, when a message-handling exception will result in an
 78        efro.error.RemoteError being returned to the sender, the
 79        exception will be logged on the receiver. This is because the
 80        goal is usually to avoid returning opaque RemoteErrors and to
 81        instead return something meaningful as part of the expected
 82        response type (even if that value itself represents a logical
 83        error state). If 'log_errors_on_receiver' is False, however,
 84        such exceptions will *not* be logged on the receiver. This can
 85        be useful in combination with
 86        'remote_errors_include_stack_traces' and 'forward_clean_errors'
 87        in situations where all error logging/management will be
 88        happening on the sender end. Be aware, however, that in that
 89        case it may be possible for communication errors to prevent some
 90        errors from ever being acknowledged.
 91
 92        If an error occurs when decoding a message response, a
 93        RuntimeError is generated locally. However, in practice it is
 94        likely for such errors to be silently ignored by message
 95        handling code alongside more common communication type errors,
 96        meaning serious protocol breakage could go unnoticed. To avoid
 97        this, a log message is also printed in such cases. Pass
 98        'log_response_decode_errors' as False to disable this logging.
 99        """
100        # pylint: disable=too-many-locals
101        self.message_types_by_id: dict[int, type[Message]] = {}
102        self.message_ids_by_type: dict[type[Message], int] = {}
103        self.response_types_by_id: dict[
104            int, type[Response] | type[SysResponse]
105        ] = {}
106        self.response_ids_by_type: dict[
107            type[Response] | type[SysResponse], int
108        ] = {}
109        for m_id, m_type in message_types.items():
110            # Make sure only valid message types were passed and each
111            # id was assigned only once.
112            assert isinstance(m_id, int)
113            assert m_id >= 0
114            assert is_ioprepped_dataclass(m_type) and issubclass(
115                m_type, Message
116            )
117            assert self.message_types_by_id.get(m_id) is None
118            self.message_types_by_id[m_id] = m_type
119            self.message_ids_by_type[m_type] = m_id
120
121        for r_id, r_type in response_types.items():
122            assert isinstance(r_id, int)
123            assert r_id >= 0
124            assert is_ioprepped_dataclass(r_type) and issubclass(
125                r_type, Response
126            )
127            assert self.response_types_by_id.get(r_id) is None
128            self.response_types_by_id[r_id] = r_type
129            self.response_ids_by_type[r_type] = r_id
130
131        # Register our SysResponse types. These use negative
132        # IDs so as to never overlap with user Response types.
133        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
134            assert self.response_types_by_id.get(reg_id) is None
135            self.response_types_by_id[reg_id] = reg_tp
136            self.response_ids_by_type[reg_tp] = reg_id
137
138        _reg_sys(ErrorSysResponse, -1)
139        _reg_sys(EmptySysResponse, -2)
140
141        # Some extra-thorough validation in debug mode.
142        if __debug__:
143            # Make sure all Message types' return types are valid
144            # and have been assigned an ID as well.
145            all_response_types: set[type[Response] | None] = set()
146            for m_id, m_type in message_types.items():
147                m_rtypes = m_type.get_response_types()
148
149                assert isinstance(m_rtypes, list)
150                assert (
151                    m_rtypes
152                ), f'Message type {m_type} specifies no return types.'
153                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
154                for m_rtype in m_rtypes:
155                    all_response_types.add(m_rtype)
156            for cls in all_response_types:
157                if cls is None:
158                    continue
159                assert is_ioprepped_dataclass(cls)
160                assert issubclass(cls, Response)
161                if cls not in self.response_ids_by_type:
162                    raise ValueError(
163                        f'Possible response type {cls} needs to be included'
164                        f' in response_types for this protocol.'
165                    )
166
167            # Make sure all registered types have unique base names.
168            # We can take advantage of this to generate cleaner looking
169            # protocol modules. Can revisit if this is ever a problem.
170            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
171            if len(mtypenames) != len(message_types):
172                raise ValueError(
173                    'message_types contains duplicate __name__s;'
174                    ' all types are required to have unique names.'
175                )
176
177        self.forward_clean_errors = forward_clean_errors
178        self.forward_communication_errors = forward_communication_errors
179        self.remote_errors_include_stack_traces = (
180            remote_errors_include_stack_traces
181        )
182        self.log_errors_on_receiver = log_errors_on_receiver
183        self.log_response_decode_errors = log_response_decode_errors
184
185    @staticmethod
186    def encode_dict(obj: dict) -> str:
187        """Json-encode a provided dict."""
188        return json.dumps(obj, separators=(',', ':'))
189
190    def message_to_dict(self, message: Message) -> dict:
191        """Encode a message to a json ready dict."""
192        return self._to_dict(message, self.message_ids_by_type, 'message')
193
194    def response_to_dict(self, response: Response | SysResponse) -> dict:
195        """Encode a response to a json ready dict."""
196        return self._to_dict(response, self.response_ids_by_type, 'response')
197
198    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
199        """Translate an Exception to a SysResponse.
200
201        Also returns whether the error should be logged if this happened
202        within handle_raw_message().
203        """
204
205        # If anything goes wrong, return a ErrorSysResponse instead.
206        # (either CLEAN or generic REMOTE)
207        if self.forward_clean_errors and isinstance(exc, CleanError):
208            return (
209                ErrorSysResponse(
210                    error_message=str(exc),
211                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
212                ),
213                False,
214            )
215        if self.forward_communication_errors and isinstance(
216            exc, CommunicationError
217        ):
218            return (
219                ErrorSysResponse(
220                    error_message=str(exc),
221                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
222                ),
223                False,
224            )
225        return (
226            ErrorSysResponse(
227                error_message=(
228                    # Note: need to format exception ourself here; it
229                    # might not be current so we can't use
230                    # traceback.format_exc().
231                    ''.join(
232                        traceback.format_exception(
233                            type(exc), exc, exc.__traceback__
234                        )
235                    )
236                    if self.remote_errors_include_stack_traces
237                    else 'An internal error has occurred.'
238                ),
239                error_type=ErrorSysResponse.ErrorType.REMOTE,
240            ),
241            self.log_errors_on_receiver,
242        )
243
244    def _to_dict(
245        self, message: Any, ids_by_type: dict[type, int], opname: str
246    ) -> dict:
247        """Encode a message to a json string for transport."""
248
249        m_id: int | None = ids_by_type.get(type(message))
250        if m_id is None:
251            raise TypeError(
252                f'{opname} type is not registered in protocol:'
253                f' {type(message)}'
254            )
255        out = {'t': m_id, 'm': dataclass_to_dict(message)}
256        return out
257
258    @staticmethod
259    def decode_dict(data: str) -> dict:
260        """Decode data to a dict."""
261        out = json.loads(data)
262        assert isinstance(out, dict)
263        return out
264
265    def message_from_dict(self, data: dict) -> Message:
266        """Decode a message from a dict."""
267        out = self._from_dict(data, self.message_types_by_id, 'message')
268        assert isinstance(out, Message)
269        return out
270
271    def response_from_dict(self, data: dict) -> Response | SysResponse:
272        """Decode a response from a json string."""
273        out = self._from_dict(data, self.response_types_by_id, 'response')
274        assert isinstance(out, Response | SysResponse)
275        return out
276
277    # Weeeird; we get mypy errors returning dict[int, type] but
278    # dict[int, typing.Type] or dict[int, type[Any]] works..
279    def _from_dict(
280        self, data: dict, types_by_id: dict[int, type[Any]], opname: str
281    ) -> Any:
282        """Decode a message from a json string."""
283        msgdict: dict | None
284
285        m_id = data.get('t')
286        # Allow omitting 'm' dict if its empty.
287        msgdict = data.get('m', {})
288
289        assert isinstance(m_id, int)
290        assert isinstance(msgdict, dict)
291
292        # Decode this particular type.
293        msgtype = types_by_id.get(m_id)
294        if msgtype is None:
295            raise UnregisteredMessageIDError(
296                f'Got unregistered {opname} id of {m_id}.'
297            )
298
299        # Explicitly allow any fallbacks we define for our enums and
300        # multitypes. This allows us to build message types that remain
301        # loadable even when containing unrecognized future
302        # enums/multitype data. Be aware that this flags the object as
303        # 'lossy' however which prevents it from being reserialized by
304        # default.
305        return dataclass_from_dict(msgtype, msgdict, lossy=True)
306
307    def _get_module_header(
308        self,
309        part: Literal['sender', 'receiver'],
310        extra_import_code: str | None,
311        enable_async_sends: bool,
312    ) -> str:
313        """Return common parts of generated modules."""
314        # pylint: disable=too-many-locals
315        # pylint: disable=too-many-branches
316        # pylint: disable=too-many-statements
317        import textwrap
318
319        tpimports: dict[str, list[str]] = {}
320        imports: dict[str, list[str]] = {}
321
322        single_message_type = len(self.message_ids_by_type) == 1
323
324        msgtypes = list(self.message_ids_by_type)
325        if part == 'sender':
326            msgtypes.append(Message)
327        for msgtype in msgtypes:
328            tpimports.setdefault(msgtype.__module__, []).append(
329                msgtype.__name__
330            )
331        rsptypes = list(self.response_ids_by_type)
332        if part == 'sender':
333            rsptypes.append(Response)
334        for rsp_tp in rsptypes:
335            # Skip these as they don't actually show up in code.
336            if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse:
337                continue
338            if (
339                single_message_type
340                and part == 'sender'
341                and rsp_tp is not Response
342            ):
343                # We need to cast to the single supported response type
344                # in this case so need response types at runtime.
345                imports.setdefault(rsp_tp.__module__, []).append(
346                    rsp_tp.__name__
347                )
348            else:
349                tpimports.setdefault(rsp_tp.__module__, []).append(
350                    rsp_tp.__name__
351                )
352
353        import_lines = ''
354        tpimport_lines = ''
355
356        for module, names in sorted(imports.items()):
357            jnames = ', '.join(names)
358            line = f'from {module} import {jnames}'
359            if len(line) > 80:
360                # Recreate in a wrapping-friendly form.
361                line = f'from {module} import ({jnames})'
362            import_lines += f'{line}\n'
363        for module, names in sorted(tpimports.items()):
364            jnames = ', '.join(names)
365            line = f'from {module} import {jnames}'
366            if len(line) > 75:  # Account for indent
367                # Recreate in a wrapping-friendly form.
368                line = f'from {module} import ({jnames})'
369            tpimport_lines += f'{line}\n'
370
371        if part == 'sender':
372            import_lines += (
373                'from efro.message import MessageSender, BoundMessageSender\n'
374            )
375            tpimport_typing_extras = ''
376        else:
377            if single_message_type:
378                import_lines += (
379                    'from efro.message import (MessageReceiver,'
380                    ' BoundMessageReceiver, Message, Response)\n'
381                )
382            else:
383                import_lines += (
384                    'from efro.message import MessageReceiver,'
385                    ' BoundMessageReceiver\n'
386                )
387            tpimport_typing_extras = ', Awaitable'
388
389        if extra_import_code is not None:
390            import_lines += extra_import_code
391
392        ovld = ', overload' if not single_message_type else ''
393        ovld2 = (
394            ', cast, Awaitable'
395            if (single_message_type and part == 'sender' and enable_async_sends)
396            else ''
397        )
398        tpimport_lines = textwrap.indent(tpimport_lines, '    ')
399
400        baseimps = ['Any']
401        if part == 'receiver':
402            baseimps.append('Callable')
403        if part == 'sender' and enable_async_sends:
404            baseimps.append('Awaitable')
405        baseimps_s = ', '.join(baseimps)
406        out = (
407            '# Released under the MIT License. See LICENSE for details.\n'
408            f'#\n'
409            f'"""Auto-generated {part} module. Do not edit by hand."""\n'
410            f'\n'
411            f'from __future__ import annotations\n'
412            f'\n'
413            f'from typing import TYPE_CHECKING{ovld}{ovld2}\n'
414            f'\n'
415            f'{import_lines}'
416            f'\n'
417            f'if TYPE_CHECKING:\n'
418            f'    from typing import {baseimps_s}'
419            f'{tpimport_typing_extras}\n'
420            f'{tpimport_lines}'
421            f'\n'
422            f'\n'
423        )
424        return out
425
426    def do_create_sender_module(
427        self,
428        basename: str,
429        protocol_create_code: str,
430        enable_sync_sends: bool,
431        enable_async_sends: bool,
432        private: bool = False,
433        protocol_module_level_import_code: str | None = None,
434    ) -> str:
435        """Used by create_sender_module(); do not call directly."""
436        # pylint: disable=too-many-positional-arguments
437        # pylint: disable=too-many-locals
438        # pylint: disable=too-many-branches
439        import textwrap
440
441        msgtypes = list(self.message_ids_by_type.keys())
442
443        ppre = '_' if private else ''
444        out = self._get_module_header(
445            'sender',
446            extra_import_code=protocol_module_level_import_code,
447            enable_async_sends=enable_async_sends,
448        )
449        ccind = textwrap.indent(protocol_create_code, '        ')
450        out += (
451            f'class {ppre}{basename}(MessageSender):\n'
452            f'    """Protocol-specific sender."""\n'
453            f'\n'
454            f'    def __init__(self) -> None:\n'
455            f'{ccind}\n'
456            f'        super().__init__(protocol)\n'
457            f'\n'
458            f'    def __get__(\n'
459            f'        self, obj: Any, type_in: Any = None\n'
460            f'    ) -> {ppre}Bound{basename}:\n'
461            f'        return {ppre}Bound{basename}(obj, self)\n'
462            f'\n'
463            f'\n'
464            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
465            f'    """Protocol-specific bound sender."""\n'
466        )
467
468        def _filt_tp_name(rtype: type[Response] | None) -> str:
469            return 'None' if rtype is None else rtype.__name__
470
471        # Define handler() overloads for all registered message types.
472        if msgtypes:
473            for async_pass in False, True:
474                if async_pass and not enable_async_sends:
475                    continue
476                if not async_pass and not enable_sync_sends:
477                    continue
478                pfx = 'async ' if async_pass else ''
479                sfx = '_async' if async_pass else ''
480                # awt = 'await ' if async_pass else ''
481                awt = ''
482                how = 'asynchronously' if async_pass else 'synchronously'
483
484                if len(msgtypes) == 1:
485                    # Special case: with a single message types we don't
486                    # use overloads.
487                    msgtype = msgtypes[0]
488                    msgtypevar = msgtype.__name__
489                    rtypes = msgtype.get_response_types()
490                    if len(rtypes) > 1:
491                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
492                    else:
493                        rtypevar = _filt_tp_name(rtypes[0])
494                    if async_pass:
495                        rtypevar = f'Awaitable[{rtypevar}]'
496                    out += (
497                        f'\n'
498                        f'    def send{sfx}(self,'
499                        f' message: {msgtypevar})'
500                        f' -> {rtypevar}:\n'
501                        f'        """Send a message {how}."""\n'
502                        f'        out = {awt}self._sender.'
503                        f'send{sfx}(self._obj, message)\n'
504                    )
505                    if not async_pass:
506                        out += (
507                            f'        assert isinstance(out, {rtypevar})\n'
508                            '        return out\n'
509                        )
510                    else:
511                        out += f'        return cast({rtypevar}, out)\n'
512
513                else:
514                    for msgtype in msgtypes:
515                        msgtypevar = msgtype.__name__
516                        rtypes = msgtype.get_response_types()
517                        if len(rtypes) > 1:
518                            rtypevar = ' | '.join(
519                                _filt_tp_name(t) for t in rtypes
520                            )
521                        else:
522                            rtypevar = _filt_tp_name(rtypes[0])
523                        out += (
524                            f'\n'
525                            f'    @overload\n'
526                            f'    {pfx}def send{sfx}(self,'
527                            f' message: {msgtypevar})'
528                            f' -> {rtypevar}: ...\n'
529                        )
530                    rtypevar = 'Response | None'
531                    if async_pass:
532                        rtypevar = f'Awaitable[{rtypevar}]'
533                    out += (
534                        f'\n'
535                        f'    def send{sfx}(self, message: Message)'
536                        f' -> {rtypevar}:\n'
537                        f'        """Send a message {how}."""\n'
538                        f'        return {awt}self._sender.'
539                        f'send{sfx}(self._obj, message)\n'
540                    )
541
542        return out
543
544    def do_create_receiver_module(
545        self,
546        basename: str,
547        protocol_create_code: str,
548        is_async: bool,
549        private: bool = False,
550        protocol_module_level_import_code: str | None = None,
551    ) -> str:
552        """Used by create_receiver_module(); do not call directly."""
553        # pylint: disable=too-many-locals
554        # pylint: disable=too-many-positional-arguments
555        import textwrap
556
557        desc = 'asynchronous' if is_async else 'synchronous'
558        ppre = '_' if private else ''
559        msgtypes = list(self.message_ids_by_type.keys())
560        out = self._get_module_header(
561            'receiver',
562            extra_import_code=protocol_module_level_import_code,
563            enable_async_sends=False,
564        )
565        ccind = textwrap.indent(protocol_create_code, '        ')
566        out += (
567            f'class {ppre}{basename}(MessageReceiver):\n'
568            f'    """Protocol-specific {desc} receiver."""\n'
569            f'\n'
570            f'    is_async = {is_async}\n'
571            f'\n'
572            f'    def __init__(self) -> None:\n'
573            f'{ccind}\n'
574            f'        super().__init__(protocol)\n'
575            f'\n'
576            f'    def __get__(\n'
577            f'        self,\n'
578            f'        obj: Any,\n'
579            f'        type_in: Any = None,\n'
580            f'    ) -> {ppre}Bound{basename}:\n'
581            f'        return {ppre}Bound{basename}('
582            f'obj, self)\n'
583        )
584
585        # Define handler() overloads for all registered message types.
586
587        def _filt_tp_name(rtype: type[Response] | None) -> str:
588            return 'None' if rtype is None else rtype.__name__
589
590        if msgtypes:
591            cbgn = 'Awaitable[' if is_async else ''
592            cend = ']' if is_async else ''
593            if len(msgtypes) == 1:
594                # Special case: when we have a single message type we don't
595                # use overloads.
596                msgtype = msgtypes[0]
597                msgtypevar = msgtype.__name__
598                rtypes = msgtype.get_response_types()
599                if len(rtypes) > 1:
600                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
601                else:
602                    rtypevar = _filt_tp_name(rtypes[0])
603                rtypevar = f'{cbgn}{rtypevar}{cend}'
604                out += (
605                    f'\n'
606                    f'    def handler(\n'
607                    f'        self,\n'
608                    f'        call: Callable[[Any, {msgtypevar}], '
609                    f'{rtypevar}],\n'
610                    f'    )'
611                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
612                    f'        """Decorator to register message handlers."""\n'
613                    f'        from typing import cast, Callable, Any\n'
614                    f'\n'
615                    f'        self.register_handler(cast(Callable'
616                    f'[[Any, Message], Response], call))\n'
617                    f'        return call\n'
618                )
619            else:
620                for msgtype in msgtypes:
621                    msgtypevar = msgtype.__name__
622                    rtypes = msgtype.get_response_types()
623                    if len(rtypes) > 1:
624                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
625                    else:
626                        rtypevar = _filt_tp_name(rtypes[0])
627                    rtypevar = f'{cbgn}{rtypevar}{cend}'
628                    out += (
629                        f'\n'
630                        f'    @overload\n'
631                        f'    def handler(\n'
632                        f'        self,\n'
633                        f'        call: Callable[[Any, {msgtypevar}], '
634                        f'{rtypevar}],\n'
635                        f'    )'
636                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
637                    )
638                out += (
639                    '\n'
640                    '    def handler(self, call: Callable) -> Callable:\n'
641                    '        """Decorator to register message handlers."""\n'
642                    '        self.register_handler(call)\n'
643                    '        return call\n'
644                )
645
646        out += (
647            f'\n'
648            f'\n'
649            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
650            f'    """Protocol-specific bound receiver."""\n'
651        )
652        if is_async:
653            out += (
654                '\n'
655                '    def handle_raw_message(\n'
656                '        self, message: str, raise_unregistered: bool = False\n'
657                '    ) -> Awaitable[str]:\n'
658                '        """Asynchronously handle a raw incoming message."""\n'
659                '        return self._receiver.'
660                'handle_raw_message_async(\n'
661                '            self._obj, message, raise_unregistered\n'
662                '        )\n'
663            )
664
665        else:
666            out += (
667                '\n'
668                '    def handle_raw_message(\n'
669                '        self, message: str, raise_unregistered: bool = False\n'
670                '    ) -> str:\n'
671                '        """Synchronously handle a raw incoming message."""\n'
672                '        return self._receiver.handle_raw_message(\n'
673                '            self._obj, message, raise_unregistered\n'
674                '        )\n'
675            )
676
677        return out

Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.

MessageProtocol( message_types: dict[int, type[Message]], response_types: dict[int, type[Response]], *, forward_communication_errors: bool = False, forward_clean_errors: bool = False, remote_errors_include_stack_traces: bool = False, log_errors_on_receiver: bool = True, log_response_decode_errors: bool = True)
 42    def __init__(
 43        self,
 44        message_types: dict[int, type[Message]],
 45        response_types: dict[int, type[Response]],
 46        *,
 47        forward_communication_errors: bool = False,
 48        forward_clean_errors: bool = False,
 49        remote_errors_include_stack_traces: bool = False,
 50        log_errors_on_receiver: bool = True,
 51        log_response_decode_errors: bool = True,
 52    ) -> None:
 53        """Create a protocol with a given configuration.
 54
 55        If 'forward_communication_errors' is True,
 56        efro.error.CommunicationErrors raised on the receiver end will
 57        result in a matching error raised back on the sender. This can
 58        be useful if the receiver will be in some way forwarding
 59        messages along and the sender doesn't need to know where
 60        communication breakdowns occurred; only that they did.
 61
 62        If 'forward_clean_errors' is True, efro.error.CleanError
 63        exceptions raised on the receiver end will result in a matching
 64        CleanError raised back on the sender.
 65
 66        When an exception is not covered by the optional forwarding
 67        mechanisms above, it will come across as efro.error.RemoteError
 68        and the exception will be logged on the receiver end - at least
 69        by default (see details below).
 70
 71        If 'remote_errors_include_stack_traces' is True, stringified
 72        stack traces will be returned with efro.error.RemoteError
 73        exceptions. This is useful for debugging but should only be
 74        enabled in cases where the sender is trusted to see internal
 75        details of the receiver.
 76
 77        By default, when a message-handling exception will result in an
 78        efro.error.RemoteError being returned to the sender, the
 79        exception will be logged on the receiver. This is because the
 80        goal is usually to avoid returning opaque RemoteErrors and to
 81        instead return something meaningful as part of the expected
 82        response type (even if that value itself represents a logical
 83        error state). If 'log_errors_on_receiver' is False, however,
 84        such exceptions will *not* be logged on the receiver. This can
 85        be useful in combination with
 86        'remote_errors_include_stack_traces' and 'forward_clean_errors'
 87        in situations where all error logging/management will be
 88        happening on the sender end. Be aware, however, that in that
 89        case it may be possible for communication errors to prevent some
 90        errors from ever being acknowledged.
 91
 92        If an error occurs when decoding a message response, a
 93        RuntimeError is generated locally. However, in practice it is
 94        likely for such errors to be silently ignored by message
 95        handling code alongside more common communication type errors,
 96        meaning serious protocol breakage could go unnoticed. To avoid
 97        this, a log message is also printed in such cases. Pass
 98        'log_response_decode_errors' as False to disable this logging.
 99        """
100        # pylint: disable=too-many-locals
101        self.message_types_by_id: dict[int, type[Message]] = {}
102        self.message_ids_by_type: dict[type[Message], int] = {}
103        self.response_types_by_id: dict[
104            int, type[Response] | type[SysResponse]
105        ] = {}
106        self.response_ids_by_type: dict[
107            type[Response] | type[SysResponse], int
108        ] = {}
109        for m_id, m_type in message_types.items():
110            # Make sure only valid message types were passed and each
111            # id was assigned only once.
112            assert isinstance(m_id, int)
113            assert m_id >= 0
114            assert is_ioprepped_dataclass(m_type) and issubclass(
115                m_type, Message
116            )
117            assert self.message_types_by_id.get(m_id) is None
118            self.message_types_by_id[m_id] = m_type
119            self.message_ids_by_type[m_type] = m_id
120
121        for r_id, r_type in response_types.items():
122            assert isinstance(r_id, int)
123            assert r_id >= 0
124            assert is_ioprepped_dataclass(r_type) and issubclass(
125                r_type, Response
126            )
127            assert self.response_types_by_id.get(r_id) is None
128            self.response_types_by_id[r_id] = r_type
129            self.response_ids_by_type[r_type] = r_id
130
131        # Register our SysResponse types. These use negative
132        # IDs so as to never overlap with user Response types.
133        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
134            assert self.response_types_by_id.get(reg_id) is None
135            self.response_types_by_id[reg_id] = reg_tp
136            self.response_ids_by_type[reg_tp] = reg_id
137
138        _reg_sys(ErrorSysResponse, -1)
139        _reg_sys(EmptySysResponse, -2)
140
141        # Some extra-thorough validation in debug mode.
142        if __debug__:
143            # Make sure all Message types' return types are valid
144            # and have been assigned an ID as well.
145            all_response_types: set[type[Response] | None] = set()
146            for m_id, m_type in message_types.items():
147                m_rtypes = m_type.get_response_types()
148
149                assert isinstance(m_rtypes, list)
150                assert (
151                    m_rtypes
152                ), f'Message type {m_type} specifies no return types.'
153                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
154                for m_rtype in m_rtypes:
155                    all_response_types.add(m_rtype)
156            for cls in all_response_types:
157                if cls is None:
158                    continue
159                assert is_ioprepped_dataclass(cls)
160                assert issubclass(cls, Response)
161                if cls not in self.response_ids_by_type:
162                    raise ValueError(
163                        f'Possible response type {cls} needs to be included'
164                        f' in response_types for this protocol.'
165                    )
166
167            # Make sure all registered types have unique base names.
168            # We can take advantage of this to generate cleaner looking
169            # protocol modules. Can revisit if this is ever a problem.
170            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
171            if len(mtypenames) != len(message_types):
172                raise ValueError(
173                    'message_types contains duplicate __name__s;'
174                    ' all types are required to have unique names.'
175                )
176
177        self.forward_clean_errors = forward_clean_errors
178        self.forward_communication_errors = forward_communication_errors
179        self.remote_errors_include_stack_traces = (
180            remote_errors_include_stack_traces
181        )
182        self.log_errors_on_receiver = log_errors_on_receiver
183        self.log_response_decode_errors = log_response_decode_errors

Create a protocol with a given configuration.

If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.

If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.

When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).

If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.

By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_errors_on_receiver' is False, however, such exceptions will *not* be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent some errors from ever being acknowledged.

If an error occurs when decoding a message response, a RuntimeError is generated locally. However, in practice it is likely for such errors to be silently ignored by message handling code alongside more common communication type errors, meaning serious protocol breakage could go unnoticed. To avoid this, a log message is also printed in such cases. Pass 'log_response_decode_errors' as False to disable this logging.

message_types_by_id: dict[int, type[Message]]
message_ids_by_type: dict[type[Message], int]
response_types_by_id: dict[int, type[Response] | type[SysResponse]]
response_ids_by_type: dict[type[Response] | type[SysResponse], int]
forward_clean_errors
forward_communication_errors
remote_errors_include_stack_traces
log_errors_on_receiver
log_response_decode_errors
@staticmethod
def encode_dict(obj: dict) -> str:
185    @staticmethod
186    def encode_dict(obj: dict) -> str:
187        """Json-encode a provided dict."""
188        return json.dumps(obj, separators=(',', ':'))

Json-encode a provided dict.

def message_to_dict(self, message: Message) -> dict:
190    def message_to_dict(self, message: Message) -> dict:
191        """Encode a message to a json ready dict."""
192        return self._to_dict(message, self.message_ids_by_type, 'message')

Encode a message to a json ready dict.

def response_to_dict( self, response: Response | SysResponse) -> dict:
194    def response_to_dict(self, response: Response | SysResponse) -> dict:
195        """Encode a response to a json ready dict."""
196        return self._to_dict(response, self.response_ids_by_type, 'response')

Encode a response to a json ready dict.

def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
198    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
199        """Translate an Exception to a SysResponse.
200
201        Also returns whether the error should be logged if this happened
202        within handle_raw_message().
203        """
204
205        # If anything goes wrong, return a ErrorSysResponse instead.
206        # (either CLEAN or generic REMOTE)
207        if self.forward_clean_errors and isinstance(exc, CleanError):
208            return (
209                ErrorSysResponse(
210                    error_message=str(exc),
211                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
212                ),
213                False,
214            )
215        if self.forward_communication_errors and isinstance(
216            exc, CommunicationError
217        ):
218            return (
219                ErrorSysResponse(
220                    error_message=str(exc),
221                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
222                ),
223                False,
224            )
225        return (
226            ErrorSysResponse(
227                error_message=(
228                    # Note: need to format exception ourself here; it
229                    # might not be current so we can't use
230                    # traceback.format_exc().
231                    ''.join(
232                        traceback.format_exception(
233                            type(exc), exc, exc.__traceback__
234                        )
235                    )
236                    if self.remote_errors_include_stack_traces
237                    else 'An internal error has occurred.'
238                ),
239                error_type=ErrorSysResponse.ErrorType.REMOTE,
240            ),
241            self.log_errors_on_receiver,
242        )

Translate an Exception to a SysResponse.

Also returns whether the error should be logged if this happened within handle_raw_message().

@staticmethod
def decode_dict(data: str) -> dict:
258    @staticmethod
259    def decode_dict(data: str) -> dict:
260        """Decode data to a dict."""
261        out = json.loads(data)
262        assert isinstance(out, dict)
263        return out

Decode data to a dict.

def message_from_dict(self, data: dict) -> Message:
265    def message_from_dict(self, data: dict) -> Message:
266        """Decode a message from a dict."""
267        out = self._from_dict(data, self.message_types_by_id, 'message')
268        assert isinstance(out, Message)
269        return out

Decode a message from a dict.

def response_from_dict( self, data: dict) -> Response | SysResponse:
271    def response_from_dict(self, data: dict) -> Response | SysResponse:
272        """Decode a response from a json string."""
273        out = self._from_dict(data, self.response_types_by_id, 'response')
274        assert isinstance(out, Response | SysResponse)
275        return out

Decode a response from a json string.

def do_create_sender_module( self, basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None) -> str:
426    def do_create_sender_module(
427        self,
428        basename: str,
429        protocol_create_code: str,
430        enable_sync_sends: bool,
431        enable_async_sends: bool,
432        private: bool = False,
433        protocol_module_level_import_code: str | None = None,
434    ) -> str:
435        """Used by create_sender_module(); do not call directly."""
436        # pylint: disable=too-many-positional-arguments
437        # pylint: disable=too-many-locals
438        # pylint: disable=too-many-branches
439        import textwrap
440
441        msgtypes = list(self.message_ids_by_type.keys())
442
443        ppre = '_' if private else ''
444        out = self._get_module_header(
445            'sender',
446            extra_import_code=protocol_module_level_import_code,
447            enable_async_sends=enable_async_sends,
448        )
449        ccind = textwrap.indent(protocol_create_code, '        ')
450        out += (
451            f'class {ppre}{basename}(MessageSender):\n'
452            f'    """Protocol-specific sender."""\n'
453            f'\n'
454            f'    def __init__(self) -> None:\n'
455            f'{ccind}\n'
456            f'        super().__init__(protocol)\n'
457            f'\n'
458            f'    def __get__(\n'
459            f'        self, obj: Any, type_in: Any = None\n'
460            f'    ) -> {ppre}Bound{basename}:\n'
461            f'        return {ppre}Bound{basename}(obj, self)\n'
462            f'\n'
463            f'\n'
464            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
465            f'    """Protocol-specific bound sender."""\n'
466        )
467
468        def _filt_tp_name(rtype: type[Response] | None) -> str:
469            return 'None' if rtype is None else rtype.__name__
470
471        # Define handler() overloads for all registered message types.
472        if msgtypes:
473            for async_pass in False, True:
474                if async_pass and not enable_async_sends:
475                    continue
476                if not async_pass and not enable_sync_sends:
477                    continue
478                pfx = 'async ' if async_pass else ''
479                sfx = '_async' if async_pass else ''
480                # awt = 'await ' if async_pass else ''
481                awt = ''
482                how = 'asynchronously' if async_pass else 'synchronously'
483
484                if len(msgtypes) == 1:
485                    # Special case: with a single message types we don't
486                    # use overloads.
487                    msgtype = msgtypes[0]
488                    msgtypevar = msgtype.__name__
489                    rtypes = msgtype.get_response_types()
490                    if len(rtypes) > 1:
491                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
492                    else:
493                        rtypevar = _filt_tp_name(rtypes[0])
494                    if async_pass:
495                        rtypevar = f'Awaitable[{rtypevar}]'
496                    out += (
497                        f'\n'
498                        f'    def send{sfx}(self,'
499                        f' message: {msgtypevar})'
500                        f' -> {rtypevar}:\n'
501                        f'        """Send a message {how}."""\n'
502                        f'        out = {awt}self._sender.'
503                        f'send{sfx}(self._obj, message)\n'
504                    )
505                    if not async_pass:
506                        out += (
507                            f'        assert isinstance(out, {rtypevar})\n'
508                            '        return out\n'
509                        )
510                    else:
511                        out += f'        return cast({rtypevar}, out)\n'
512
513                else:
514                    for msgtype in msgtypes:
515                        msgtypevar = msgtype.__name__
516                        rtypes = msgtype.get_response_types()
517                        if len(rtypes) > 1:
518                            rtypevar = ' | '.join(
519                                _filt_tp_name(t) for t in rtypes
520                            )
521                        else:
522                            rtypevar = _filt_tp_name(rtypes[0])
523                        out += (
524                            f'\n'
525                            f'    @overload\n'
526                            f'    {pfx}def send{sfx}(self,'
527                            f' message: {msgtypevar})'
528                            f' -> {rtypevar}: ...\n'
529                        )
530                    rtypevar = 'Response | None'
531                    if async_pass:
532                        rtypevar = f'Awaitable[{rtypevar}]'
533                    out += (
534                        f'\n'
535                        f'    def send{sfx}(self, message: Message)'
536                        f' -> {rtypevar}:\n'
537                        f'        """Send a message {how}."""\n'
538                        f'        return {awt}self._sender.'
539                        f'send{sfx}(self._obj, message)\n'
540                    )
541
542        return out

Used by create_sender_module(); do not call directly.

def do_create_receiver_module( self, basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None) -> str:
544    def do_create_receiver_module(
545        self,
546        basename: str,
547        protocol_create_code: str,
548        is_async: bool,
549        private: bool = False,
550        protocol_module_level_import_code: str | None = None,
551    ) -> str:
552        """Used by create_receiver_module(); do not call directly."""
553        # pylint: disable=too-many-locals
554        # pylint: disable=too-many-positional-arguments
555        import textwrap
556
557        desc = 'asynchronous' if is_async else 'synchronous'
558        ppre = '_' if private else ''
559        msgtypes = list(self.message_ids_by_type.keys())
560        out = self._get_module_header(
561            'receiver',
562            extra_import_code=protocol_module_level_import_code,
563            enable_async_sends=False,
564        )
565        ccind = textwrap.indent(protocol_create_code, '        ')
566        out += (
567            f'class {ppre}{basename}(MessageReceiver):\n'
568            f'    """Protocol-specific {desc} receiver."""\n'
569            f'\n'
570            f'    is_async = {is_async}\n'
571            f'\n'
572            f'    def __init__(self) -> None:\n'
573            f'{ccind}\n'
574            f'        super().__init__(protocol)\n'
575            f'\n'
576            f'    def __get__(\n'
577            f'        self,\n'
578            f'        obj: Any,\n'
579            f'        type_in: Any = None,\n'
580            f'    ) -> {ppre}Bound{basename}:\n'
581            f'        return {ppre}Bound{basename}('
582            f'obj, self)\n'
583        )
584
585        # Define handler() overloads for all registered message types.
586
587        def _filt_tp_name(rtype: type[Response] | None) -> str:
588            return 'None' if rtype is None else rtype.__name__
589
590        if msgtypes:
591            cbgn = 'Awaitable[' if is_async else ''
592            cend = ']' if is_async else ''
593            if len(msgtypes) == 1:
594                # Special case: when we have a single message type we don't
595                # use overloads.
596                msgtype = msgtypes[0]
597                msgtypevar = msgtype.__name__
598                rtypes = msgtype.get_response_types()
599                if len(rtypes) > 1:
600                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
601                else:
602                    rtypevar = _filt_tp_name(rtypes[0])
603                rtypevar = f'{cbgn}{rtypevar}{cend}'
604                out += (
605                    f'\n'
606                    f'    def handler(\n'
607                    f'        self,\n'
608                    f'        call: Callable[[Any, {msgtypevar}], '
609                    f'{rtypevar}],\n'
610                    f'    )'
611                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
612                    f'        """Decorator to register message handlers."""\n'
613                    f'        from typing import cast, Callable, Any\n'
614                    f'\n'
615                    f'        self.register_handler(cast(Callable'
616                    f'[[Any, Message], Response], call))\n'
617                    f'        return call\n'
618                )
619            else:
620                for msgtype in msgtypes:
621                    msgtypevar = msgtype.__name__
622                    rtypes = msgtype.get_response_types()
623                    if len(rtypes) > 1:
624                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
625                    else:
626                        rtypevar = _filt_tp_name(rtypes[0])
627                    rtypevar = f'{cbgn}{rtypevar}{cend}'
628                    out += (
629                        f'\n'
630                        f'    @overload\n'
631                        f'    def handler(\n'
632                        f'        self,\n'
633                        f'        call: Callable[[Any, {msgtypevar}], '
634                        f'{rtypevar}],\n'
635                        f'    )'
636                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
637                    )
638                out += (
639                    '\n'
640                    '    def handler(self, call: Callable) -> Callable:\n'
641                    '        """Decorator to register message handlers."""\n'
642                    '        self.register_handler(call)\n'
643                    '        return call\n'
644                )
645
646        out += (
647            f'\n'
648            f'\n'
649            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
650            f'    """Protocol-specific bound receiver."""\n'
651        )
652        if is_async:
653            out += (
654                '\n'
655                '    def handle_raw_message(\n'
656                '        self, message: str, raise_unregistered: bool = False\n'
657                '    ) -> Awaitable[str]:\n'
658                '        """Asynchronously handle a raw incoming message."""\n'
659                '        return self._receiver.'
660                'handle_raw_message_async(\n'
661                '            self._obj, message, raise_unregistered\n'
662                '        )\n'
663            )
664
665        else:
666            out += (
667                '\n'
668                '    def handle_raw_message(\n'
669                '        self, message: str, raise_unregistered: bool = False\n'
670                '    ) -> str:\n'
671                '        """Synchronously handle a raw incoming message."""\n'
672                '        return self._receiver.handle_raw_message(\n'
673                '            self._obj, message, raise_unregistered\n'
674                '        )\n'
675            )
676
677        return out

Used by create_receiver_module(); do not call directly.

class MessageSender:
 23class MessageSender:
 24    """Facilitates sending messages to a target and receiving responses.
 25
 26    These are instantiated at the class level and used to register unbound
 27    class methods to handle raw message sending. Generally this class is not
 28    used directly, but instead autogenerated subclasses which provide type
 29    safe overloads are used instead.
 30
 31    Example:
 32      (In this example, MyMessageSender is an autogenerated class that
 33      inherits from MessageSender).
 34
 35    class MyClass:
 36        msg = MyMessageSender()
 37
 38        @msg.send_method
 39        def send_raw_message(self, message: str) -> str:
 40            # Actually send the message here.
 41
 42    obj = MyClass()
 43
 44    # The MyMessageSender generated class would provides overloads for
 45    # send(), send_async(), etc. to provide type-safety for message types
 46    # and their associated response types.
 47    # Thus, given the statement below, a type-checker would know that
 48    # 'response' is a SomeResponseType or whatever is associated with
 49    # SomeMessageType.
 50    response = obj.msg.send(SomeMessageType())
 51
 52    """
 53
 54    def __init__(self, protocol: MessageProtocol) -> None:
 55        self.protocol = protocol
 56        self._send_raw_message_call: Callable[[Any, str], str] | None = None
 57        self._send_raw_message_ex_call: (
 58            Callable[[Any, str, Message], str] | None
 59        ) = None
 60        self._send_async_raw_message_call: (
 61            Callable[[Any, str], Awaitable[str]] | None
 62        ) = None
 63        self._send_async_raw_message_ex_call: (
 64            Callable[[Any, str, Message], Awaitable[str]] | None
 65        ) = None
 66        self._encode_filter_call: (
 67            Callable[[Any, Message, dict], None] | None
 68        ) = None
 69        self._decode_filter_call: (
 70            Callable[[Any, Message, dict, Response | SysResponse], None] | None
 71        ) = None
 72        self._peer_desc_call: Callable[[Any], str] | None = None
 73
 74    def send_method(
 75        self, call: Callable[[Any, str], str]
 76    ) -> Callable[[Any, str], str]:
 77        """Function decorator for setting raw send method.
 78
 79        Send methods take strings and should return strings.
 80        CommunicationErrors raised here will be returned to the sender
 81        as such; all other exceptions will result in a RuntimeError for
 82        the sender.
 83        """
 84        assert self._send_raw_message_call is None
 85        self._send_raw_message_call = call
 86        return call
 87
 88    def send_ex_method(
 89        self, call: Callable[[Any, str, Message], str]
 90    ) -> Callable[[Any, str, Message], str]:
 91        """Function decorator for extended send method.
 92
 93        Version of send_method which is also is passed the original
 94        unencoded message; can be useful for cases where metadata is sent
 95        along with messages referring to their payloads/etc.
 96        """
 97        assert self._send_raw_message_ex_call is None
 98        self._send_raw_message_ex_call = call
 99        return call
100
101    def send_async_method(
102        self, call: Callable[[Any, str], Awaitable[str]]
103    ) -> Callable[[Any, str], Awaitable[str]]:
104        """Function decorator for setting raw send-async method.
105
106        Send methods take strings and should return strings.
107        CommunicationErrors raised here will be returned to the sender
108        as such; all other exceptions will result in a RuntimeError for
109        the sender.
110
111        IMPORTANT: Generally async send methods should not be implemented
112        as 'async' methods, but instead should be regular methods that
113        return awaitable objects. This way it can be guaranteed that
114        outgoing messages are synchronously enqueued in the correct
115        order, and then async calls can be returned which finish each
116        send. If the entire call is async, they may be enqueued out of
117        order in rare cases.
118        """
119        assert self._send_async_raw_message_call is None
120        self._send_async_raw_message_call = call
121        return call
122
123    def send_async_ex_method(
124        self, call: Callable[[Any, str, Message], Awaitable[str]]
125    ) -> Callable[[Any, str, Message], Awaitable[str]]:
126        """Function decorator for extended send-async method.
127
128        Version of send_async_method which is also is passed the original
129        unencoded message; can be useful for cases where metadata is sent
130        along with messages referring to their payloads/etc.
131        """
132        assert self._send_async_raw_message_ex_call is None
133        self._send_async_raw_message_ex_call = call
134        return call
135
136    def encode_filter_method(
137        self, call: Callable[[Any, Message, dict], None]
138    ) -> Callable[[Any, Message, dict], None]:
139        """Function decorator for defining an encode filter.
140
141        Encode filters can be used to add extra data to the message
142        dict before is is encoded to a string and sent out.
143        """
144        assert self._encode_filter_call is None
145        self._encode_filter_call = call
146        return call
147
148    def decode_filter_method(
149        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
150    ) -> Callable[[Any, Message, dict, Response], None]:
151        """Function decorator for defining a decode filter.
152
153        Decode filters can be used to extract extra data from incoming
154        message dicts.
155        """
156        assert self._decode_filter_call is None
157        self._decode_filter_call = call
158        return call
159
160    def peer_desc_method(
161        self, call: Callable[[Any], str]
162    ) -> Callable[[Any], str]:
163        """Function decorator for defining peer descriptions.
164
165        These are included in error messages or other diagnostics.
166        """
167        assert self._peer_desc_call is None
168        self._peer_desc_call = call
169        return call
170
171    def send(self, bound_obj: Any, message: Message) -> Response | None:
172        """Send a message synchronously."""
173        return self.unpack_raw_response(
174            bound_obj=bound_obj,
175            message=message,
176            raw_response=self.fetch_raw_response(
177                bound_obj=bound_obj,
178                message=message,
179            ),
180        )
181
182    def send_async(
183        self, bound_obj: Any, message: Message
184    ) -> Awaitable[Response | None]:
185        """Send a message asynchronously."""
186
187        # Note: This call is synchronous so that the first part of it can
188        # happen synchronously. If the whole call were async we wouldn't be
189        # able to guarantee that messages sent in order would actually go
190        # out in order.
191        raw_response_awaitable = self.fetch_raw_response_async(
192            bound_obj=bound_obj,
193            message=message,
194        )
195        # Now return an awaitable that will finish the send.
196        return self._send_async_awaitable(
197            bound_obj, message, raw_response_awaitable
198        )
199
200    async def _send_async_awaitable(
201        self,
202        bound_obj: Any,
203        message: Message,
204        raw_response_awaitable: Awaitable[Response | SysResponse],
205    ) -> Response | None:
206        return self.unpack_raw_response(
207            bound_obj=bound_obj,
208            message=message,
209            raw_response=await raw_response_awaitable,
210        )
211
212    def fetch_raw_response(
213        self, bound_obj: Any, message: Message
214    ) -> Response | SysResponse:
215        """Send a message synchronously.
216
217        Generally you can just call send(); these split versions are
218        for when message sending and response handling need to happen
219        in different contexts/threads.
220        """
221        if (
222            self._send_raw_message_call is None
223            and self._send_raw_message_ex_call is None
224        ):
225            raise RuntimeError('send() is unimplemented for this type.')
226
227        msg_encoded = self._encode_message(bound_obj, message)
228        try:
229            if self._send_raw_message_ex_call is not None:
230                response_encoded = self._send_raw_message_ex_call(
231                    bound_obj, msg_encoded, message
232                )
233            else:
234                assert self._send_raw_message_call is not None
235                response_encoded = self._send_raw_message_call(
236                    bound_obj, msg_encoded
237                )
238        except Exception as exc:
239            response = ErrorSysResponse(
240                error_message='Error in MessageSender @send_method.',
241                error_type=(
242                    ErrorSysResponse.ErrorType.COMMUNICATION
243                    if isinstance(exc, CommunicationError)
244                    else ErrorSysResponse.ErrorType.LOCAL
245                ),
246            )
247            # Can include the actual exception since we'll be looking at
248            # this locally; might be helpful.
249            response.set_local_exception(exc)
250            return response
251        return self._decode_raw_response(bound_obj, message, response_encoded)
252
253    def fetch_raw_response_async(
254        self, bound_obj: Any, message: Message
255    ) -> Awaitable[Response | SysResponse]:
256        """Fetch a raw message response awaitable.
257
258        The result of this should be awaited and then passed to
259        unpack_raw_response() to produce the final message result.
260
261        Generally you can just call send(); calling fetch and unpack
262        manually is for when message sending and response handling need
263        to happen in different contexts/threads.
264        """
265
266        # Note: This call is synchronous so that the first part of it can
267        # happen synchronously. If the whole call were async we wouldn't be
268        # able to guarantee that messages sent in order would actually go
269        # out in order.
270        if (
271            self._send_async_raw_message_call is None
272            and self._send_async_raw_message_ex_call is None
273        ):
274            raise RuntimeError('send_async() is unimplemented for this type.')
275
276        msg_encoded = self._encode_message(bound_obj, message)
277        try:
278            if self._send_async_raw_message_ex_call is not None:
279                send_awaitable = self._send_async_raw_message_ex_call(
280                    bound_obj, msg_encoded, message
281                )
282            else:
283                assert self._send_async_raw_message_call is not None
284                send_awaitable = self._send_async_raw_message_call(
285                    bound_obj, msg_encoded
286                )
287        except Exception as exc:
288            return self._error_awaitable(exc)
289
290        # Now return an awaitable to finish the job.
291        return self._fetch_raw_response_awaitable(
292            bound_obj, message, send_awaitable
293        )
294
295    async def _error_awaitable(self, exc: Exception) -> SysResponse:
296        response = ErrorSysResponse(
297            error_message='Error in MessageSender @send_async_method.',
298            error_type=(
299                ErrorSysResponse.ErrorType.COMMUNICATION
300                if isinstance(exc, CommunicationError)
301                else ErrorSysResponse.ErrorType.LOCAL
302            ),
303        )
304        # Can include the actual exception since we'll be looking at
305        # this locally; might be helpful.
306        response.set_local_exception(exc)
307        return response
308
309    async def _fetch_raw_response_awaitable(
310        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
311    ) -> Response | SysResponse:
312        try:
313            response_encoded = await send_awaitable
314        except Exception as exc:
315            response = ErrorSysResponse(
316                error_message='Error in MessageSender @send_async_method.',
317                error_type=(
318                    ErrorSysResponse.ErrorType.COMMUNICATION
319                    if isinstance(exc, CommunicationError)
320                    else ErrorSysResponse.ErrorType.LOCAL
321                ),
322            )
323            # Can include the actual exception since we'll be looking at
324            # this locally; might be helpful.
325            response.set_local_exception(exc)
326            return response
327        return self._decode_raw_response(bound_obj, message, response_encoded)
328
329    def unpack_raw_response(
330        self,
331        bound_obj: Any,
332        message: Message,
333        raw_response: Response | SysResponse,
334    ) -> Response | None:
335        """Convert a raw fetched response into a final response/error/etc.
336
337        Generally you can just call send(); calling fetch and unpack
338        manually is for when message sending and response handling need
339        to happen in different contexts/threads.
340        """
341        response = self._unpack_raw_response(bound_obj, raw_response)
342        assert (
343            response is None
344            or type(response) in type(message).get_response_types()
345        )
346        return response
347
348    def _encode_message(self, bound_obj: Any, message: Message) -> str:
349        """Encode a message for sending."""
350        msg_dict = self.protocol.message_to_dict(message)
351        if self._encode_filter_call is not None:
352            self._encode_filter_call(bound_obj, message, msg_dict)
353        return self.protocol.encode_dict(msg_dict)
354
355    def _decode_raw_response(
356        self, bound_obj: Any, message: Message, response_encoded: str
357    ) -> Response | SysResponse:
358        """Create a Response from returned data.
359
360        These Responses may encapsulate things like remote errors and
361        should not be handed directly to users. _unpack_raw_response()
362        should be used to translate to special values like None or raise
363        Exceptions. This function itself should never raise Exceptions.
364        """
365        response: Response | SysResponse
366        try:
367            response_dict = self.protocol.decode_dict(response_encoded)
368            response = self.protocol.response_from_dict(response_dict)
369            if self._decode_filter_call is not None:
370                self._decode_filter_call(
371                    bound_obj, message, response_dict, response
372                )
373        except Exception as exc:
374
375            # We pragmatically log by default if decoding fails. This
376            # means a message type was likely changed in a way that
377            # breaks the protocol, but individual message handlers are
378            # likely to lump all errors together (communication and
379            # otherwise) which could cause such breakage to go
380            # unnoticed.
381            if self.protocol.log_response_decode_errors:
382                logging.exception(
383                    'Error decoding message response;'
384                    ' protocol might be broken.',
385                )
386
387            response = ErrorSysResponse(
388                error_message='Error decoding raw response.',
389                error_type=ErrorSysResponse.ErrorType.LOCAL,
390            )
391            # Since we'll be looking at this locally, we can include
392            # extra info for logging/etc.
393            response.set_local_exception(exc)
394        return response
395
396    def _unpack_raw_response(
397        self, bound_obj: Any, raw_response: Response | SysResponse
398    ) -> Response | None:
399        """Given a raw Response, unpacks to special values or Exceptions.
400
401        The result of this call is what should be passed to users.
402        For complex messaging situations such as response callbacks
403        operating across different threads, this last stage should be
404        run such that any raised Exception is active when the callback
405        fires; not on the thread where the message was sent.
406        """
407        # EmptySysResponse translates to None
408        if isinstance(raw_response, EmptySysResponse):
409            return None
410
411        # Some error occurred. Raise a local Exception for it.
412        if isinstance(raw_response, ErrorSysResponse):
413            # Errors that happened locally can attach their exceptions
414            # here for extra logging goodness.
415            local_exception = raw_response.get_local_exception()
416
417            if (
418                raw_response.error_type
419                is ErrorSysResponse.ErrorType.COMMUNICATION
420            ):
421                raise CommunicationError(
422                    raw_response.error_message
423                ) from local_exception
424
425            # If something went wrong on *our* end of the connection,
426            # don't say it was a remote error.
427            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
428                raise RuntimeError(
429                    raw_response.error_message
430                ) from local_exception
431
432            # If they want to support clean errors, do those.
433            if (
434                self.protocol.forward_clean_errors
435                and raw_response.error_type
436                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
437            ):
438                raise CleanError(
439                    raw_response.error_message
440                ) from local_exception
441
442            if (
443                self.protocol.forward_communication_errors
444                and raw_response.error_type
445                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
446            ):
447                raise CommunicationError(
448                    raw_response.error_message
449                ) from local_exception
450
451            # Everything else gets lumped in as a remote error.
452            raise RemoteError(
453                raw_response.error_message,
454                peer_desc=(
455                    'peer'
456                    if self._peer_desc_call is None
457                    else self._peer_desc_call(bound_obj)
458                ),
459            ) from local_exception
460
461        assert isinstance(raw_response, Response)
462        return raw_response

Facilitates sending messages to a target and receiving responses.

These are instantiated at the class level and used to register unbound class methods to handle raw message sending. Generally this class is not used directly, but instead autogenerated subclasses which provide type safe overloads are used instead.

Example: (In this example, MyMessageSender is an autogenerated class that inherits from MessageSender).

class MyClass: msg = MyMessageSender()

@msg.send_method
def send_raw_message(self, message: str) -> str:
    # Actually send the message here.

obj = MyClass()

The MyMessageSender generated class would provides overloads for

send(), send_async(), etc. to provide type-safety for message types

and their associated response types.

Thus, given the statement below, a type-checker would know that

'response' is a SomeResponseType or whatever is associated with

SomeMessageType.

response = obj.msg.send(SomeMessageType())

MessageSender(protocol: MessageProtocol)
54    def __init__(self, protocol: MessageProtocol) -> None:
55        self.protocol = protocol
56        self._send_raw_message_call: Callable[[Any, str], str] | None = None
57        self._send_raw_message_ex_call: (
58            Callable[[Any, str, Message], str] | None
59        ) = None
60        self._send_async_raw_message_call: (
61            Callable[[Any, str], Awaitable[str]] | None
62        ) = None
63        self._send_async_raw_message_ex_call: (
64            Callable[[Any, str, Message], Awaitable[str]] | None
65        ) = None
66        self._encode_filter_call: (
67            Callable[[Any, Message, dict], None] | None
68        ) = None
69        self._decode_filter_call: (
70            Callable[[Any, Message, dict, Response | SysResponse], None] | None
71        ) = None
72        self._peer_desc_call: Callable[[Any], str] | None = None
protocol
def send_method(self, call: Callable[[Any, str], str]) -> Callable[[Any, str], str]:
74    def send_method(
75        self, call: Callable[[Any, str], str]
76    ) -> Callable[[Any, str], str]:
77        """Function decorator for setting raw send method.
78
79        Send methods take strings and should return strings.
80        CommunicationErrors raised here will be returned to the sender
81        as such; all other exceptions will result in a RuntimeError for
82        the sender.
83        """
84        assert self._send_raw_message_call is None
85        self._send_raw_message_call = call
86        return call

Function decorator for setting raw send method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

def send_ex_method( self, call: Callable[[Any, str, Message], str]) -> Callable[[Any, str, Message], str]:
88    def send_ex_method(
89        self, call: Callable[[Any, str, Message], str]
90    ) -> Callable[[Any, str, Message], str]:
91        """Function decorator for extended send method.
92
93        Version of send_method which is also is passed the original
94        unencoded message; can be useful for cases where metadata is sent
95        along with messages referring to their payloads/etc.
96        """
97        assert self._send_raw_message_ex_call is None
98        self._send_raw_message_ex_call = call
99        return call

Function decorator for extended send method.

Version of send_method which is also is passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.

def send_async_method( self, call: Callable[[Any, str], Awaitable[str]]) -> Callable[[Any, str], Awaitable[str]]:
101    def send_async_method(
102        self, call: Callable[[Any, str], Awaitable[str]]
103    ) -> Callable[[Any, str], Awaitable[str]]:
104        """Function decorator for setting raw send-async method.
105
106        Send methods take strings and should return strings.
107        CommunicationErrors raised here will be returned to the sender
108        as such; all other exceptions will result in a RuntimeError for
109        the sender.
110
111        IMPORTANT: Generally async send methods should not be implemented
112        as 'async' methods, but instead should be regular methods that
113        return awaitable objects. This way it can be guaranteed that
114        outgoing messages are synchronously enqueued in the correct
115        order, and then async calls can be returned which finish each
116        send. If the entire call is async, they may be enqueued out of
117        order in rare cases.
118        """
119        assert self._send_async_raw_message_call is None
120        self._send_async_raw_message_call = call
121        return call

Function decorator for setting raw send-async method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.

def send_async_ex_method( self, call: Callable[[Any, str, Message], Awaitable[str]]) -> Callable[[Any, str, Message], Awaitable[str]]:
123    def send_async_ex_method(
124        self, call: Callable[[Any, str, Message], Awaitable[str]]
125    ) -> Callable[[Any, str, Message], Awaitable[str]]:
126        """Function decorator for extended send-async method.
127
128        Version of send_async_method which is also is passed the original
129        unencoded message; can be useful for cases where metadata is sent
130        along with messages referring to their payloads/etc.
131        """
132        assert self._send_async_raw_message_ex_call is None
133        self._send_async_raw_message_ex_call = call
134        return call

Function decorator for extended send-async method.

Version of send_async_method which is also is passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.

def encode_filter_method( self, call: Callable[[Any, Message, dict], NoneType]) -> Callable[[Any, Message, dict], NoneType]:
136    def encode_filter_method(
137        self, call: Callable[[Any, Message, dict], None]
138    ) -> Callable[[Any, Message, dict], None]:
139        """Function decorator for defining an encode filter.
140
141        Encode filters can be used to add extra data to the message
142        dict before is is encoded to a string and sent out.
143        """
144        assert self._encode_filter_call is None
145        self._encode_filter_call = call
146        return call

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before is is encoded to a string and sent out.

def decode_filter_method( self, call: Callable[[Any, Message, dict, Response | SysResponse], NoneType]) -> Callable[[Any, Message, dict, Response], NoneType]:
148    def decode_filter_method(
149        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
150    ) -> Callable[[Any, Message, dict, Response], None]:
151        """Function decorator for defining a decode filter.
152
153        Decode filters can be used to extract extra data from incoming
154        message dicts.
155        """
156        assert self._decode_filter_call is None
157        self._decode_filter_call = call
158        return call

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts.

def peer_desc_method(self, call: Callable[[Any], str]) -> Callable[[Any], str]:
160    def peer_desc_method(
161        self, call: Callable[[Any], str]
162    ) -> Callable[[Any], str]:
163        """Function decorator for defining peer descriptions.
164
165        These are included in error messages or other diagnostics.
166        """
167        assert self._peer_desc_call is None
168        self._peer_desc_call = call
169        return call

Function decorator for defining peer descriptions.

These are included in error messages or other diagnostics.

def send( self, bound_obj: Any, message: Message) -> Response | None:
171    def send(self, bound_obj: Any, message: Message) -> Response | None:
172        """Send a message synchronously."""
173        return self.unpack_raw_response(
174            bound_obj=bound_obj,
175            message=message,
176            raw_response=self.fetch_raw_response(
177                bound_obj=bound_obj,
178                message=message,
179            ),
180        )

Send a message synchronously.

def send_async( self, bound_obj: Any, message: Message) -> Awaitable[Response | None]:
182    def send_async(
183        self, bound_obj: Any, message: Message
184    ) -> Awaitable[Response | None]:
185        """Send a message asynchronously."""
186
187        # Note: This call is synchronous so that the first part of it can
188        # happen synchronously. If the whole call were async we wouldn't be
189        # able to guarantee that messages sent in order would actually go
190        # out in order.
191        raw_response_awaitable = self.fetch_raw_response_async(
192            bound_obj=bound_obj,
193            message=message,
194        )
195        # Now return an awaitable that will finish the send.
196        return self._send_async_awaitable(
197            bound_obj, message, raw_response_awaitable
198        )

Send a message asynchronously.

def fetch_raw_response( self, bound_obj: Any, message: Message) -> Response | SysResponse:
212    def fetch_raw_response(
213        self, bound_obj: Any, message: Message
214    ) -> Response | SysResponse:
215        """Send a message synchronously.
216
217        Generally you can just call send(); these split versions are
218        for when message sending and response handling need to happen
219        in different contexts/threads.
220        """
221        if (
222            self._send_raw_message_call is None
223            and self._send_raw_message_ex_call is None
224        ):
225            raise RuntimeError('send() is unimplemented for this type.')
226
227        msg_encoded = self._encode_message(bound_obj, message)
228        try:
229            if self._send_raw_message_ex_call is not None:
230                response_encoded = self._send_raw_message_ex_call(
231                    bound_obj, msg_encoded, message
232                )
233            else:
234                assert self._send_raw_message_call is not None
235                response_encoded = self._send_raw_message_call(
236                    bound_obj, msg_encoded
237                )
238        except Exception as exc:
239            response = ErrorSysResponse(
240                error_message='Error in MessageSender @send_method.',
241                error_type=(
242                    ErrorSysResponse.ErrorType.COMMUNICATION
243                    if isinstance(exc, CommunicationError)
244                    else ErrorSysResponse.ErrorType.LOCAL
245                ),
246            )
247            # Can include the actual exception since we'll be looking at
248            # this locally; might be helpful.
249            response.set_local_exception(exc)
250            return response
251        return self._decode_raw_response(bound_obj, message, response_encoded)

Send a message synchronously.

Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.

def fetch_raw_response_async( self, bound_obj: Any, message: Message) -> Awaitable[Response | SysResponse]:
253    def fetch_raw_response_async(
254        self, bound_obj: Any, message: Message
255    ) -> Awaitable[Response | SysResponse]:
256        """Fetch a raw message response awaitable.
257
258        The result of this should be awaited and then passed to
259        unpack_raw_response() to produce the final message result.
260
261        Generally you can just call send(); calling fetch and unpack
262        manually is for when message sending and response handling need
263        to happen in different contexts/threads.
264        """
265
266        # Note: This call is synchronous so that the first part of it can
267        # happen synchronously. If the whole call were async we wouldn't be
268        # able to guarantee that messages sent in order would actually go
269        # out in order.
270        if (
271            self._send_async_raw_message_call is None
272            and self._send_async_raw_message_ex_call is None
273        ):
274            raise RuntimeError('send_async() is unimplemented for this type.')
275
276        msg_encoded = self._encode_message(bound_obj, message)
277        try:
278            if self._send_async_raw_message_ex_call is not None:
279                send_awaitable = self._send_async_raw_message_ex_call(
280                    bound_obj, msg_encoded, message
281                )
282            else:
283                assert self._send_async_raw_message_call is not None
284                send_awaitable = self._send_async_raw_message_call(
285                    bound_obj, msg_encoded
286                )
287        except Exception as exc:
288            return self._error_awaitable(exc)
289
290        # Now return an awaitable to finish the job.
291        return self._fetch_raw_response_awaitable(
292            bound_obj, message, send_awaitable
293        )

Fetch a raw message response awaitable.

The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

def unpack_raw_response( self, bound_obj: Any, message: Message, raw_response: Response | SysResponse) -> Response | None:
329    def unpack_raw_response(
330        self,
331        bound_obj: Any,
332        message: Message,
333        raw_response: Response | SysResponse,
334    ) -> Response | None:
335        """Convert a raw fetched response into a final response/error/etc.
336
337        Generally you can just call send(); calling fetch and unpack
338        manually is for when message sending and response handling need
339        to happen in different contexts/threads.
340        """
341        response = self._unpack_raw_response(bound_obj, raw_response)
342        assert (
343            response is None
344            or type(response) in type(message).get_response_types()
345        )
346        return response

Convert a raw fetched response into a final response/error/etc.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

class BoundMessageSender:
465class BoundMessageSender:
466    """Base class for bound senders."""
467
468    def __init__(self, obj: Any, sender: MessageSender) -> None:
469        # Note: not checking obj here since we want to support
470        # at least our protocol property when accessed via type.
471        self._obj = obj
472        self._sender = sender
473
474    @property
475    def protocol(self) -> MessageProtocol:
476        """Protocol associated with this sender."""
477        return self._sender.protocol
478
479    def send_untyped(self, message: Message) -> Response | None:
480        """Send a message synchronously.
481
482        Whenever possible, use the send() call provided by generated
483        subclasses instead of this; it will provide better type safety.
484        """
485        assert self._obj is not None
486        return self._sender.send(bound_obj=self._obj, message=message)
487
488    def send_async_untyped(
489        self, message: Message
490    ) -> Awaitable[Response | None]:
491        """Send a message asynchronously.
492
493        Whenever possible, use the send_async() call provided by generated
494        subclasses instead of this; it will provide better type safety.
495        """
496        assert self._obj is not None
497        return self._sender.send_async(bound_obj=self._obj, message=message)
498
499    def fetch_raw_response_async_untyped(
500        self, message: Message
501    ) -> Awaitable[Response | SysResponse]:
502        """Split send (part 1 of 2)."""
503        assert self._obj is not None
504        return self._sender.fetch_raw_response_async(
505            bound_obj=self._obj, message=message
506        )
507
508    def unpack_raw_response_untyped(
509        self, message: Message, raw_response: Response | SysResponse
510    ) -> Response | None:
511        """Split send (part 2 of 2)."""
512        return self._sender.unpack_raw_response(
513            bound_obj=self._obj, message=message, raw_response=raw_response
514        )

Base class for bound senders.

BoundMessageSender(obj: Any, sender: MessageSender)
468    def __init__(self, obj: Any, sender: MessageSender) -> None:
469        # Note: not checking obj here since we want to support
470        # at least our protocol property when accessed via type.
471        self._obj = obj
472        self._sender = sender
protocol: MessageProtocol
474    @property
475    def protocol(self) -> MessageProtocol:
476        """Protocol associated with this sender."""
477        return self._sender.protocol

Protocol associated with this sender.

def send_untyped( self, message: Message) -> Response | None:
479    def send_untyped(self, message: Message) -> Response | None:
480        """Send a message synchronously.
481
482        Whenever possible, use the send() call provided by generated
483        subclasses instead of this; it will provide better type safety.
484        """
485        assert self._obj is not None
486        return self._sender.send(bound_obj=self._obj, message=message)

Send a message synchronously.

Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.

def send_async_untyped( self, message: Message) -> Awaitable[Response | None]:
488    def send_async_untyped(
489        self, message: Message
490    ) -> Awaitable[Response | None]:
491        """Send a message asynchronously.
492
493        Whenever possible, use the send_async() call provided by generated
494        subclasses instead of this; it will provide better type safety.
495        """
496        assert self._obj is not None
497        return self._sender.send_async(bound_obj=self._obj, message=message)

Send a message asynchronously.

Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.

def fetch_raw_response_async_untyped( self, message: Message) -> Awaitable[Response | SysResponse]:
499    def fetch_raw_response_async_untyped(
500        self, message: Message
501    ) -> Awaitable[Response | SysResponse]:
502        """Split send (part 1 of 2)."""
503        assert self._obj is not None
504        return self._sender.fetch_raw_response_async(
505            bound_obj=self._obj, message=message
506        )

Split send (part 1 of 2).

def unpack_raw_response_untyped( self, message: Message, raw_response: Response | SysResponse) -> Response | None:
508    def unpack_raw_response_untyped(
509        self, message: Message, raw_response: Response | SysResponse
510    ) -> Response | None:
511        """Split send (part 2 of 2)."""
512        return self._sender.unpack_raw_response(
513            bound_obj=self._obj, message=message, raw_response=raw_response
514        )

Split send (part 2 of 2).

class MessageReceiver:
 29class MessageReceiver:
 30    """Facilitates receiving & responding to messages from a remote source.
 31
 32    This is instantiated at the class level with unbound methods registered
 33    as handlers for different message types in the protocol.
 34
 35    Example:
 36
 37    class MyClass:
 38        receiver = MyMessageReceiver()
 39
 40        # MyMessageReceiver fills out handler() overloads to ensure all
 41        # registered handlers have valid types/return-types.
 42
 43        @receiver.handler
 44        def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
 45            # Deal with this message type here.
 46
 47    # This will trigger the registered handler being called.
 48    obj = MyClass()
 49    obj.receiver.handle_raw_message(some_raw_data)
 50
 51    Any unhandled Exception occurring during message handling will result in
 52    an efro.error.RemoteError being raised on the sending end.
 53    """
 54
 55    is_async = False
 56
 57    def __init__(self, protocol: MessageProtocol) -> None:
 58        self.protocol = protocol
 59        self._handlers: dict[type[Message], Callable] = {}
 60        self._decode_filter_call: (
 61            Callable[[Any, dict, Message], None] | None
 62        ) = None
 63        self._encode_filter_call: (
 64            Callable[[Any, Message | None, Response | SysResponse, dict], None]
 65            | None
 66        ) = None
 67
 68    # noinspection PyProtectedMember
 69    def register_handler(
 70        self, call: Callable[[Any, Message], Response | None]
 71    ) -> None:
 72        """Register a handler call.
 73
 74        The message type handled by the call is determined by its
 75        type annotation.
 76        """
 77        # TODO: can use types.GenericAlias in 3.9.
 78        # (hmm though now that we're there,  it seems a drop-in
 79        # replace gives us errors. Should re-test in 3.11 as it seems
 80        # that typing_extensions handles it differently in that case)
 81        from typing import _GenericAlias  # type: ignore
 82        from typing import get_type_hints, get_args
 83
 84        sig = inspect.getfullargspec(call)
 85
 86        # The provided callable should be a method taking one 'msg' arg.
 87        expectedsig = ['self', 'msg']
 88        if sig.args != expectedsig:
 89            raise ValueError(
 90                f'Expected callable signature of {expectedsig};'
 91                f' got {sig.args}'
 92            )
 93
 94        # Check annotation types to determine what message types we handle.
 95        # Return-type annotation can be a Union, but we probably don't
 96        # have it available at runtime. Explicitly pull it in.
 97        # UPDATE: we've updated our pylint filter to where we should
 98        # have all annotations available.
 99        # anns = get_type_hints(call, localns={'Union': Union})
100        anns = get_type_hints(call)
101
102        msgtype = anns.get('msg')
103        if not isinstance(msgtype, type):
104            raise TypeError(
105                f'expected a type for "msg" annotation; got {type(msgtype)}.'
106            )
107        assert issubclass(msgtype, Message)
108
109        ret = anns.get('return')
110        responsetypes: tuple[type[Any] | None, ...]
111
112        # Return types can be a single type or a union of types.
113        if isinstance(ret, (_GenericAlias, types.UnionType)):
114            targs = get_args(ret)
115            if not all(isinstance(a, (type, type(None))) for a in targs):
116                raise TypeError(
117                    f'expected only types for "return" annotation;'
118                    f' got {targs}.'
119                )
120            responsetypes = targs
121        else:
122            if not isinstance(ret, (type, type(None))):
123                raise TypeError(
124                    f'expected one or more types for'
125                    f' "return" annotation; got a {type(ret)}.'
126                )
127            # This seems like maybe a mypy bug. Appeared after adding
128            # types.UnionType above.
129            responsetypes = (ret,)
130
131        # This will contain NoneType for empty return cases, but
132        # we expect it to be None.
133        # noinspection PyPep8
134        responsetypes = tuple(
135            None if r is type(None) else r for r in responsetypes
136        )
137
138        # Make sure our protocol has this message type registered and our
139        # return types exactly match. (Technically we could return a subset
140        # of the supported types; can allow this in the future if it makes
141        # sense).
142        registered_types = self.protocol.message_ids_by_type.keys()
143
144        if msgtype not in registered_types:
145            raise TypeError(
146                f'Message type {msgtype} is not registered'
147                f' in this Protocol.'
148            )
149
150        if msgtype in self._handlers:
151            raise TypeError(
152                f'Message type {msgtype} already has a registered handler.'
153            )
154
155        # Make sure the responses exactly matches what the message expects.
156        if set(responsetypes) != set(msgtype.get_response_types()):
157            raise TypeError(
158                f'Provided response types {responsetypes} do not'
159                f' match the set expected by message type {msgtype}: '
160                f'({msgtype.get_response_types()})'
161            )
162
163        # Ok; we're good!
164        self._handlers[msgtype] = call
165
166    def decode_filter_method(
167        self, call: Callable[[Any, dict, Message], None]
168    ) -> Callable[[Any, dict, Message], None]:
169        """Function decorator for defining a decode filter.
170
171        Decode filters can be used to extract extra data from incoming
172        message dicts. This version will work for both handle_raw_message()
173        and handle_raw_message_async()
174        """
175        assert self._decode_filter_call is None
176        self._decode_filter_call = call
177        return call
178
179    def encode_filter_method(
180        self,
181        call: Callable[
182            [Any, Message | None, Response | SysResponse, dict], None
183        ],
184    ) -> Callable[[Any, Message | None, Response, dict], None]:
185        """Function decorator for defining an encode filter.
186
187        Encode filters can be used to add extra data to the message
188        dict before is is encoded to a string and sent out.
189        """
190        assert self._encode_filter_call is None
191        self._encode_filter_call = call
192        return call
193
194    def validate(self, log_only: bool = False) -> None:
195        """Check for handler completeness, valid types, etc."""
196        for msgtype in self.protocol.message_ids_by_type.keys():
197            if issubclass(msgtype, Response):
198                continue
199            if msgtype not in self._handlers:
200                msg = (
201                    f'Protocol message type {msgtype} is not handled'
202                    f' by receiver type {type(self)}.'
203                )
204                if log_only:
205                    logging.error(msg)
206                else:
207                    raise TypeError(msg)
208
209    def _decode_incoming_message_base(
210        self, bound_obj: Any, msg: str
211    ) -> tuple[Any, dict, Message]:
212        # Decode the incoming message.
213        msg_dict = self.protocol.decode_dict(msg)
214        msg_decoded = self.protocol.message_from_dict(msg_dict)
215        assert isinstance(msg_decoded, Message)
216        if self._decode_filter_call is not None:
217            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
218        return bound_obj, msg_dict, msg_decoded
219
220    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
221        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
222            bound_obj=bound_obj, msg=msg
223        )
224        return msg_decoded
225
226    def encode_user_response(
227        self, bound_obj: Any, message: Message, response: Response | None
228    ) -> str:
229        """Encode a response provided by the user for sending."""
230
231        assert isinstance(response, Response | None)
232        # (user should never explicitly return error-responses)
233        assert (
234            response is None or type(response) in message.get_response_types()
235        )
236
237        # A return value of None equals EmptySysResponse.
238        out_response: Response | SysResponse
239        if response is None:
240            out_response = EmptySysResponse()
241        else:
242            out_response = response
243
244        response_dict = self.protocol.response_to_dict(out_response)
245        if self._encode_filter_call is not None:
246            self._encode_filter_call(
247                bound_obj, message, out_response, response_dict
248            )
249        return self.protocol.encode_dict(response_dict)
250
251    def encode_error_response(
252        self, bound_obj: Any, message: Message | None, exc: Exception
253    ) -> tuple[str, bool]:
254        """Given an error, return sysresponse str and whether to log."""
255        response, dolog = self.protocol.error_to_response(exc)
256        response_dict = self.protocol.response_to_dict(response)
257        if self._encode_filter_call is not None:
258            self._encode_filter_call(
259                bound_obj, message, response, response_dict
260            )
261        return self.protocol.encode_dict(response_dict), dolog
262
263    def handle_raw_message(
264        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
265    ) -> str:
266        """Decode, handle, and return an response for a message.
267
268        if 'raise_unregistered' is True, will raise an
269        efro.message.UnregisteredMessageIDError for messages not handled by
270        the protocol. In all other cases local errors will translate to
271        error responses returned to the sender.
272        """
273        assert not self.is_async, "can't call sync handler on async receiver"
274        msg_decoded: Message | None = None
275        try:
276            msg_decoded = self._decode_incoming_message(bound_obj, msg)
277            msgtype = type(msg_decoded)
278            handler = self._handlers.get(msgtype)
279            if handler is None:
280                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
281            response = handler(bound_obj, msg_decoded)
282            assert isinstance(response, Response | None)
283            return self.encode_user_response(bound_obj, msg_decoded, response)
284
285        except Exception as exc:
286            if raise_unregistered and isinstance(
287                exc, UnregisteredMessageIDError
288            ):
289                raise
290            rstr, dolog = self.encode_error_response(
291                bound_obj, msg_decoded, exc
292            )
293            if dolog:
294                if msg_decoded is not None:
295                    msgtype = type(msg_decoded)
296                    logging.exception(
297                        'Error handling %s.%s message.',
298                        msgtype.__module__,
299                        msgtype.__qualname__,
300                    )
301                else:
302                    logging.exception(
303                        'Error handling raw efro.message'
304                        ' (likely a message format incompatibility): %s.',
305                        msg,
306                    )
307            return rstr
308
309    def handle_raw_message_async(
310        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
311    ) -> Awaitable[str]:
312        """Should be called when the receiver gets a message.
313
314        The return value is the raw response to the message.
315        """
316
317        # Note: This call is synchronous so that the first part of it can
318        # happen synchronously. If the whole call were async we wouldn't be
319        # able to guarantee that messages handlers would be called in the
320        # order the messages were received.
321
322        assert self.is_async, "Can't call async handler on sync receiver."
323        msg_decoded: Message | None = None
324        try:
325            msg_decoded = self._decode_incoming_message(bound_obj, msg)
326            msgtype = type(msg_decoded)
327            handler = self._handlers.get(msgtype)
328            if handler is None:
329                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
330            handler_awaitable = handler(bound_obj, msg_decoded)
331
332        except Exception as exc:
333            if raise_unregistered and isinstance(
334                exc, UnregisteredMessageIDError
335            ):
336                raise
337            return self._handle_raw_message_async_error(
338                bound_obj, msg, msg_decoded, exc
339            )
340
341        # Return an awaitable to handle the rest asynchronously.
342        return self._handle_raw_message_async(
343            bound_obj, msg, msg_decoded, handler_awaitable
344        )
345
346    async def _handle_raw_message_async_error(
347        self,
348        bound_obj: Any,
349        msg_raw: str,
350        msg_decoded: Message | None,
351        exc: Exception,
352    ) -> str:
353        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
354        if dolog:
355            if msg_decoded is not None:
356                msgtype = type(msg_decoded)
357                logging.exception(
358                    'Error handling %s.%s message.',
359                    msgtype.__module__,
360                    msgtype.__qualname__,
361                    # We need to explicitly provide the exception here,
362                    # otherwise it shows up at None. I assume related to
363                    # the fact that we're an async function.
364                    exc_info=exc,
365                )
366            else:
367                logging.exception(
368                    'Error handling raw async efro.message'
369                    ' (likely a message format incompatibility): %s.',
370                    msg_raw,
371                    # We need to explicitly provide the exception here,
372                    # otherwise it shows up at None. I assume related to
373                    # the fact that we're an async function.
374                    exc_info=exc,
375                )
376        return rstr
377
378    async def _handle_raw_message_async(
379        self,
380        bound_obj: Any,
381        msg_raw: str,
382        msg_decoded: Message,
383        handler_awaitable: Awaitable[Response | None],
384    ) -> str:
385        """Should be called when the receiver gets a message.
386
387        The return value is the raw response to the message.
388        """
389        try:
390            response = await handler_awaitable
391            assert isinstance(response, Response | None)
392            return self.encode_user_response(bound_obj, msg_decoded, response)
393
394        except Exception as exc:
395            return await self._handle_raw_message_async_error(
396                bound_obj, msg_raw, msg_decoded, exc
397            )

Facilitates receiving & responding to messages from a remote source.

This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.

Example:

class MyClass: receiver = MyMessageReceiver()

# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.

@receiver.handler
def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
    # Deal with this message type here.

This will trigger the registered handler being called.

obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)

Any unhandled Exception occurring during message handling will result in an efro.error.RemoteError being raised on the sending end.

MessageReceiver(protocol: MessageProtocol)
57    def __init__(self, protocol: MessageProtocol) -> None:
58        self.protocol = protocol
59        self._handlers: dict[type[Message], Callable] = {}
60        self._decode_filter_call: (
61            Callable[[Any, dict, Message], None] | None
62        ) = None
63        self._encode_filter_call: (
64            Callable[[Any, Message | None, Response | SysResponse, dict], None]
65            | None
66        ) = None
is_async = False
protocol
def register_handler( self, call: Callable[[Any, Message], Response | None]) -> None:
 69    def register_handler(
 70        self, call: Callable[[Any, Message], Response | None]
 71    ) -> None:
 72        """Register a handler call.
 73
 74        The message type handled by the call is determined by its
 75        type annotation.
 76        """
 77        # TODO: can use types.GenericAlias in 3.9.
 78        # (hmm though now that we're there,  it seems a drop-in
 79        # replace gives us errors. Should re-test in 3.11 as it seems
 80        # that typing_extensions handles it differently in that case)
 81        from typing import _GenericAlias  # type: ignore
 82        from typing import get_type_hints, get_args
 83
 84        sig = inspect.getfullargspec(call)
 85
 86        # The provided callable should be a method taking one 'msg' arg.
 87        expectedsig = ['self', 'msg']
 88        if sig.args != expectedsig:
 89            raise ValueError(
 90                f'Expected callable signature of {expectedsig};'
 91                f' got {sig.args}'
 92            )
 93
 94        # Check annotation types to determine what message types we handle.
 95        # Return-type annotation can be a Union, but we probably don't
 96        # have it available at runtime. Explicitly pull it in.
 97        # UPDATE: we've updated our pylint filter to where we should
 98        # have all annotations available.
 99        # anns = get_type_hints(call, localns={'Union': Union})
100        anns = get_type_hints(call)
101
102        msgtype = anns.get('msg')
103        if not isinstance(msgtype, type):
104            raise TypeError(
105                f'expected a type for "msg" annotation; got {type(msgtype)}.'
106            )
107        assert issubclass(msgtype, Message)
108
109        ret = anns.get('return')
110        responsetypes: tuple[type[Any] | None, ...]
111
112        # Return types can be a single type or a union of types.
113        if isinstance(ret, (_GenericAlias, types.UnionType)):
114            targs = get_args(ret)
115            if not all(isinstance(a, (type, type(None))) for a in targs):
116                raise TypeError(
117                    f'expected only types for "return" annotation;'
118                    f' got {targs}.'
119                )
120            responsetypes = targs
121        else:
122            if not isinstance(ret, (type, type(None))):
123                raise TypeError(
124                    f'expected one or more types for'
125                    f' "return" annotation; got a {type(ret)}.'
126                )
127            # This seems like maybe a mypy bug. Appeared after adding
128            # types.UnionType above.
129            responsetypes = (ret,)
130
131        # This will contain NoneType for empty return cases, but
132        # we expect it to be None.
133        # noinspection PyPep8
134        responsetypes = tuple(
135            None if r is type(None) else r for r in responsetypes
136        )
137
138        # Make sure our protocol has this message type registered and our
139        # return types exactly match. (Technically we could return a subset
140        # of the supported types; can allow this in the future if it makes
141        # sense).
142        registered_types = self.protocol.message_ids_by_type.keys()
143
144        if msgtype not in registered_types:
145            raise TypeError(
146                f'Message type {msgtype} is not registered'
147                f' in this Protocol.'
148            )
149
150        if msgtype in self._handlers:
151            raise TypeError(
152                f'Message type {msgtype} already has a registered handler.'
153            )
154
155        # Make sure the responses exactly matches what the message expects.
156        if set(responsetypes) != set(msgtype.get_response_types()):
157            raise TypeError(
158                f'Provided response types {responsetypes} do not'
159                f' match the set expected by message type {msgtype}: '
160                f'({msgtype.get_response_types()})'
161            )
162
163        # Ok; we're good!
164        self._handlers[msgtype] = call

Register a handler call.

The message type handled by the call is determined by its type annotation.

def decode_filter_method( self, call: Callable[[Any, dict, Message], NoneType]) -> Callable[[Any, dict, Message], NoneType]:
166    def decode_filter_method(
167        self, call: Callable[[Any, dict, Message], None]
168    ) -> Callable[[Any, dict, Message], None]:
169        """Function decorator for defining a decode filter.
170
171        Decode filters can be used to extract extra data from incoming
172        message dicts. This version will work for both handle_raw_message()
173        and handle_raw_message_async()
174        """
175        assert self._decode_filter_call is None
176        self._decode_filter_call = call
177        return call

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async()

def encode_filter_method( self, call: Callable[[Any, Message | None, Response | SysResponse, dict], NoneType]) -> Callable[[Any, Message | None, Response, dict], NoneType]:
179    def encode_filter_method(
180        self,
181        call: Callable[
182            [Any, Message | None, Response | SysResponse, dict], None
183        ],
184    ) -> Callable[[Any, Message | None, Response, dict], None]:
185        """Function decorator for defining an encode filter.
186
187        Encode filters can be used to add extra data to the message
188        dict before is is encoded to a string and sent out.
189        """
190        assert self._encode_filter_call is None
191        self._encode_filter_call = call
192        return call

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before is is encoded to a string and sent out.

def validate(self, log_only: bool = False) -> None:
194    def validate(self, log_only: bool = False) -> None:
195        """Check for handler completeness, valid types, etc."""
196        for msgtype in self.protocol.message_ids_by_type.keys():
197            if issubclass(msgtype, Response):
198                continue
199            if msgtype not in self._handlers:
200                msg = (
201                    f'Protocol message type {msgtype} is not handled'
202                    f' by receiver type {type(self)}.'
203                )
204                if log_only:
205                    logging.error(msg)
206                else:
207                    raise TypeError(msg)

Check for handler completeness, valid types, etc.

def encode_user_response( self, bound_obj: Any, message: Message, response: Response | None) -> str:
226    def encode_user_response(
227        self, bound_obj: Any, message: Message, response: Response | None
228    ) -> str:
229        """Encode a response provided by the user for sending."""
230
231        assert isinstance(response, Response | None)
232        # (user should never explicitly return error-responses)
233        assert (
234            response is None or type(response) in message.get_response_types()
235        )
236
237        # A return value of None equals EmptySysResponse.
238        out_response: Response | SysResponse
239        if response is None:
240            out_response = EmptySysResponse()
241        else:
242            out_response = response
243
244        response_dict = self.protocol.response_to_dict(out_response)
245        if self._encode_filter_call is not None:
246            self._encode_filter_call(
247                bound_obj, message, out_response, response_dict
248            )
249        return self.protocol.encode_dict(response_dict)

Encode a response provided by the user for sending.

def encode_error_response( self, bound_obj: Any, message: Message | None, exc: Exception) -> tuple[str, bool]:
251    def encode_error_response(
252        self, bound_obj: Any, message: Message | None, exc: Exception
253    ) -> tuple[str, bool]:
254        """Given an error, return sysresponse str and whether to log."""
255        response, dolog = self.protocol.error_to_response(exc)
256        response_dict = self.protocol.response_to_dict(response)
257        if self._encode_filter_call is not None:
258            self._encode_filter_call(
259                bound_obj, message, response, response_dict
260            )
261        return self.protocol.encode_dict(response_dict), dolog

Given an error, return sysresponse str and whether to log.

def handle_raw_message(self, bound_obj: Any, msg: str, raise_unregistered: bool = False) -> str:
263    def handle_raw_message(
264        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
265    ) -> str:
266        """Decode, handle, and return an response for a message.
267
268        if 'raise_unregistered' is True, will raise an
269        efro.message.UnregisteredMessageIDError for messages not handled by
270        the protocol. In all other cases local errors will translate to
271        error responses returned to the sender.
272        """
273        assert not self.is_async, "can't call sync handler on async receiver"
274        msg_decoded: Message | None = None
275        try:
276            msg_decoded = self._decode_incoming_message(bound_obj, msg)
277            msgtype = type(msg_decoded)
278            handler = self._handlers.get(msgtype)
279            if handler is None:
280                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
281            response = handler(bound_obj, msg_decoded)
282            assert isinstance(response, Response | None)
283            return self.encode_user_response(bound_obj, msg_decoded, response)
284
285        except Exception as exc:
286            if raise_unregistered and isinstance(
287                exc, UnregisteredMessageIDError
288            ):
289                raise
290            rstr, dolog = self.encode_error_response(
291                bound_obj, msg_decoded, exc
292            )
293            if dolog:
294                if msg_decoded is not None:
295                    msgtype = type(msg_decoded)
296                    logging.exception(
297                        'Error handling %s.%s message.',
298                        msgtype.__module__,
299                        msgtype.__qualname__,
300                    )
301                else:
302                    logging.exception(
303                        'Error handling raw efro.message'
304                        ' (likely a message format incompatibility): %s.',
305                        msg,
306                    )
307            return rstr

Decode, handle, and return an response for a message.

if 'raise_unregistered' is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.

def handle_raw_message_async( self, bound_obj: Any, msg: str, raise_unregistered: bool = False) -> Awaitable[str]:
309    def handle_raw_message_async(
310        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
311    ) -> Awaitable[str]:
312        """Should be called when the receiver gets a message.
313
314        The return value is the raw response to the message.
315        """
316
317        # Note: This call is synchronous so that the first part of it can
318        # happen synchronously. If the whole call were async we wouldn't be
319        # able to guarantee that messages handlers would be called in the
320        # order the messages were received.
321
322        assert self.is_async, "Can't call async handler on sync receiver."
323        msg_decoded: Message | None = None
324        try:
325            msg_decoded = self._decode_incoming_message(bound_obj, msg)
326            msgtype = type(msg_decoded)
327            handler = self._handlers.get(msgtype)
328            if handler is None:
329                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
330            handler_awaitable = handler(bound_obj, msg_decoded)
331
332        except Exception as exc:
333            if raise_unregistered and isinstance(
334                exc, UnregisteredMessageIDError
335            ):
336                raise
337            return self._handle_raw_message_async_error(
338                bound_obj, msg, msg_decoded, exc
339            )
340
341        # Return an awaitable to handle the rest asynchronously.
342        return self._handle_raw_message_async(
343            bound_obj, msg, msg_decoded, handler_awaitable
344        )

Should be called when the receiver gets a message.

The return value is the raw response to the message.

class BoundMessageReceiver:
400class BoundMessageReceiver:
401    """Base bound receiver class."""
402
403    def __init__(
404        self,
405        obj: Any,
406        receiver: MessageReceiver,
407    ) -> None:
408        assert obj is not None
409        self._obj = obj
410        self._receiver = receiver
411
412    @property
413    def protocol(self) -> MessageProtocol:
414        """Protocol associated with this receiver."""
415        return self._receiver.protocol
416
417    def encode_error_response(self, exc: Exception) -> str:
418        """Given an error, return a response ready to send.
419
420        This should be used for any errors that happen outside of
421        standard handle_raw_message calls. Any errors within those
422        calls will be automatically returned as encoded strings.
423        """
424        # Passing None for Message here; we would only have that available
425        # for things going wrong in the handler (which this is not for).
426        return self._receiver.encode_error_response(self._obj, None, exc)[0]

Base bound receiver class.

BoundMessageReceiver(obj: Any, receiver: MessageReceiver)
403    def __init__(
404        self,
405        obj: Any,
406        receiver: MessageReceiver,
407    ) -> None:
408        assert obj is not None
409        self._obj = obj
410        self._receiver = receiver
protocol: MessageProtocol
412    @property
413    def protocol(self) -> MessageProtocol:
414        """Protocol associated with this receiver."""
415        return self._receiver.protocol

Protocol associated with this receiver.

def encode_error_response(self, exc: Exception) -> str:
417    def encode_error_response(self, exc: Exception) -> str:
418        """Given an error, return a response ready to send.
419
420        This should be used for any errors that happen outside of
421        standard handle_raw_message calls. Any errors within those
422        calls will be automatically returned as encoded strings.
423        """
424        # Passing None for Message here; we would only have that available
425        # for things going wrong in the handler (which this is not for).
426        return self._receiver.encode_error_response(self._obj, None, exc)[0]

Given an error, return a response ready to send.

This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.

def create_sender_module( basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, *, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) -> str:
18def create_sender_module(
19    basename: str,
20    protocol_create_code: str,
21    enable_sync_sends: bool,
22    enable_async_sends: bool,
23    *,
24    private: bool = False,
25    protocol_module_level_import_code: str | None = None,
26    build_time_protocol_create_code: str | None = None,
27) -> str:
28    """Create a Python module defining a MessageSender subclass.
29
30    This class is primarily for type checking and will contain overrides
31    for the varieties of send calls for message/response types defined
32    in the protocol.
33
34    Code passed for 'protocol_create_code' should import necessary
35    modules and assign an instance of the Protocol to a 'protocol'
36    variable.
37
38    Class names are based on basename; a basename 'FooSender' will
39    result in classes FooSender and BoundFooSender.
40
41    If 'private' is True, class-names will be prefixed with an '_'.
42
43    Note: output code may have long lines and should generally be run
44    through a formatter. We should perhaps move this functionality to
45    efrotools so we can include that functionality inline.
46    """
47    protocol = _protocol_from_code(
48        build_time_protocol_create_code
49        if build_time_protocol_create_code is not None
50        else protocol_create_code
51    )
52    return protocol.do_create_sender_module(
53        basename=basename,
54        protocol_create_code=protocol_create_code,
55        enable_sync_sends=enable_sync_sends,
56        enable_async_sends=enable_async_sends,
57        private=private,
58        protocol_module_level_import_code=protocol_module_level_import_code,
59    )

Create a Python module defining a MessageSender subclass.

This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.

Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.

Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.

If 'private' is True, class-names will be prefixed with an '_'.

Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.

def create_receiver_module( basename: str, protocol_create_code: str, is_async: bool, *, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) -> str:
 62def create_receiver_module(
 63    basename: str,
 64    protocol_create_code: str,
 65    is_async: bool,
 66    *,
 67    private: bool = False,
 68    protocol_module_level_import_code: str | None = None,
 69    build_time_protocol_create_code: str | None = None,
 70) -> str:
 71    """ "Create a Python module defining a MessageReceiver subclass.
 72
 73    This class is primarily for type checking and will contain overrides
 74    for the register method for message/response types defined in
 75    the protocol.
 76
 77    Class names are based on basename; a basename 'FooReceiver' will
 78    result in FooReceiver and BoundFooReceiver.
 79
 80    If 'is_async' is True, handle_raw_message() will be an async method
 81    and the @handler decorator will expect async methods.
 82
 83    If 'private' is True, class-names will be prefixed with an '_'.
 84
 85    Note that line lengths are not clipped, so output may need to be
 86    run through a formatter to prevent lint warnings about excessive
 87    line lengths.
 88    """
 89    protocol = _protocol_from_code(
 90        build_time_protocol_create_code
 91        if build_time_protocol_create_code is not None
 92        else protocol_create_code
 93    )
 94    return protocol.do_create_receiver_module(
 95        basename=basename,
 96        protocol_create_code=protocol_create_code,
 97        is_async=is_async,
 98        private=private,
 99        protocol_module_level_import_code=protocol_module_level_import_code,
100    )

"Create a Python module defining a MessageReceiver subclass.

This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.

Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.

If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.

If 'private' is True, class-names will be prefixed with an '_'.

Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.

class UnregisteredMessageIDError(builtins.Exception):
20class UnregisteredMessageIDError(Exception):
21    """A message or response id is not covered by our protocol."""

A message or response id is not covered by our protocol.