efro.message

Functionality for sending and responding to messages. Supports static typing for message types and possible return types.

 1# Released under the MIT License. See LICENSE for details.
 2#
 3"""Functionality for sending and responding to messages.
 4Supports static typing for message types and possible return types.
 5"""
 6
 7from __future__ import annotations
 8
 9from efro.util import set_canonical_module_names
10from efro.message._protocol import MessageProtocol
11from efro.message._sender import MessageSender, BoundMessageSender
12from efro.message._receiver import MessageReceiver, BoundMessageReceiver
13from efro.message._module import create_sender_module, create_receiver_module
14from efro.message._message import (
15    Message,
16    Response,
17    SysResponse,
18    EmptySysResponse,
19    ErrorSysResponse,
20    StringResponse,
21    BoolResponse,
22    UnregisteredMessageIDError,
23)
24
25__all__ = [
26    'Message',
27    'Response',
28    'SysResponse',
29    'EmptySysResponse',
30    'ErrorSysResponse',
31    'StringResponse',
32    'BoolResponse',
33    'MessageProtocol',
34    'MessageSender',
35    'BoundMessageSender',
36    'MessageReceiver',
37    'BoundMessageReceiver',
38    'create_sender_module',
39    'create_receiver_module',
40    'UnregisteredMessageIDError',
41]
42
43# Have these things present themselves cleanly as 'thismodule.SomeClass'
44# instead of 'thismodule._internalmodule.SomeClass'
45set_canonical_module_names(globals())
class Message:
24class Message:
25    """Base class for messages."""
26
27    @classmethod
28    def get_response_types(cls) -> list[type[Response] | None]:
29        """Return all Response types this Message can return when sent.
30
31        The default implementation specifies a None return type.
32        """
33        return [None]

Base class for messages.

@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
27    @classmethod
28    def get_response_types(cls) -> list[type[Response] | None]:
29        """Return all Response types this Message can return when sent.
30
31        The default implementation specifies a None return type.
32        """
33        return [None]

Return all Response types this Message can return when sent.

The default implementation specifies a None return type.

class Response:
36class Response:
37    """Base class for responses to messages."""

Base class for responses to messages.

class SysResponse:
40class SysResponse:
41    """Base class for system-responses to messages.
42
43    These are only sent/handled by the messaging system itself;
44    users of the api never see them.
45    """
46
47    def set_local_exception(self, exc: Exception) -> None:
48        """Attach a local exception to facilitate better logging/handling.
49
50        Be aware that this data does not get serialized and only
51        exists on the local object.
52        """
53        setattr(self, '_sr_local_exception', exc)
54
55    def get_local_exception(self) -> Exception | None:
56        """Fetch a local attached exception."""
57        value = getattr(self, '_sr_local_exception', None)
58        assert isinstance(value, Exception | None)
59        return value

Base class for system-responses to messages.

These are only sent/handled by the messaging system itself; users of the api never see them.

def set_local_exception(self, exc: Exception) -> None:
47    def set_local_exception(self, exc: Exception) -> None:
48        """Attach a local exception to facilitate better logging/handling.
49
50        Be aware that this data does not get serialized and only
51        exists on the local object.
52        """
53        setattr(self, '_sr_local_exception', exc)

Attach a local exception to facilitate better logging/handling.

Be aware that this data does not get serialized and only exists on the local object.

def get_local_exception(self) -> Exception | None:
55    def get_local_exception(self) -> Exception | None:
56        """Fetch a local attached exception."""
57        value = getattr(self, '_sr_local_exception', None)
58        assert isinstance(value, Exception | None)
59        return value

Fetch a local attached exception.

@ioprepped
@dataclass
class EmptySysResponse(efro.message.SysResponse):
86@ioprepped
87@dataclass
88class EmptySysResponse(SysResponse):
89    """The response equivalent of None."""

The response equivalent of None.

@ioprepped
@dataclass
class ErrorSysResponse(efro.message.SysResponse):
65@ioprepped
66@dataclass
67class ErrorSysResponse(SysResponse):
68    """SysResponse saying some error has occurred for the send.
69
70    This generally results in an Exception being raised for the caller.
71    """
72
73    class ErrorType(Enum):
74        """Type of error that occurred while sending a message."""
75
76        REMOTE = 0
77        REMOTE_CLEAN = 1
78        LOCAL = 2
79        COMMUNICATION = 3
80        REMOTE_COMMUNICATION = 4
81
82    error_message: Annotated[str, IOAttrs('m')]
83    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE

SysResponse saying some error has occurred for the send.

This generally results in an Exception being raised for the caller.

ErrorSysResponse( error_message: Annotated[str, <efro.dataclassio._base.IOAttrs object>], error_type: Annotated[ErrorSysResponse.ErrorType, <efro.dataclassio._base.IOAttrs object>] = <ErrorType.REMOTE: 0>)
error_message: Annotated[str, <efro.dataclassio._base.IOAttrs object at 0x11788cce0>]
error_type: Annotated[ErrorSysResponse.ErrorType, <efro.dataclassio._base.IOAttrs object at 0x11788cfe0>] = <ErrorType.REMOTE: 0>
class ErrorSysResponse.ErrorType(enum.Enum):
73    class ErrorType(Enum):
74        """Type of error that occurred while sending a message."""
75
76        REMOTE = 0
77        REMOTE_CLEAN = 1
78        LOCAL = 2
79        COMMUNICATION = 3
80        REMOTE_COMMUNICATION = 4

Type of error that occurred while sending a message.

REMOTE = <ErrorType.REMOTE: 0>
REMOTE_CLEAN = <ErrorType.REMOTE_CLEAN: 1>
LOCAL = <ErrorType.LOCAL: 2>
COMMUNICATION = <ErrorType.COMMUNICATION: 3>
REMOTE_COMMUNICATION = <ErrorType.REMOTE_COMMUNICATION: 4>
Inherited Members
enum.Enum
name
value
@ioprepped
@dataclass
class StringResponse(efro.message.Response):
104@ioprepped
105@dataclass
106class StringResponse(Response):
107    """A simple string value response."""
108
109    value: Annotated[str, IOAttrs('v')]

A simple string value response.

StringResponse(value: Annotated[str, <efro.dataclassio._base.IOAttrs object>])
value: Annotated[str, <efro.dataclassio._base.IOAttrs object at 0x11788ec60>]
@ioprepped
@dataclass
class BoolResponse(efro.message.Response):
 96@ioprepped
 97@dataclass
 98class BoolResponse(Response):
 99    """A simple bool value response."""
100
101    value: Annotated[bool, IOAttrs('v')]

A simple bool value response.

BoolResponse(value: Annotated[bool, <efro.dataclassio._base.IOAttrs object>])
value: Annotated[bool, <efro.dataclassio._base.IOAttrs object at 0x11788f860>]
class MessageProtocol:
 33class MessageProtocol:
 34    """Wrangles a set of message types, formats, and response types.
 35    Both endpoints must be using a compatible Protocol for communication
 36    to succeed. To maintain Protocol compatibility between revisions,
 37    all message types must retain the same id, message attr storage
 38    names must not change, newly added attrs must have default values,
 39    etc.
 40    """
 41
 42    def __init__(
 43        self,
 44        message_types: dict[int, type[Message]],
 45        response_types: dict[int, type[Response]],
 46        forward_communication_errors: bool = False,
 47        forward_clean_errors: bool = False,
 48        remote_errors_include_stack_traces: bool = False,
 49        log_errors_on_receiver: bool = True,
 50    ) -> None:
 51        """Create a protocol with a given configuration.
 52
 53        If 'forward_communication_errors' is True,
 54        efro.error.CommunicationErrors raised on the receiver end will
 55        result in a matching error raised back on the sender. This can
 56        be useful if the receiver will be in some way forwarding
 57        messages along and the sender doesn't need to know where
 58        communication breakdowns occurred; only that they did.
 59
 60        If 'forward_clean_errors' is True, efro.error.CleanError
 61        exceptions raised on the receiver end will result in a matching
 62        CleanError raised back on the sender.
 63
 64        When an exception is not covered by the optional forwarding
 65        mechanisms above, it will come across as efro.error.RemoteError
 66        and the exception will be logged on the receiver end - at least
 67        by default (see details below).
 68
 69        If 'remote_errors_include_stack_traces' is True, stringified
 70        stack traces will be returned with efro.error.RemoteError
 71        exceptions. This is useful for debugging but should only be
 72        enabled in cases where the sender is trusted to see internal
 73        details of the receiver.
 74
 75        By default, when a message-handling exception will result in an
 76        efro.error.RemoteError being returned to the sender, the
 77        exception will be logged on the receiver. This is because the
 78        goal is usually to avoid returning opaque RemoteErrors and to
 79        instead return something meaningful as part of the expected
 80        response type (even if that value itself represents a logical
 81        error state). If 'log_errors_on_receiver' is False, however, such
 82        exceptions will *not* be logged on the receiver. This can be
 83        useful in combination with 'remote_errors_include_stack_traces'
 84        and 'forward_clean_errors' in situations where all error
 85        logging/management will be happening on the sender end. Be
 86        aware, however, that in that case it may be possible for
 87        communication errors to prevent such error messages from
 88        ever being seen.
 89        """
 90        # pylint: disable=too-many-locals
 91        self.message_types_by_id: dict[int, type[Message]] = {}
 92        self.message_ids_by_type: dict[type[Message], int] = {}
 93        self.response_types_by_id: dict[
 94            int, type[Response] | type[SysResponse]
 95        ] = {}
 96        self.response_ids_by_type: dict[
 97            type[Response] | type[SysResponse], int
 98        ] = {}
 99        for m_id, m_type in message_types.items():
100            # Make sure only valid message types were passed and each
101            # id was assigned only once.
102            assert isinstance(m_id, int)
103            assert m_id >= 0
104            assert is_ioprepped_dataclass(m_type) and issubclass(
105                m_type, Message
106            )
107            assert self.message_types_by_id.get(m_id) is None
108            self.message_types_by_id[m_id] = m_type
109            self.message_ids_by_type[m_type] = m_id
110
111        for r_id, r_type in response_types.items():
112            assert isinstance(r_id, int)
113            assert r_id >= 0
114            assert is_ioprepped_dataclass(r_type) and issubclass(
115                r_type, Response
116            )
117            assert self.response_types_by_id.get(r_id) is None
118            self.response_types_by_id[r_id] = r_type
119            self.response_ids_by_type[r_type] = r_id
120
121        # Register our SysResponse types. These use negative
122        # IDs so as to never overlap with user Response types.
123        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
124            assert self.response_types_by_id.get(reg_id) is None
125            self.response_types_by_id[reg_id] = reg_tp
126            self.response_ids_by_type[reg_tp] = reg_id
127
128        _reg_sys(ErrorSysResponse, -1)
129        _reg_sys(EmptySysResponse, -2)
130
131        # Some extra-thorough validation in debug mode.
132        if __debug__:
133            # Make sure all Message types' return types are valid
134            # and have been assigned an ID as well.
135            all_response_types: set[type[Response] | None] = set()
136            for m_id, m_type in message_types.items():
137                m_rtypes = m_type.get_response_types()
138
139                assert isinstance(m_rtypes, list)
140                assert (
141                    m_rtypes
142                ), f'Message type {m_type} specifies no return types.'
143                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
144                for m_rtype in m_rtypes:
145                    all_response_types.add(m_rtype)
146            for cls in all_response_types:
147                if cls is None:
148                    continue
149                assert is_ioprepped_dataclass(cls)
150                assert issubclass(cls, Response)
151                if cls not in self.response_ids_by_type:
152                    raise ValueError(
153                        f'Possible response type {cls} needs to be included'
154                        f' in response_types for this protocol.'
155                    )
156
157            # Make sure all registered types have unique base names.
158            # We can take advantage of this to generate cleaner looking
159            # protocol modules. Can revisit if this is ever a problem.
160            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
161            if len(mtypenames) != len(message_types):
162                raise ValueError(
163                    'message_types contains duplicate __name__s;'
164                    ' all types are required to have unique names.'
165                )
166
167        self.forward_clean_errors = forward_clean_errors
168        self.forward_communication_errors = forward_communication_errors
169        self.remote_errors_include_stack_traces = (
170            remote_errors_include_stack_traces
171        )
172        self.log_errors_on_receiver = log_errors_on_receiver
173
174    @staticmethod
175    def encode_dict(obj: dict) -> str:
176        """Json-encode a provided dict."""
177        return json.dumps(obj, separators=(',', ':'))
178
179    def message_to_dict(self, message: Message) -> dict:
180        """Encode a message to a json ready dict."""
181        return self._to_dict(message, self.message_ids_by_type, 'message')
182
183    def response_to_dict(self, response: Response | SysResponse) -> dict:
184        """Encode a response to a json ready dict."""
185        return self._to_dict(response, self.response_ids_by_type, 'response')
186
187    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
188        """Translate an Exception to a SysResponse.
189
190        Also returns whether the error should be logged if this happened
191        within handle_raw_message().
192        """
193
194        # If anything goes wrong, return a ErrorSysResponse instead.
195        # (either CLEAN or generic REMOTE)
196        if self.forward_clean_errors and isinstance(exc, CleanError):
197            return (
198                ErrorSysResponse(
199                    error_message=str(exc),
200                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
201                ),
202                False,
203            )
204        if self.forward_communication_errors and isinstance(
205            exc, CommunicationError
206        ):
207            return (
208                ErrorSysResponse(
209                    error_message=str(exc),
210                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
211                ),
212                False,
213            )
214        return (
215            ErrorSysResponse(
216                error_message=(
217                    # Note: need to format exception ourself here; it
218                    # might not be current so we can't use
219                    # traceback.format_exc().
220                    ''.join(
221                        traceback.format_exception(
222                            type(exc), exc, exc.__traceback__
223                        )
224                    )
225                    if self.remote_errors_include_stack_traces
226                    else 'An internal error has occurred.'
227                ),
228                error_type=ErrorSysResponse.ErrorType.REMOTE,
229            ),
230            self.log_errors_on_receiver,
231        )
232
233    def _to_dict(
234        self, message: Any, ids_by_type: dict[type, int], opname: str
235    ) -> dict:
236        """Encode a message to a json string for transport."""
237
238        m_id: int | None = ids_by_type.get(type(message))
239        if m_id is None:
240            raise TypeError(
241                f'{opname} type is not registered in protocol:'
242                f' {type(message)}'
243            )
244        out = {'t': m_id, 'm': dataclass_to_dict(message)}
245        return out
246
247    @staticmethod
248    def decode_dict(data: str) -> dict:
249        """Decode data to a dict."""
250        out = json.loads(data)
251        assert isinstance(out, dict)
252        return out
253
254    def message_from_dict(self, data: dict) -> Message:
255        """Decode a message from a json string."""
256        out = self._from_dict(data, self.message_types_by_id, 'message')
257        assert isinstance(out, Message)
258        return out
259
260    def response_from_dict(self, data: dict) -> Response | SysResponse:
261        """Decode a response from a json string."""
262        out = self._from_dict(data, self.response_types_by_id, 'response')
263        assert isinstance(out, Response | SysResponse)
264        return out
265
266    # Weeeird; we get mypy errors returning dict[int, type] but
267    # dict[int, typing.Type] or dict[int, type[Any]] works..
268    def _from_dict(
269        self, data: dict, types_by_id: dict[int, type[Any]], opname: str
270    ) -> Any:
271        """Decode a message from a json string."""
272        msgdict: dict | None
273
274        m_id = data.get('t')
275        # Allow omitting 'm' dict if its empty.
276        msgdict = data.get('m', {})
277
278        assert isinstance(m_id, int)
279        assert isinstance(msgdict, dict)
280
281        # Decode this particular type.
282        msgtype = types_by_id.get(m_id)
283        if msgtype is None:
284            raise UnregisteredMessageIDError(
285                f'Got unregistered {opname} id of {m_id}.'
286            )
287        return dataclass_from_dict(msgtype, msgdict)
288
289    def _get_module_header(
290        self,
291        part: Literal['sender', 'receiver'],
292        extra_import_code: str | None,
293        enable_async_sends: bool,
294    ) -> str:
295        """Return common parts of generated modules."""
296        # pylint: disable=too-many-locals
297        # pylint: disable=too-many-branches
298        # pylint: disable=too-many-statements
299        import textwrap
300
301        tpimports: dict[str, list[str]] = {}
302        imports: dict[str, list[str]] = {}
303
304        single_message_type = len(self.message_ids_by_type) == 1
305
306        msgtypes = list(self.message_ids_by_type)
307        if part == 'sender':
308            msgtypes.append(Message)
309        for msgtype in msgtypes:
310            tpimports.setdefault(msgtype.__module__, []).append(
311                msgtype.__name__
312            )
313        rsptypes = list(self.response_ids_by_type)
314        if part == 'sender':
315            rsptypes.append(Response)
316        for rsp_tp in rsptypes:
317            # Skip these as they don't actually show up in code.
318            if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse:
319                continue
320            if (
321                single_message_type
322                and part == 'sender'
323                and rsp_tp is not Response
324            ):
325                # We need to cast to the single supported response type
326                # in this case so need response types at runtime.
327                imports.setdefault(rsp_tp.__module__, []).append(
328                    rsp_tp.__name__
329                )
330            else:
331                tpimports.setdefault(rsp_tp.__module__, []).append(
332                    rsp_tp.__name__
333                )
334
335        import_lines = ''
336        tpimport_lines = ''
337
338        for module, names in sorted(imports.items()):
339            jnames = ', '.join(names)
340            line = f'from {module} import {jnames}'
341            if len(line) > 80:
342                # Recreate in a wrapping-friendly form.
343                line = f'from {module} import ({jnames})'
344            import_lines += f'{line}\n'
345        for module, names in sorted(tpimports.items()):
346            jnames = ', '.join(names)
347            line = f'from {module} import {jnames}'
348            if len(line) > 75:  # Account for indent
349                # Recreate in a wrapping-friendly form.
350                line = f'from {module} import ({jnames})'
351            tpimport_lines += f'{line}\n'
352
353        if part == 'sender':
354            import_lines += (
355                'from efro.message import MessageSender, BoundMessageSender\n'
356            )
357            tpimport_typing_extras = ''
358        else:
359            if single_message_type:
360                import_lines += (
361                    'from efro.message import (MessageReceiver,'
362                    ' BoundMessageReceiver, Message, Response)\n'
363                )
364            else:
365                import_lines += (
366                    'from efro.message import MessageReceiver,'
367                    ' BoundMessageReceiver\n'
368                )
369            tpimport_typing_extras = ', Awaitable'
370
371        if extra_import_code is not None:
372            import_lines += extra_import_code
373
374        ovld = ', overload' if not single_message_type else ''
375        ovld2 = (
376            ', cast, Awaitable'
377            if (single_message_type and part == 'sender' and enable_async_sends)
378            else ''
379        )
380        tpimport_lines = textwrap.indent(tpimport_lines, '    ')
381
382        baseimps = ['Any']
383        if part == 'receiver':
384            baseimps.append('Callable')
385        if part == 'sender' and enable_async_sends:
386            baseimps.append('Awaitable')
387        baseimps_s = ', '.join(baseimps)
388        out = (
389            '# Released under the MIT License. See LICENSE for details.\n'
390            f'#\n'
391            f'"""Auto-generated {part} module. Do not edit by hand."""\n'
392            f'\n'
393            f'from __future__ import annotations\n'
394            f'\n'
395            f'from typing import TYPE_CHECKING{ovld}{ovld2}\n'
396            f'\n'
397            f'{import_lines}'
398            f'\n'
399            f'if TYPE_CHECKING:\n'
400            f'    from typing import {baseimps_s}'
401            f'{tpimport_typing_extras}\n'
402            f'{tpimport_lines}'
403            f'\n'
404            f'\n'
405        )
406        return out
407
408    def do_create_sender_module(
409        self,
410        basename: str,
411        protocol_create_code: str,
412        enable_sync_sends: bool,
413        enable_async_sends: bool,
414        private: bool = False,
415        protocol_module_level_import_code: str | None = None,
416    ) -> str:
417        """Used by create_sender_module(); do not call directly."""
418        # pylint: disable=too-many-locals
419        # pylint: disable=too-many-branches
420        import textwrap
421
422        msgtypes = list(self.message_ids_by_type.keys())
423
424        ppre = '_' if private else ''
425        out = self._get_module_header(
426            'sender',
427            extra_import_code=protocol_module_level_import_code,
428            enable_async_sends=enable_async_sends,
429        )
430        ccind = textwrap.indent(protocol_create_code, '        ')
431        out += (
432            f'class {ppre}{basename}(MessageSender):\n'
433            f'    """Protocol-specific sender."""\n'
434            f'\n'
435            f'    def __init__(self) -> None:\n'
436            f'{ccind}\n'
437            f'        super().__init__(protocol)\n'
438            f'\n'
439            f'    def __get__(\n'
440            f'        self, obj: Any, type_in: Any = None\n'
441            f'    ) -> {ppre}Bound{basename}:\n'
442            f'        return {ppre}Bound{basename}(obj, self)\n'
443            f'\n'
444            f'\n'
445            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
446            f'    """Protocol-specific bound sender."""\n'
447        )
448
449        def _filt_tp_name(rtype: type[Response] | None) -> str:
450            return 'None' if rtype is None else rtype.__name__
451
452        # Define handler() overloads for all registered message types.
453        if msgtypes:
454            for async_pass in False, True:
455                if async_pass and not enable_async_sends:
456                    continue
457                if not async_pass and not enable_sync_sends:
458                    continue
459                pfx = 'async ' if async_pass else ''
460                sfx = '_async' if async_pass else ''
461                # awt = 'await ' if async_pass else ''
462                awt = ''
463                how = 'asynchronously' if async_pass else 'synchronously'
464
465                if len(msgtypes) == 1:
466                    # Special case: with a single message types we don't
467                    # use overloads.
468                    msgtype = msgtypes[0]
469                    msgtypevar = msgtype.__name__
470                    rtypes = msgtype.get_response_types()
471                    if len(rtypes) > 1:
472                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
473                    else:
474                        rtypevar = _filt_tp_name(rtypes[0])
475                    if async_pass:
476                        rtypevar = f'Awaitable[{rtypevar}]'
477                    out += (
478                        f'\n'
479                        f'    def send{sfx}(self,'
480                        f' message: {msgtypevar})'
481                        f' -> {rtypevar}:\n'
482                        f'        """Send a message {how}."""\n'
483                        f'        out = {awt}self._sender.'
484                        f'send{sfx}(self._obj, message)\n'
485                    )
486                    if not async_pass:
487                        out += (
488                            f'        assert isinstance(out, {rtypevar})\n'
489                            '        return out\n'
490                        )
491                    else:
492                        out += f'        return cast({rtypevar}, out)\n'
493
494                else:
495                    for msgtype in msgtypes:
496                        msgtypevar = msgtype.__name__
497                        rtypes = msgtype.get_response_types()
498                        if len(rtypes) > 1:
499                            rtypevar = ' | '.join(
500                                _filt_tp_name(t) for t in rtypes
501                            )
502                        else:
503                            rtypevar = _filt_tp_name(rtypes[0])
504                        out += (
505                            f'\n'
506                            f'    @overload\n'
507                            f'    {pfx}def send{sfx}(self,'
508                            f' message: {msgtypevar})'
509                            f' -> {rtypevar}: ...\n'
510                        )
511                    rtypevar = 'Response | None'
512                    if async_pass:
513                        rtypevar = f'Awaitable[{rtypevar}]'
514                    out += (
515                        f'\n'
516                        f'    def send{sfx}(self, message: Message)'
517                        f' -> {rtypevar}:\n'
518                        f'        """Send a message {how}."""\n'
519                        f'        return {awt}self._sender.'
520                        f'send{sfx}(self._obj, message)\n'
521                    )
522
523        return out
524
525    def do_create_receiver_module(
526        self,
527        basename: str,
528        protocol_create_code: str,
529        is_async: bool,
530        private: bool = False,
531        protocol_module_level_import_code: str | None = None,
532    ) -> str:
533        """Used by create_receiver_module(); do not call directly."""
534        # pylint: disable=too-many-locals
535        import textwrap
536
537        desc = 'asynchronous' if is_async else 'synchronous'
538        ppre = '_' if private else ''
539        msgtypes = list(self.message_ids_by_type.keys())
540        out = self._get_module_header(
541            'receiver',
542            extra_import_code=protocol_module_level_import_code,
543            enable_async_sends=False,
544        )
545        ccind = textwrap.indent(protocol_create_code, '        ')
546        out += (
547            f'class {ppre}{basename}(MessageReceiver):\n'
548            f'    """Protocol-specific {desc} receiver."""\n'
549            f'\n'
550            f'    is_async = {is_async}\n'
551            f'\n'
552            f'    def __init__(self) -> None:\n'
553            f'{ccind}\n'
554            f'        super().__init__(protocol)\n'
555            f'\n'
556            f'    def __get__(\n'
557            f'        self,\n'
558            f'        obj: Any,\n'
559            f'        type_in: Any = None,\n'
560            f'    ) -> {ppre}Bound{basename}:\n'
561            f'        return {ppre}Bound{basename}('
562            f'obj, self)\n'
563        )
564
565        # Define handler() overloads for all registered message types.
566
567        def _filt_tp_name(rtype: type[Response] | None) -> str:
568            return 'None' if rtype is None else rtype.__name__
569
570        if msgtypes:
571            cbgn = 'Awaitable[' if is_async else ''
572            cend = ']' if is_async else ''
573            if len(msgtypes) == 1:
574                # Special case: when we have a single message type we don't
575                # use overloads.
576                msgtype = msgtypes[0]
577                msgtypevar = msgtype.__name__
578                rtypes = msgtype.get_response_types()
579                if len(rtypes) > 1:
580                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
581                else:
582                    rtypevar = _filt_tp_name(rtypes[0])
583                rtypevar = f'{cbgn}{rtypevar}{cend}'
584                out += (
585                    f'\n'
586                    f'    def handler(\n'
587                    f'        self,\n'
588                    f'        call: Callable[[Any, {msgtypevar}], '
589                    f'{rtypevar}],\n'
590                    f'    )'
591                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
592                    f'        """Decorator to register message handlers."""\n'
593                    f'        from typing import cast, Callable, Any\n'
594                    f'\n'
595                    f'        self.register_handler(cast(Callable'
596                    f'[[Any, Message], Response], call))\n'
597                    f'        return call\n'
598                )
599            else:
600                for msgtype in msgtypes:
601                    msgtypevar = msgtype.__name__
602                    rtypes = msgtype.get_response_types()
603                    if len(rtypes) > 1:
604                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
605                    else:
606                        rtypevar = _filt_tp_name(rtypes[0])
607                    rtypevar = f'{cbgn}{rtypevar}{cend}'
608                    out += (
609                        f'\n'
610                        f'    @overload\n'
611                        f'    def handler(\n'
612                        f'        self,\n'
613                        f'        call: Callable[[Any, {msgtypevar}], '
614                        f'{rtypevar}],\n'
615                        f'    )'
616                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
617                    )
618                out += (
619                    '\n'
620                    '    def handler(self, call: Callable) -> Callable:\n'
621                    '        """Decorator to register message handlers."""\n'
622                    '        self.register_handler(call)\n'
623                    '        return call\n'
624                )
625
626        out += (
627            f'\n'
628            f'\n'
629            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
630            f'    """Protocol-specific bound receiver."""\n'
631        )
632        if is_async:
633            out += (
634                '\n'
635                '    def handle_raw_message(\n'
636                '        self, message: str, raise_unregistered: bool = False\n'
637                '    ) -> Awaitable[str]:\n'
638                '        """Asynchronously handle a raw incoming message."""\n'
639                '        return self._receiver.'
640                'handle_raw_message_async(\n'
641                '            self._obj, message, raise_unregistered\n'
642                '        )\n'
643            )
644
645        else:
646            out += (
647                '\n'
648                '    def handle_raw_message(\n'
649                '        self, message: str, raise_unregistered: bool = False\n'
650                '    ) -> str:\n'
651                '        """Synchronously handle a raw incoming message."""\n'
652                '        return self._receiver.handle_raw_message(\n'
653                '            self._obj, message, raise_unregistered\n'
654                '        )\n'
655            )
656
657        return out

Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.

MessageProtocol( message_types: dict[int, type[Message]], response_types: dict[int, type[Response]], forward_communication_errors: bool = False, forward_clean_errors: bool = False, remote_errors_include_stack_traces: bool = False, log_errors_on_receiver: bool = True)
 42    def __init__(
 43        self,
 44        message_types: dict[int, type[Message]],
 45        response_types: dict[int, type[Response]],
 46        forward_communication_errors: bool = False,
 47        forward_clean_errors: bool = False,
 48        remote_errors_include_stack_traces: bool = False,
 49        log_errors_on_receiver: bool = True,
 50    ) -> None:
 51        """Create a protocol with a given configuration.
 52
 53        If 'forward_communication_errors' is True,
 54        efro.error.CommunicationErrors raised on the receiver end will
 55        result in a matching error raised back on the sender. This can
 56        be useful if the receiver will be in some way forwarding
 57        messages along and the sender doesn't need to know where
 58        communication breakdowns occurred; only that they did.
 59
 60        If 'forward_clean_errors' is True, efro.error.CleanError
 61        exceptions raised on the receiver end will result in a matching
 62        CleanError raised back on the sender.
 63
 64        When an exception is not covered by the optional forwarding
 65        mechanisms above, it will come across as efro.error.RemoteError
 66        and the exception will be logged on the receiver end - at least
 67        by default (see details below).
 68
 69        If 'remote_errors_include_stack_traces' is True, stringified
 70        stack traces will be returned with efro.error.RemoteError
 71        exceptions. This is useful for debugging but should only be
 72        enabled in cases where the sender is trusted to see internal
 73        details of the receiver.
 74
 75        By default, when a message-handling exception will result in an
 76        efro.error.RemoteError being returned to the sender, the
 77        exception will be logged on the receiver. This is because the
 78        goal is usually to avoid returning opaque RemoteErrors and to
 79        instead return something meaningful as part of the expected
 80        response type (even if that value itself represents a logical
 81        error state). If 'log_errors_on_receiver' is False, however, such
 82        exceptions will *not* be logged on the receiver. This can be
 83        useful in combination with 'remote_errors_include_stack_traces'
 84        and 'forward_clean_errors' in situations where all error
 85        logging/management will be happening on the sender end. Be
 86        aware, however, that in that case it may be possible for
 87        communication errors to prevent such error messages from
 88        ever being seen.
 89        """
 90        # pylint: disable=too-many-locals
 91        self.message_types_by_id: dict[int, type[Message]] = {}
 92        self.message_ids_by_type: dict[type[Message], int] = {}
 93        self.response_types_by_id: dict[
 94            int, type[Response] | type[SysResponse]
 95        ] = {}
 96        self.response_ids_by_type: dict[
 97            type[Response] | type[SysResponse], int
 98        ] = {}
 99        for m_id, m_type in message_types.items():
100            # Make sure only valid message types were passed and each
101            # id was assigned only once.
102            assert isinstance(m_id, int)
103            assert m_id >= 0
104            assert is_ioprepped_dataclass(m_type) and issubclass(
105                m_type, Message
106            )
107            assert self.message_types_by_id.get(m_id) is None
108            self.message_types_by_id[m_id] = m_type
109            self.message_ids_by_type[m_type] = m_id
110
111        for r_id, r_type in response_types.items():
112            assert isinstance(r_id, int)
113            assert r_id >= 0
114            assert is_ioprepped_dataclass(r_type) and issubclass(
115                r_type, Response
116            )
117            assert self.response_types_by_id.get(r_id) is None
118            self.response_types_by_id[r_id] = r_type
119            self.response_ids_by_type[r_type] = r_id
120
121        # Register our SysResponse types. These use negative
122        # IDs so as to never overlap with user Response types.
123        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
124            assert self.response_types_by_id.get(reg_id) is None
125            self.response_types_by_id[reg_id] = reg_tp
126            self.response_ids_by_type[reg_tp] = reg_id
127
128        _reg_sys(ErrorSysResponse, -1)
129        _reg_sys(EmptySysResponse, -2)
130
131        # Some extra-thorough validation in debug mode.
132        if __debug__:
133            # Make sure all Message types' return types are valid
134            # and have been assigned an ID as well.
135            all_response_types: set[type[Response] | None] = set()
136            for m_id, m_type in message_types.items():
137                m_rtypes = m_type.get_response_types()
138
139                assert isinstance(m_rtypes, list)
140                assert (
141                    m_rtypes
142                ), f'Message type {m_type} specifies no return types.'
143                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
144                for m_rtype in m_rtypes:
145                    all_response_types.add(m_rtype)
146            for cls in all_response_types:
147                if cls is None:
148                    continue
149                assert is_ioprepped_dataclass(cls)
150                assert issubclass(cls, Response)
151                if cls not in self.response_ids_by_type:
152                    raise ValueError(
153                        f'Possible response type {cls} needs to be included'
154                        f' in response_types for this protocol.'
155                    )
156
157            # Make sure all registered types have unique base names.
158            # We can take advantage of this to generate cleaner looking
159            # protocol modules. Can revisit if this is ever a problem.
160            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
161            if len(mtypenames) != len(message_types):
162                raise ValueError(
163                    'message_types contains duplicate __name__s;'
164                    ' all types are required to have unique names.'
165                )
166
167        self.forward_clean_errors = forward_clean_errors
168        self.forward_communication_errors = forward_communication_errors
169        self.remote_errors_include_stack_traces = (
170            remote_errors_include_stack_traces
171        )
172        self.log_errors_on_receiver = log_errors_on_receiver

Create a protocol with a given configuration.

If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.

If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.

When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).

If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.

By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_errors_on_receiver' is False, however, such exceptions will not be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent such error messages from ever being seen.

message_types_by_id: dict[int, type[Message]]
message_ids_by_type: dict[type[Message], int]
response_types_by_id: dict[int, type[Response] | type[SysResponse]]
response_ids_by_type: dict[type[Response] | type[SysResponse], int]
forward_clean_errors
forward_communication_errors
remote_errors_include_stack_traces
log_errors_on_receiver
@staticmethod
def encode_dict(obj: dict) -> str:
174    @staticmethod
175    def encode_dict(obj: dict) -> str:
176        """Json-encode a provided dict."""
177        return json.dumps(obj, separators=(',', ':'))

Json-encode a provided dict.

def message_to_dict(self, message: Message) -> dict:
179    def message_to_dict(self, message: Message) -> dict:
180        """Encode a message to a json ready dict."""
181        return self._to_dict(message, self.message_ids_by_type, 'message')

Encode a message to a json ready dict.

def response_to_dict( self, response: Response | SysResponse) -> dict:
183    def response_to_dict(self, response: Response | SysResponse) -> dict:
184        """Encode a response to a json ready dict."""
185        return self._to_dict(response, self.response_ids_by_type, 'response')

Encode a response to a json ready dict.

def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
187    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
188        """Translate an Exception to a SysResponse.
189
190        Also returns whether the error should be logged if this happened
191        within handle_raw_message().
192        """
193
194        # If anything goes wrong, return a ErrorSysResponse instead.
195        # (either CLEAN or generic REMOTE)
196        if self.forward_clean_errors and isinstance(exc, CleanError):
197            return (
198                ErrorSysResponse(
199                    error_message=str(exc),
200                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
201                ),
202                False,
203            )
204        if self.forward_communication_errors and isinstance(
205            exc, CommunicationError
206        ):
207            return (
208                ErrorSysResponse(
209                    error_message=str(exc),
210                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
211                ),
212                False,
213            )
214        return (
215            ErrorSysResponse(
216                error_message=(
217                    # Note: need to format exception ourself here; it
218                    # might not be current so we can't use
219                    # traceback.format_exc().
220                    ''.join(
221                        traceback.format_exception(
222                            type(exc), exc, exc.__traceback__
223                        )
224                    )
225                    if self.remote_errors_include_stack_traces
226                    else 'An internal error has occurred.'
227                ),
228                error_type=ErrorSysResponse.ErrorType.REMOTE,
229            ),
230            self.log_errors_on_receiver,
231        )

Translate an Exception to a SysResponse.

Also returns whether the error should be logged if this happened within handle_raw_message().

@staticmethod
def decode_dict(data: str) -> dict:
247    @staticmethod
248    def decode_dict(data: str) -> dict:
249        """Decode data to a dict."""
250        out = json.loads(data)
251        assert isinstance(out, dict)
252        return out

Decode data to a dict.

def message_from_dict(self, data: dict) -> Message:
254    def message_from_dict(self, data: dict) -> Message:
255        """Decode a message from a json string."""
256        out = self._from_dict(data, self.message_types_by_id, 'message')
257        assert isinstance(out, Message)
258        return out

Decode a message from a json string.

def response_from_dict( self, data: dict) -> Response | SysResponse:
260    def response_from_dict(self, data: dict) -> Response | SysResponse:
261        """Decode a response from a json string."""
262        out = self._from_dict(data, self.response_types_by_id, 'response')
263        assert isinstance(out, Response | SysResponse)
264        return out

Decode a response from a json string.

def do_create_sender_module( self, basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None) -> str:
408    def do_create_sender_module(
409        self,
410        basename: str,
411        protocol_create_code: str,
412        enable_sync_sends: bool,
413        enable_async_sends: bool,
414        private: bool = False,
415        protocol_module_level_import_code: str | None = None,
416    ) -> str:
417        """Used by create_sender_module(); do not call directly."""
418        # pylint: disable=too-many-locals
419        # pylint: disable=too-many-branches
420        import textwrap
421
422        msgtypes = list(self.message_ids_by_type.keys())
423
424        ppre = '_' if private else ''
425        out = self._get_module_header(
426            'sender',
427            extra_import_code=protocol_module_level_import_code,
428            enable_async_sends=enable_async_sends,
429        )
430        ccind = textwrap.indent(protocol_create_code, '        ')
431        out += (
432            f'class {ppre}{basename}(MessageSender):\n'
433            f'    """Protocol-specific sender."""\n'
434            f'\n'
435            f'    def __init__(self) -> None:\n'
436            f'{ccind}\n'
437            f'        super().__init__(protocol)\n'
438            f'\n'
439            f'    def __get__(\n'
440            f'        self, obj: Any, type_in: Any = None\n'
441            f'    ) -> {ppre}Bound{basename}:\n'
442            f'        return {ppre}Bound{basename}(obj, self)\n'
443            f'\n'
444            f'\n'
445            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
446            f'    """Protocol-specific bound sender."""\n'
447        )
448
449        def _filt_tp_name(rtype: type[Response] | None) -> str:
450            return 'None' if rtype is None else rtype.__name__
451
452        # Define handler() overloads for all registered message types.
453        if msgtypes:
454            for async_pass in False, True:
455                if async_pass and not enable_async_sends:
456                    continue
457                if not async_pass and not enable_sync_sends:
458                    continue
459                pfx = 'async ' if async_pass else ''
460                sfx = '_async' if async_pass else ''
461                # awt = 'await ' if async_pass else ''
462                awt = ''
463                how = 'asynchronously' if async_pass else 'synchronously'
464
465                if len(msgtypes) == 1:
466                    # Special case: with a single message types we don't
467                    # use overloads.
468                    msgtype = msgtypes[0]
469                    msgtypevar = msgtype.__name__
470                    rtypes = msgtype.get_response_types()
471                    if len(rtypes) > 1:
472                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
473                    else:
474                        rtypevar = _filt_tp_name(rtypes[0])
475                    if async_pass:
476                        rtypevar = f'Awaitable[{rtypevar}]'
477                    out += (
478                        f'\n'
479                        f'    def send{sfx}(self,'
480                        f' message: {msgtypevar})'
481                        f' -> {rtypevar}:\n'
482                        f'        """Send a message {how}."""\n'
483                        f'        out = {awt}self._sender.'
484                        f'send{sfx}(self._obj, message)\n'
485                    )
486                    if not async_pass:
487                        out += (
488                            f'        assert isinstance(out, {rtypevar})\n'
489                            '        return out\n'
490                        )
491                    else:
492                        out += f'        return cast({rtypevar}, out)\n'
493
494                else:
495                    for msgtype in msgtypes:
496                        msgtypevar = msgtype.__name__
497                        rtypes = msgtype.get_response_types()
498                        if len(rtypes) > 1:
499                            rtypevar = ' | '.join(
500                                _filt_tp_name(t) for t in rtypes
501                            )
502                        else:
503                            rtypevar = _filt_tp_name(rtypes[0])
504                        out += (
505                            f'\n'
506                            f'    @overload\n'
507                            f'    {pfx}def send{sfx}(self,'
508                            f' message: {msgtypevar})'
509                            f' -> {rtypevar}: ...\n'
510                        )
511                    rtypevar = 'Response | None'
512                    if async_pass:
513                        rtypevar = f'Awaitable[{rtypevar}]'
514                    out += (
515                        f'\n'
516                        f'    def send{sfx}(self, message: Message)'
517                        f' -> {rtypevar}:\n'
518                        f'        """Send a message {how}."""\n'
519                        f'        return {awt}self._sender.'
520                        f'send{sfx}(self._obj, message)\n'
521                    )
522
523        return out

Used by create_sender_module(); do not call directly.

def do_create_receiver_module( self, basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None) -> str:
525    def do_create_receiver_module(
526        self,
527        basename: str,
528        protocol_create_code: str,
529        is_async: bool,
530        private: bool = False,
531        protocol_module_level_import_code: str | None = None,
532    ) -> str:
533        """Used by create_receiver_module(); do not call directly."""
534        # pylint: disable=too-many-locals
535        import textwrap
536
537        desc = 'asynchronous' if is_async else 'synchronous'
538        ppre = '_' if private else ''
539        msgtypes = list(self.message_ids_by_type.keys())
540        out = self._get_module_header(
541            'receiver',
542            extra_import_code=protocol_module_level_import_code,
543            enable_async_sends=False,
544        )
545        ccind = textwrap.indent(protocol_create_code, '        ')
546        out += (
547            f'class {ppre}{basename}(MessageReceiver):\n'
548            f'    """Protocol-specific {desc} receiver."""\n'
549            f'\n'
550            f'    is_async = {is_async}\n'
551            f'\n'
552            f'    def __init__(self) -> None:\n'
553            f'{ccind}\n'
554            f'        super().__init__(protocol)\n'
555            f'\n'
556            f'    def __get__(\n'
557            f'        self,\n'
558            f'        obj: Any,\n'
559            f'        type_in: Any = None,\n'
560            f'    ) -> {ppre}Bound{basename}:\n'
561            f'        return {ppre}Bound{basename}('
562            f'obj, self)\n'
563        )
564
565        # Define handler() overloads for all registered message types.
566
567        def _filt_tp_name(rtype: type[Response] | None) -> str:
568            return 'None' if rtype is None else rtype.__name__
569
570        if msgtypes:
571            cbgn = 'Awaitable[' if is_async else ''
572            cend = ']' if is_async else ''
573            if len(msgtypes) == 1:
574                # Special case: when we have a single message type we don't
575                # use overloads.
576                msgtype = msgtypes[0]
577                msgtypevar = msgtype.__name__
578                rtypes = msgtype.get_response_types()
579                if len(rtypes) > 1:
580                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
581                else:
582                    rtypevar = _filt_tp_name(rtypes[0])
583                rtypevar = f'{cbgn}{rtypevar}{cend}'
584                out += (
585                    f'\n'
586                    f'    def handler(\n'
587                    f'        self,\n'
588                    f'        call: Callable[[Any, {msgtypevar}], '
589                    f'{rtypevar}],\n'
590                    f'    )'
591                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
592                    f'        """Decorator to register message handlers."""\n'
593                    f'        from typing import cast, Callable, Any\n'
594                    f'\n'
595                    f'        self.register_handler(cast(Callable'
596                    f'[[Any, Message], Response], call))\n'
597                    f'        return call\n'
598                )
599            else:
600                for msgtype in msgtypes:
601                    msgtypevar = msgtype.__name__
602                    rtypes = msgtype.get_response_types()
603                    if len(rtypes) > 1:
604                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
605                    else:
606                        rtypevar = _filt_tp_name(rtypes[0])
607                    rtypevar = f'{cbgn}{rtypevar}{cend}'
608                    out += (
609                        f'\n'
610                        f'    @overload\n'
611                        f'    def handler(\n'
612                        f'        self,\n'
613                        f'        call: Callable[[Any, {msgtypevar}], '
614                        f'{rtypevar}],\n'
615                        f'    )'
616                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
617                    )
618                out += (
619                    '\n'
620                    '    def handler(self, call: Callable) -> Callable:\n'
621                    '        """Decorator to register message handlers."""\n'
622                    '        self.register_handler(call)\n'
623                    '        return call\n'
624                )
625
626        out += (
627            f'\n'
628            f'\n'
629            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
630            f'    """Protocol-specific bound receiver."""\n'
631        )
632        if is_async:
633            out += (
634                '\n'
635                '    def handle_raw_message(\n'
636                '        self, message: str, raise_unregistered: bool = False\n'
637                '    ) -> Awaitable[str]:\n'
638                '        """Asynchronously handle a raw incoming message."""\n'
639                '        return self._receiver.'
640                'handle_raw_message_async(\n'
641                '            self._obj, message, raise_unregistered\n'
642                '        )\n'
643            )
644
645        else:
646            out += (
647                '\n'
648                '    def handle_raw_message(\n'
649                '        self, message: str, raise_unregistered: bool = False\n'
650                '    ) -> str:\n'
651                '        """Synchronously handle a raw incoming message."""\n'
652                '        return self._receiver.handle_raw_message(\n'
653                '            self._obj, message, raise_unregistered\n'
654                '        )\n'
655            )
656
657        return out

Used by create_receiver_module(); do not call directly.

class MessageSender:
 22class MessageSender:
 23    """Facilitates sending messages to a target and receiving responses.
 24
 25    These are instantiated at the class level and used to register unbound
 26    class methods to handle raw message sending. Generally this class is not
 27    used directly, but instead autogenerated subclasses which provide type
 28    safe overloads are used instead.
 29
 30    Example:
 31      (In this example, MyMessageSender is an autogenerated class that
 32      inherits from MessageSender).
 33
 34    class MyClass:
 35        msg = MyMessageSender()
 36
 37        @msg.send_method
 38        def send_raw_message(self, message: str) -> str:
 39            # Actually send the message here.
 40
 41    obj = MyClass()
 42
 43    # The MyMessageSender generated class would provides overloads for
 44    # send(), send_async(), etc. to provide type-safety for message types
 45    # and their associated response types.
 46    # Thus, given the statement below, a type-checker would know that
 47    # 'response' is a SomeResponseType or whatever is associated with
 48    # SomeMessageType.
 49    response = obj.msg.send(SomeMessageType())
 50
 51    """
 52
 53    def __init__(self, protocol: MessageProtocol) -> None:
 54        self.protocol = protocol
 55        self._send_raw_message_call: Callable[[Any, str], str] | None = None
 56        self._send_raw_message_ex_call: (
 57            Callable[[Any, str, Message], str] | None
 58        ) = None
 59        self._send_async_raw_message_call: (
 60            Callable[[Any, str], Awaitable[str]] | None
 61        ) = None
 62        self._send_async_raw_message_ex_call: (
 63            Callable[[Any, str, Message], Awaitable[str]] | None
 64        ) = None
 65        self._encode_filter_call: (
 66            Callable[[Any, Message, dict], None] | None
 67        ) = None
 68        self._decode_filter_call: (
 69            Callable[[Any, Message, dict, Response | SysResponse], None] | None
 70        ) = None
 71        self._peer_desc_call: Callable[[Any], str] | None = None
 72
 73    def send_method(
 74        self, call: Callable[[Any, str], str]
 75    ) -> Callable[[Any, str], str]:
 76        """Function decorator for setting raw send method.
 77
 78        Send methods take strings and should return strings.
 79        CommunicationErrors raised here will be returned to the sender
 80        as such; all other exceptions will result in a RuntimeError for
 81        the sender.
 82        """
 83        assert self._send_raw_message_call is None
 84        self._send_raw_message_call = call
 85        return call
 86
 87    def send_ex_method(
 88        self, call: Callable[[Any, str, Message], str]
 89    ) -> Callable[[Any, str, Message], str]:
 90        """Function decorator for extended send method.
 91
 92        Version of send_method which is also is passed the original
 93        unencoded message; can be useful for cases where metadata is sent
 94        along with messages referring to their payloads/etc.
 95        """
 96        assert self._send_raw_message_ex_call is None
 97        self._send_raw_message_ex_call = call
 98        return call
 99
100    def send_async_method(
101        self, call: Callable[[Any, str], Awaitable[str]]
102    ) -> Callable[[Any, str], Awaitable[str]]:
103        """Function decorator for setting raw send-async method.
104
105        Send methods take strings and should return strings.
106        CommunicationErrors raised here will be returned to the sender
107        as such; all other exceptions will result in a RuntimeError for
108        the sender.
109
110        IMPORTANT: Generally async send methods should not be implemented
111        as 'async' methods, but instead should be regular methods that
112        return awaitable objects. This way it can be guaranteed that
113        outgoing messages are synchronously enqueued in the correct
114        order, and then async calls can be returned which finish each
115        send. If the entire call is async, they may be enqueued out of
116        order in rare cases.
117        """
118        assert self._send_async_raw_message_call is None
119        self._send_async_raw_message_call = call
120        return call
121
122    def send_async_ex_method(
123        self, call: Callable[[Any, str, Message], Awaitable[str]]
124    ) -> Callable[[Any, str, Message], Awaitable[str]]:
125        """Function decorator for extended send-async method.
126
127        Version of send_async_method which is also is passed the original
128        unencoded message; can be useful for cases where metadata is sent
129        along with messages referring to their payloads/etc.
130        """
131        assert self._send_async_raw_message_ex_call is None
132        self._send_async_raw_message_ex_call = call
133        return call
134
135    def encode_filter_method(
136        self, call: Callable[[Any, Message, dict], None]
137    ) -> Callable[[Any, Message, dict], None]:
138        """Function decorator for defining an encode filter.
139
140        Encode filters can be used to add extra data to the message
141        dict before is is encoded to a string and sent out.
142        """
143        assert self._encode_filter_call is None
144        self._encode_filter_call = call
145        return call
146
147    def decode_filter_method(
148        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
149    ) -> Callable[[Any, Message, dict, Response], None]:
150        """Function decorator for defining a decode filter.
151
152        Decode filters can be used to extract extra data from incoming
153        message dicts.
154        """
155        assert self._decode_filter_call is None
156        self._decode_filter_call = call
157        return call
158
159    def peer_desc_method(
160        self, call: Callable[[Any], str]
161    ) -> Callable[[Any], str]:
162        """Function decorator for defining peer descriptions.
163
164        These are included in error messages or other diagnostics.
165        """
166        assert self._peer_desc_call is None
167        self._peer_desc_call = call
168        return call
169
170    def send(self, bound_obj: Any, message: Message) -> Response | None:
171        """Send a message synchronously."""
172        return self.unpack_raw_response(
173            bound_obj=bound_obj,
174            message=message,
175            raw_response=self.fetch_raw_response(
176                bound_obj=bound_obj,
177                message=message,
178            ),
179        )
180
181    def send_async(
182        self, bound_obj: Any, message: Message
183    ) -> Awaitable[Response | None]:
184        """Send a message asynchronously."""
185
186        # Note: This call is synchronous so that the first part of it can
187        # happen synchronously. If the whole call were async we wouldn't be
188        # able to guarantee that messages sent in order would actually go
189        # out in order.
190        raw_response_awaitable = self.fetch_raw_response_async(
191            bound_obj=bound_obj,
192            message=message,
193        )
194        # Now return an awaitable that will finish the send.
195        return self._send_async_awaitable(
196            bound_obj, message, raw_response_awaitable
197        )
198
199    async def _send_async_awaitable(
200        self,
201        bound_obj: Any,
202        message: Message,
203        raw_response_awaitable: Awaitable[Response | SysResponse],
204    ) -> Response | None:
205        return self.unpack_raw_response(
206            bound_obj=bound_obj,
207            message=message,
208            raw_response=await raw_response_awaitable,
209        )
210
211    def fetch_raw_response(
212        self, bound_obj: Any, message: Message
213    ) -> Response | SysResponse:
214        """Send a message synchronously.
215
216        Generally you can just call send(); these split versions are
217        for when message sending and response handling need to happen
218        in different contexts/threads.
219        """
220        if (
221            self._send_raw_message_call is None
222            and self._send_raw_message_ex_call is None
223        ):
224            raise RuntimeError('send() is unimplemented for this type.')
225
226        msg_encoded = self._encode_message(bound_obj, message)
227        try:
228            if self._send_raw_message_ex_call is not None:
229                response_encoded = self._send_raw_message_ex_call(
230                    bound_obj, msg_encoded, message
231                )
232            else:
233                assert self._send_raw_message_call is not None
234                response_encoded = self._send_raw_message_call(
235                    bound_obj, msg_encoded
236                )
237        except Exception as exc:
238            response = ErrorSysResponse(
239                error_message='Error in MessageSender @send_method.',
240                error_type=(
241                    ErrorSysResponse.ErrorType.COMMUNICATION
242                    if isinstance(exc, CommunicationError)
243                    else ErrorSysResponse.ErrorType.LOCAL
244                ),
245            )
246            # Can include the actual exception since we'll be looking at
247            # this locally; might be helpful.
248            response.set_local_exception(exc)
249            return response
250        return self._decode_raw_response(bound_obj, message, response_encoded)
251
252    def fetch_raw_response_async(
253        self, bound_obj: Any, message: Message
254    ) -> Awaitable[Response | SysResponse]:
255        """Fetch a raw message response awaitable.
256
257        The result of this should be awaited and then passed to
258        unpack_raw_response() to produce the final message result.
259
260        Generally you can just call send(); calling fetch and unpack
261        manually is for when message sending and response handling need
262        to happen in different contexts/threads.
263        """
264
265        # Note: This call is synchronous so that the first part of it can
266        # happen synchronously. If the whole call were async we wouldn't be
267        # able to guarantee that messages sent in order would actually go
268        # out in order.
269        if (
270            self._send_async_raw_message_call is None
271            and self._send_async_raw_message_ex_call is None
272        ):
273            raise RuntimeError('send_async() is unimplemented for this type.')
274
275        msg_encoded = self._encode_message(bound_obj, message)
276        try:
277            if self._send_async_raw_message_ex_call is not None:
278                send_awaitable = self._send_async_raw_message_ex_call(
279                    bound_obj, msg_encoded, message
280                )
281            else:
282                assert self._send_async_raw_message_call is not None
283                send_awaitable = self._send_async_raw_message_call(
284                    bound_obj, msg_encoded
285                )
286        except Exception as exc:
287            return self._error_awaitable(exc)
288
289        # Now return an awaitable to finish the job.
290        return self._fetch_raw_response_awaitable(
291            bound_obj, message, send_awaitable
292        )
293
294    async def _error_awaitable(self, exc: Exception) -> SysResponse:
295        response = ErrorSysResponse(
296            error_message='Error in MessageSender @send_async_method.',
297            error_type=(
298                ErrorSysResponse.ErrorType.COMMUNICATION
299                if isinstance(exc, CommunicationError)
300                else ErrorSysResponse.ErrorType.LOCAL
301            ),
302        )
303        # Can include the actual exception since we'll be looking at
304        # this locally; might be helpful.
305        response.set_local_exception(exc)
306        return response
307
308    async def _fetch_raw_response_awaitable(
309        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
310    ) -> Response | SysResponse:
311        try:
312            response_encoded = await send_awaitable
313        except Exception as exc:
314            response = ErrorSysResponse(
315                error_message='Error in MessageSender @send_async_method.',
316                error_type=(
317                    ErrorSysResponse.ErrorType.COMMUNICATION
318                    if isinstance(exc, CommunicationError)
319                    else ErrorSysResponse.ErrorType.LOCAL
320                ),
321            )
322            # Can include the actual exception since we'll be looking at
323            # this locally; might be helpful.
324            response.set_local_exception(exc)
325            return response
326        return self._decode_raw_response(bound_obj, message, response_encoded)
327
328    def unpack_raw_response(
329        self,
330        bound_obj: Any,
331        message: Message,
332        raw_response: Response | SysResponse,
333    ) -> Response | None:
334        """Convert a raw fetched response into a final response/error/etc.
335
336        Generally you can just call send(); calling fetch and unpack
337        manually is for when message sending and response handling need
338        to happen in different contexts/threads.
339        """
340        response = self._unpack_raw_response(bound_obj, raw_response)
341        assert (
342            response is None
343            or type(response) in type(message).get_response_types()
344        )
345        return response
346
347    def _encode_message(self, bound_obj: Any, message: Message) -> str:
348        """Encode a message for sending."""
349        msg_dict = self.protocol.message_to_dict(message)
350        if self._encode_filter_call is not None:
351            self._encode_filter_call(bound_obj, message, msg_dict)
352        return self.protocol.encode_dict(msg_dict)
353
354    def _decode_raw_response(
355        self, bound_obj: Any, message: Message, response_encoded: str
356    ) -> Response | SysResponse:
357        """Create a Response from returned data.
358
359        These Responses may encapsulate things like remote errors and
360        should not be handed directly to users. _unpack_raw_response()
361        should be used to translate to special values like None or raise
362        Exceptions. This function itself should never raise Exceptions.
363        """
364        response: Response | SysResponse
365        try:
366            response_dict = self.protocol.decode_dict(response_encoded)
367            response = self.protocol.response_from_dict(response_dict)
368            if self._decode_filter_call is not None:
369                self._decode_filter_call(
370                    bound_obj, message, response_dict, response
371                )
372        except Exception as exc:
373            response = ErrorSysResponse(
374                error_message='Error decoding raw response.',
375                error_type=ErrorSysResponse.ErrorType.LOCAL,
376            )
377            # Since we'll be looking at this locally, we can include
378            # extra info for logging/etc.
379            response.set_local_exception(exc)
380        return response
381
382    def _unpack_raw_response(
383        self, bound_obj: Any, raw_response: Response | SysResponse
384    ) -> Response | None:
385        """Given a raw Response, unpacks to special values or Exceptions.
386
387        The result of this call is what should be passed to users.
388        For complex messaging situations such as response callbacks
389        operating across different threads, this last stage should be
390        run such that any raised Exception is active when the callback
391        fires; not on the thread where the message was sent.
392        """
393        # EmptySysResponse translates to None
394        if isinstance(raw_response, EmptySysResponse):
395            return None
396
397        # Some error occurred. Raise a local Exception for it.
398        if isinstance(raw_response, ErrorSysResponse):
399            # Errors that happened locally can attach their exceptions
400            # here for extra logging goodness.
401            local_exception = raw_response.get_local_exception()
402
403            if (
404                raw_response.error_type
405                is ErrorSysResponse.ErrorType.COMMUNICATION
406            ):
407                raise CommunicationError(
408                    raw_response.error_message
409                ) from local_exception
410
411            # If something went wrong on *our* end of the connection,
412            # don't say it was a remote error.
413            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
414                raise RuntimeError(
415                    raw_response.error_message
416                ) from local_exception
417
418            # If they want to support clean errors, do those.
419            if (
420                self.protocol.forward_clean_errors
421                and raw_response.error_type
422                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
423            ):
424                raise CleanError(
425                    raw_response.error_message
426                ) from local_exception
427
428            if (
429                self.protocol.forward_communication_errors
430                and raw_response.error_type
431                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
432            ):
433                raise CommunicationError(
434                    raw_response.error_message
435                ) from local_exception
436
437            # Everything else gets lumped in as a remote error.
438            raise RemoteError(
439                raw_response.error_message,
440                peer_desc=(
441                    'peer'
442                    if self._peer_desc_call is None
443                    else self._peer_desc_call(bound_obj)
444                ),
445            ) from local_exception
446
447        assert isinstance(raw_response, Response)
448        return raw_response

Facilitates sending messages to a target and receiving responses.

These are instantiated at the class level and used to register unbound class methods to handle raw message sending. Generally this class is not used directly, but instead autogenerated subclasses which provide type safe overloads are used instead.

Example: (In this example, MyMessageSender is an autogenerated class that inherits from MessageSender).

class MyClass: msg = MyMessageSender()

@msg.send_method
def send_raw_message(self, message: str) -> str:
    # Actually send the message here.

obj = MyClass()

The MyMessageSender generated class would provides overloads for

send(), send_async(), etc. to provide type-safety for message types

and their associated response types.

Thus, given the statement below, a type-checker would know that

'response' is a SomeResponseType or whatever is associated with

SomeMessageType.

response = obj.msg.send(SomeMessageType())

MessageSender(protocol: MessageProtocol)
53    def __init__(self, protocol: MessageProtocol) -> None:
54        self.protocol = protocol
55        self._send_raw_message_call: Callable[[Any, str], str] | None = None
56        self._send_raw_message_ex_call: (
57            Callable[[Any, str, Message], str] | None
58        ) = None
59        self._send_async_raw_message_call: (
60            Callable[[Any, str], Awaitable[str]] | None
61        ) = None
62        self._send_async_raw_message_ex_call: (
63            Callable[[Any, str, Message], Awaitable[str]] | None
64        ) = None
65        self._encode_filter_call: (
66            Callable[[Any, Message, dict], None] | None
67        ) = None
68        self._decode_filter_call: (
69            Callable[[Any, Message, dict, Response | SysResponse], None] | None
70        ) = None
71        self._peer_desc_call: Callable[[Any], str] | None = None
protocol
def send_method(self, call: Callable[[Any, str], str]) -> Callable[[Any, str], str]:
73    def send_method(
74        self, call: Callable[[Any, str], str]
75    ) -> Callable[[Any, str], str]:
76        """Function decorator for setting raw send method.
77
78        Send methods take strings and should return strings.
79        CommunicationErrors raised here will be returned to the sender
80        as such; all other exceptions will result in a RuntimeError for
81        the sender.
82        """
83        assert self._send_raw_message_call is None
84        self._send_raw_message_call = call
85        return call

Function decorator for setting raw send method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

def send_ex_method( self, call: Callable[[Any, str, Message], str]) -> Callable[[Any, str, Message], str]:
87    def send_ex_method(
88        self, call: Callable[[Any, str, Message], str]
89    ) -> Callable[[Any, str, Message], str]:
90        """Function decorator for extended send method.
91
92        Version of send_method which is also is passed the original
93        unencoded message; can be useful for cases where metadata is sent
94        along with messages referring to their payloads/etc.
95        """
96        assert self._send_raw_message_ex_call is None
97        self._send_raw_message_ex_call = call
98        return call

Function decorator for extended send method.

Version of send_method which is also is passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.

def send_async_method( self, call: Callable[[Any, str], Awaitable[str]]) -> Callable[[Any, str], Awaitable[str]]:
100    def send_async_method(
101        self, call: Callable[[Any, str], Awaitable[str]]
102    ) -> Callable[[Any, str], Awaitable[str]]:
103        """Function decorator for setting raw send-async method.
104
105        Send methods take strings and should return strings.
106        CommunicationErrors raised here will be returned to the sender
107        as such; all other exceptions will result in a RuntimeError for
108        the sender.
109
110        IMPORTANT: Generally async send methods should not be implemented
111        as 'async' methods, but instead should be regular methods that
112        return awaitable objects. This way it can be guaranteed that
113        outgoing messages are synchronously enqueued in the correct
114        order, and then async calls can be returned which finish each
115        send. If the entire call is async, they may be enqueued out of
116        order in rare cases.
117        """
118        assert self._send_async_raw_message_call is None
119        self._send_async_raw_message_call = call
120        return call

Function decorator for setting raw send-async method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.

def send_async_ex_method( self, call: Callable[[Any, str, Message], Awaitable[str]]) -> Callable[[Any, str, Message], Awaitable[str]]:
122    def send_async_ex_method(
123        self, call: Callable[[Any, str, Message], Awaitable[str]]
124    ) -> Callable[[Any, str, Message], Awaitable[str]]:
125        """Function decorator for extended send-async method.
126
127        Version of send_async_method which is also is passed the original
128        unencoded message; can be useful for cases where metadata is sent
129        along with messages referring to their payloads/etc.
130        """
131        assert self._send_async_raw_message_ex_call is None
132        self._send_async_raw_message_ex_call = call
133        return call

Function decorator for extended send-async method.

Version of send_async_method which is also is passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.

def encode_filter_method( self, call: Callable[[Any, Message, dict], NoneType]) -> Callable[[Any, Message, dict], NoneType]:
135    def encode_filter_method(
136        self, call: Callable[[Any, Message, dict], None]
137    ) -> Callable[[Any, Message, dict], None]:
138        """Function decorator for defining an encode filter.
139
140        Encode filters can be used to add extra data to the message
141        dict before is is encoded to a string and sent out.
142        """
143        assert self._encode_filter_call is None
144        self._encode_filter_call = call
145        return call

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before is is encoded to a string and sent out.

def decode_filter_method( self, call: Callable[[Any, Message, dict, Response | SysResponse], NoneType]) -> Callable[[Any, Message, dict, Response], NoneType]:
147    def decode_filter_method(
148        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
149    ) -> Callable[[Any, Message, dict, Response], None]:
150        """Function decorator for defining a decode filter.
151
152        Decode filters can be used to extract extra data from incoming
153        message dicts.
154        """
155        assert self._decode_filter_call is None
156        self._decode_filter_call = call
157        return call

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts.

def peer_desc_method(self, call: Callable[[Any], str]) -> Callable[[Any], str]:
159    def peer_desc_method(
160        self, call: Callable[[Any], str]
161    ) -> Callable[[Any], str]:
162        """Function decorator for defining peer descriptions.
163
164        These are included in error messages or other diagnostics.
165        """
166        assert self._peer_desc_call is None
167        self._peer_desc_call = call
168        return call

Function decorator for defining peer descriptions.

These are included in error messages or other diagnostics.

def send( self, bound_obj: Any, message: Message) -> Response | None:
170    def send(self, bound_obj: Any, message: Message) -> Response | None:
171        """Send a message synchronously."""
172        return self.unpack_raw_response(
173            bound_obj=bound_obj,
174            message=message,
175            raw_response=self.fetch_raw_response(
176                bound_obj=bound_obj,
177                message=message,
178            ),
179        )

Send a message synchronously.

def send_async( self, bound_obj: Any, message: Message) -> Awaitable[Response | None]:
181    def send_async(
182        self, bound_obj: Any, message: Message
183    ) -> Awaitable[Response | None]:
184        """Send a message asynchronously."""
185
186        # Note: This call is synchronous so that the first part of it can
187        # happen synchronously. If the whole call were async we wouldn't be
188        # able to guarantee that messages sent in order would actually go
189        # out in order.
190        raw_response_awaitable = self.fetch_raw_response_async(
191            bound_obj=bound_obj,
192            message=message,
193        )
194        # Now return an awaitable that will finish the send.
195        return self._send_async_awaitable(
196            bound_obj, message, raw_response_awaitable
197        )

Send a message asynchronously.

def fetch_raw_response( self, bound_obj: Any, message: Message) -> Response | SysResponse:
211    def fetch_raw_response(
212        self, bound_obj: Any, message: Message
213    ) -> Response | SysResponse:
214        """Send a message synchronously.
215
216        Generally you can just call send(); these split versions are
217        for when message sending and response handling need to happen
218        in different contexts/threads.
219        """
220        if (
221            self._send_raw_message_call is None
222            and self._send_raw_message_ex_call is None
223        ):
224            raise RuntimeError('send() is unimplemented for this type.')
225
226        msg_encoded = self._encode_message(bound_obj, message)
227        try:
228            if self._send_raw_message_ex_call is not None:
229                response_encoded = self._send_raw_message_ex_call(
230                    bound_obj, msg_encoded, message
231                )
232            else:
233                assert self._send_raw_message_call is not None
234                response_encoded = self._send_raw_message_call(
235                    bound_obj, msg_encoded
236                )
237        except Exception as exc:
238            response = ErrorSysResponse(
239                error_message='Error in MessageSender @send_method.',
240                error_type=(
241                    ErrorSysResponse.ErrorType.COMMUNICATION
242                    if isinstance(exc, CommunicationError)
243                    else ErrorSysResponse.ErrorType.LOCAL
244                ),
245            )
246            # Can include the actual exception since we'll be looking at
247            # this locally; might be helpful.
248            response.set_local_exception(exc)
249            return response
250        return self._decode_raw_response(bound_obj, message, response_encoded)

Send a message synchronously.

Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.

def fetch_raw_response_async( self, bound_obj: Any, message: Message) -> Awaitable[Response | SysResponse]:
252    def fetch_raw_response_async(
253        self, bound_obj: Any, message: Message
254    ) -> Awaitable[Response | SysResponse]:
255        """Fetch a raw message response awaitable.
256
257        The result of this should be awaited and then passed to
258        unpack_raw_response() to produce the final message result.
259
260        Generally you can just call send(); calling fetch and unpack
261        manually is for when message sending and response handling need
262        to happen in different contexts/threads.
263        """
264
265        # Note: This call is synchronous so that the first part of it can
266        # happen synchronously. If the whole call were async we wouldn't be
267        # able to guarantee that messages sent in order would actually go
268        # out in order.
269        if (
270            self._send_async_raw_message_call is None
271            and self._send_async_raw_message_ex_call is None
272        ):
273            raise RuntimeError('send_async() is unimplemented for this type.')
274
275        msg_encoded = self._encode_message(bound_obj, message)
276        try:
277            if self._send_async_raw_message_ex_call is not None:
278                send_awaitable = self._send_async_raw_message_ex_call(
279                    bound_obj, msg_encoded, message
280                )
281            else:
282                assert self._send_async_raw_message_call is not None
283                send_awaitable = self._send_async_raw_message_call(
284                    bound_obj, msg_encoded
285                )
286        except Exception as exc:
287            return self._error_awaitable(exc)
288
289        # Now return an awaitable to finish the job.
290        return self._fetch_raw_response_awaitable(
291            bound_obj, message, send_awaitable
292        )

Fetch a raw message response awaitable.

The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

def unpack_raw_response( self, bound_obj: Any, message: Message, raw_response: Response | SysResponse) -> Response | None:
328    def unpack_raw_response(
329        self,
330        bound_obj: Any,
331        message: Message,
332        raw_response: Response | SysResponse,
333    ) -> Response | None:
334        """Convert a raw fetched response into a final response/error/etc.
335
336        Generally you can just call send(); calling fetch and unpack
337        manually is for when message sending and response handling need
338        to happen in different contexts/threads.
339        """
340        response = self._unpack_raw_response(bound_obj, raw_response)
341        assert (
342            response is None
343            or type(response) in type(message).get_response_types()
344        )
345        return response

Convert a raw fetched response into a final response/error/etc.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

class BoundMessageSender:
451class BoundMessageSender:
452    """Base class for bound senders."""
453
454    def __init__(self, obj: Any, sender: MessageSender) -> None:
455        # Note: not checking obj here since we want to support
456        # at least our protocol property when accessed via type.
457        self._obj = obj
458        self._sender = sender
459
460    @property
461    def protocol(self) -> MessageProtocol:
462        """Protocol associated with this sender."""
463        return self._sender.protocol
464
465    def send_untyped(self, message: Message) -> Response | None:
466        """Send a message synchronously.
467
468        Whenever possible, use the send() call provided by generated
469        subclasses instead of this; it will provide better type safety.
470        """
471        assert self._obj is not None
472        return self._sender.send(bound_obj=self._obj, message=message)
473
474    def send_async_untyped(
475        self, message: Message
476    ) -> Awaitable[Response | None]:
477        """Send a message asynchronously.
478
479        Whenever possible, use the send_async() call provided by generated
480        subclasses instead of this; it will provide better type safety.
481        """
482        assert self._obj is not None
483        return self._sender.send_async(bound_obj=self._obj, message=message)
484
485    def fetch_raw_response_async_untyped(
486        self, message: Message
487    ) -> Awaitable[Response | SysResponse]:
488        """Split send (part 1 of 2)."""
489        assert self._obj is not None
490        return self._sender.fetch_raw_response_async(
491            bound_obj=self._obj, message=message
492        )
493
494    def unpack_raw_response_untyped(
495        self, message: Message, raw_response: Response | SysResponse
496    ) -> Response | None:
497        """Split send (part 2 of 2)."""
498        return self._sender.unpack_raw_response(
499            bound_obj=self._obj, message=message, raw_response=raw_response
500        )

Base class for bound senders.

BoundMessageSender(obj: Any, sender: MessageSender)
454    def __init__(self, obj: Any, sender: MessageSender) -> None:
455        # Note: not checking obj here since we want to support
456        # at least our protocol property when accessed via type.
457        self._obj = obj
458        self._sender = sender
protocol: MessageProtocol
460    @property
461    def protocol(self) -> MessageProtocol:
462        """Protocol associated with this sender."""
463        return self._sender.protocol

Protocol associated with this sender.

def send_untyped( self, message: Message) -> Response | None:
465    def send_untyped(self, message: Message) -> Response | None:
466        """Send a message synchronously.
467
468        Whenever possible, use the send() call provided by generated
469        subclasses instead of this; it will provide better type safety.
470        """
471        assert self._obj is not None
472        return self._sender.send(bound_obj=self._obj, message=message)

Send a message synchronously.

Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.

def send_async_untyped( self, message: Message) -> Awaitable[Response | None]:
474    def send_async_untyped(
475        self, message: Message
476    ) -> Awaitable[Response | None]:
477        """Send a message asynchronously.
478
479        Whenever possible, use the send_async() call provided by generated
480        subclasses instead of this; it will provide better type safety.
481        """
482        assert self._obj is not None
483        return self._sender.send_async(bound_obj=self._obj, message=message)

Send a message asynchronously.

Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.

def fetch_raw_response_async_untyped( self, message: Message) -> Awaitable[Response | SysResponse]:
485    def fetch_raw_response_async_untyped(
486        self, message: Message
487    ) -> Awaitable[Response | SysResponse]:
488        """Split send (part 1 of 2)."""
489        assert self._obj is not None
490        return self._sender.fetch_raw_response_async(
491            bound_obj=self._obj, message=message
492        )

Split send (part 1 of 2).

def unpack_raw_response_untyped( self, message: Message, raw_response: Response | SysResponse) -> Response | None:
494    def unpack_raw_response_untyped(
495        self, message: Message, raw_response: Response | SysResponse
496    ) -> Response | None:
497        """Split send (part 2 of 2)."""
498        return self._sender.unpack_raw_response(
499            bound_obj=self._obj, message=message, raw_response=raw_response
500        )

Split send (part 2 of 2).

class MessageReceiver:
 29class MessageReceiver:
 30    """Facilitates receiving & responding to messages from a remote source.
 31
 32    This is instantiated at the class level with unbound methods registered
 33    as handlers for different message types in the protocol.
 34
 35    Example:
 36
 37    class MyClass:
 38        receiver = MyMessageReceiver()
 39
 40        # MyMessageReceiver fills out handler() overloads to ensure all
 41        # registered handlers have valid types/return-types.
 42
 43        @receiver.handler
 44        def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
 45            # Deal with this message type here.
 46
 47    # This will trigger the registered handler being called.
 48    obj = MyClass()
 49    obj.receiver.handle_raw_message(some_raw_data)
 50
 51    Any unhandled Exception occurring during message handling will result in
 52    an efro.error.RemoteError being raised on the sending end.
 53    """
 54
 55    is_async = False
 56
 57    def __init__(self, protocol: MessageProtocol) -> None:
 58        self.protocol = protocol
 59        self._handlers: dict[type[Message], Callable] = {}
 60        self._decode_filter_call: (
 61            Callable[[Any, dict, Message], None] | None
 62        ) = None
 63        self._encode_filter_call: (
 64            Callable[[Any, Message | None, Response | SysResponse, dict], None]
 65            | None
 66        ) = None
 67
 68    # noinspection PyProtectedMember
 69    def register_handler(
 70        self, call: Callable[[Any, Message], Response | None]
 71    ) -> None:
 72        """Register a handler call.
 73
 74        The message type handled by the call is determined by its
 75        type annotation.
 76        """
 77        # TODO: can use types.GenericAlias in 3.9.
 78        # (hmm though now that we're there,  it seems a drop-in
 79        # replace gives us errors. Should re-test in 3.11 as it seems
 80        # that typing_extensions handles it differently in that case)
 81        from typing import _GenericAlias  # type: ignore
 82        from typing import get_type_hints, get_args
 83
 84        sig = inspect.getfullargspec(call)
 85
 86        # The provided callable should be a method taking one 'msg' arg.
 87        expectedsig = ['self', 'msg']
 88        if sig.args != expectedsig:
 89            raise ValueError(
 90                f'Expected callable signature of {expectedsig};'
 91                f' got {sig.args}'
 92            )
 93
 94        # Check annotation types to determine what message types we handle.
 95        # Return-type annotation can be a Union, but we probably don't
 96        # have it available at runtime. Explicitly pull it in.
 97        # UPDATE: we've updated our pylint filter to where we should
 98        # have all annotations available.
 99        # anns = get_type_hints(call, localns={'Union': Union})
100        anns = get_type_hints(call)
101
102        msgtype = anns.get('msg')
103        if not isinstance(msgtype, type):
104            raise TypeError(
105                f'expected a type for "msg" annotation; got {type(msgtype)}.'
106            )
107        assert issubclass(msgtype, Message)
108
109        ret = anns.get('return')
110        responsetypes: tuple[type[Any] | None, ...]
111
112        # Return types can be a single type or a union of types.
113        if isinstance(ret, (_GenericAlias, types.UnionType)):
114            targs = get_args(ret)
115            if not all(isinstance(a, (type, type(None))) for a in targs):
116                raise TypeError(
117                    f'expected only types for "return" annotation;'
118                    f' got {targs}.'
119                )
120            responsetypes = targs
121        else:
122            if not isinstance(ret, (type, type(None))):
123                raise TypeError(
124                    f'expected one or more types for'
125                    f' "return" annotation; got a {type(ret)}.'
126                )
127            # This seems like maybe a mypy bug. Appeared after adding
128            # types.UnionType above.
129            responsetypes = (ret,)
130
131        # This will contain NoneType for empty return cases, but
132        # we expect it to be None.
133        # noinspection PyPep8
134        responsetypes = tuple(
135            None if r is type(None) else r for r in responsetypes
136        )
137
138        # Make sure our protocol has this message type registered and our
139        # return types exactly match. (Technically we could return a subset
140        # of the supported types; can allow this in the future if it makes
141        # sense).
142        registered_types = self.protocol.message_ids_by_type.keys()
143
144        if msgtype not in registered_types:
145            raise TypeError(
146                f'Message type {msgtype} is not registered'
147                f' in this Protocol.'
148            )
149
150        if msgtype in self._handlers:
151            raise TypeError(
152                f'Message type {msgtype} already has a registered handler.'
153            )
154
155        # Make sure the responses exactly matches what the message expects.
156        if set(responsetypes) != set(msgtype.get_response_types()):
157            raise TypeError(
158                f'Provided response types {responsetypes} do not'
159                f' match the set expected by message type {msgtype}: '
160                f'({msgtype.get_response_types()})'
161            )
162
163        # Ok; we're good!
164        self._handlers[msgtype] = call
165
166    def decode_filter_method(
167        self, call: Callable[[Any, dict, Message], None]
168    ) -> Callable[[Any, dict, Message], None]:
169        """Function decorator for defining a decode filter.
170
171        Decode filters can be used to extract extra data from incoming
172        message dicts. This version will work for both handle_raw_message()
173        and handle_raw_message_async()
174        """
175        assert self._decode_filter_call is None
176        self._decode_filter_call = call
177        return call
178
179    def encode_filter_method(
180        self,
181        call: Callable[
182            [Any, Message | None, Response | SysResponse, dict], None
183        ],
184    ) -> Callable[[Any, Message | None, Response, dict], None]:
185        """Function decorator for defining an encode filter.
186
187        Encode filters can be used to add extra data to the message
188        dict before is is encoded to a string and sent out.
189        """
190        assert self._encode_filter_call is None
191        self._encode_filter_call = call
192        return call
193
194    def validate(self, log_only: bool = False) -> None:
195        """Check for handler completeness, valid types, etc."""
196        for msgtype in self.protocol.message_ids_by_type.keys():
197            if issubclass(msgtype, Response):
198                continue
199            if msgtype not in self._handlers:
200                msg = (
201                    f'Protocol message type {msgtype} is not handled'
202                    f' by receiver type {type(self)}.'
203                )
204                if log_only:
205                    logging.error(msg)
206                else:
207                    raise TypeError(msg)
208
209    def _decode_incoming_message_base(
210        self, bound_obj: Any, msg: str
211    ) -> tuple[Any, dict, Message]:
212        # Decode the incoming message.
213        msg_dict = self.protocol.decode_dict(msg)
214        msg_decoded = self.protocol.message_from_dict(msg_dict)
215        assert isinstance(msg_decoded, Message)
216        if self._decode_filter_call is not None:
217            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
218        return bound_obj, msg_dict, msg_decoded
219
220    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
221        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
222            bound_obj=bound_obj, msg=msg
223        )
224        return msg_decoded
225
226    def encode_user_response(
227        self, bound_obj: Any, message: Message, response: Response | None
228    ) -> str:
229        """Encode a response provided by the user for sending."""
230
231        assert isinstance(response, Response | None)
232        # (user should never explicitly return error-responses)
233        assert (
234            response is None or type(response) in message.get_response_types()
235        )
236
237        # A return value of None equals EmptySysResponse.
238        out_response: Response | SysResponse
239        if response is None:
240            out_response = EmptySysResponse()
241        else:
242            out_response = response
243
244        response_dict = self.protocol.response_to_dict(out_response)
245        if self._encode_filter_call is not None:
246            self._encode_filter_call(
247                bound_obj, message, out_response, response_dict
248            )
249        return self.protocol.encode_dict(response_dict)
250
251    def encode_error_response(
252        self, bound_obj: Any, message: Message | None, exc: Exception
253    ) -> tuple[str, bool]:
254        """Given an error, return sysresponse str and whether to log."""
255        response, dolog = self.protocol.error_to_response(exc)
256        response_dict = self.protocol.response_to_dict(response)
257        if self._encode_filter_call is not None:
258            self._encode_filter_call(
259                bound_obj, message, response, response_dict
260            )
261        return self.protocol.encode_dict(response_dict), dolog
262
263    def handle_raw_message(
264        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
265    ) -> str:
266        """Decode, handle, and return an response for a message.
267
268        if 'raise_unregistered' is True, will raise an
269        efro.message.UnregisteredMessageIDError for messages not handled by
270        the protocol. In all other cases local errors will translate to
271        error responses returned to the sender.
272        """
273        assert not self.is_async, "can't call sync handler on async receiver"
274        msg_decoded: Message | None = None
275        try:
276            msg_decoded = self._decode_incoming_message(bound_obj, msg)
277            msgtype = type(msg_decoded)
278            handler = self._handlers.get(msgtype)
279            if handler is None:
280                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
281            response = handler(bound_obj, msg_decoded)
282            assert isinstance(response, Response | None)
283            return self.encode_user_response(bound_obj, msg_decoded, response)
284
285        except Exception as exc:
286            if raise_unregistered and isinstance(
287                exc, UnregisteredMessageIDError
288            ):
289                raise
290            rstr, dolog = self.encode_error_response(
291                bound_obj, msg_decoded, exc
292            )
293            if dolog:
294                if msg_decoded is not None:
295                    msgtype = type(msg_decoded)
296                    logging.exception(
297                        'Error handling %s.%s message.',
298                        msgtype.__module__,
299                        msgtype.__qualname__,
300                    )
301                else:
302                    logging.exception(
303                        'Error handling raw efro.message'
304                        ' (likely a message format incompatibility): %s.',
305                        msg,
306                    )
307            return rstr
308
309    def handle_raw_message_async(
310        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
311    ) -> Awaitable[str]:
312        """Should be called when the receiver gets a message.
313
314        The return value is the raw response to the message.
315        """
316
317        # Note: This call is synchronous so that the first part of it can
318        # happen synchronously. If the whole call were async we wouldn't be
319        # able to guarantee that messages handlers would be called in the
320        # order the messages were received.
321
322        assert self.is_async, "Can't call async handler on sync receiver."
323        msg_decoded: Message | None = None
324        try:
325            msg_decoded = self._decode_incoming_message(bound_obj, msg)
326            msgtype = type(msg_decoded)
327            handler = self._handlers.get(msgtype)
328            if handler is None:
329                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
330            handler_awaitable = handler(bound_obj, msg_decoded)
331
332        except Exception as exc:
333            if raise_unregistered and isinstance(
334                exc, UnregisteredMessageIDError
335            ):
336                raise
337            return self._handle_raw_message_async_error(
338                bound_obj, msg, msg_decoded, exc
339            )
340
341        # Return an awaitable to handle the rest asynchronously.
342        return self._handle_raw_message_async(
343            bound_obj, msg, msg_decoded, handler_awaitable
344        )
345
346    async def _handle_raw_message_async_error(
347        self,
348        bound_obj: Any,
349        msg_raw: str,
350        msg_decoded: Message | None,
351        exc: Exception,
352    ) -> str:
353        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
354        if dolog:
355            if msg_decoded is not None:
356                msgtype = type(msg_decoded)
357                logging.exception(
358                    'Error handling %s.%s message.',
359                    msgtype.__module__,
360                    msgtype.__qualname__,
361                    # We need to explicitly provide the exception here,
362                    # otherwise it shows up at None. I assume related to
363                    # the fact that we're an async function.
364                    exc_info=exc,
365                )
366            else:
367                logging.exception(
368                    'Error handling raw async efro.message'
369                    ' (likely a message format incompatibility): %s.',
370                    msg_raw,
371                    # We need to explicitly provide the exception here,
372                    # otherwise it shows up at None. I assume related to
373                    # the fact that we're an async function.
374                    exc_info=exc,
375                )
376        return rstr
377
378    async def _handle_raw_message_async(
379        self,
380        bound_obj: Any,
381        msg_raw: str,
382        msg_decoded: Message,
383        handler_awaitable: Awaitable[Response | None],
384    ) -> str:
385        """Should be called when the receiver gets a message.
386
387        The return value is the raw response to the message.
388        """
389        try:
390            response = await handler_awaitable
391            assert isinstance(response, Response | None)
392            return self.encode_user_response(bound_obj, msg_decoded, response)
393
394        except Exception as exc:
395            return await self._handle_raw_message_async_error(
396                bound_obj, msg_raw, msg_decoded, exc
397            )

Facilitates receiving & responding to messages from a remote source.

This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.

Example:

class MyClass: receiver = MyMessageReceiver()

# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.

@receiver.handler
def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
    # Deal with this message type here.

This will trigger the registered handler being called.

obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)

Any unhandled Exception occurring during message handling will result in an efro.error.RemoteError being raised on the sending end.

MessageReceiver(protocol: MessageProtocol)
57    def __init__(self, protocol: MessageProtocol) -> None:
58        self.protocol = protocol
59        self._handlers: dict[type[Message], Callable] = {}
60        self._decode_filter_call: (
61            Callable[[Any, dict, Message], None] | None
62        ) = None
63        self._encode_filter_call: (
64            Callable[[Any, Message | None, Response | SysResponse, dict], None]
65            | None
66        ) = None
is_async = False
protocol
def register_handler( self, call: Callable[[Any, Message], Response | None]) -> None:
 69    def register_handler(
 70        self, call: Callable[[Any, Message], Response | None]
 71    ) -> None:
 72        """Register a handler call.
 73
 74        The message type handled by the call is determined by its
 75        type annotation.
 76        """
 77        # TODO: can use types.GenericAlias in 3.9.
 78        # (hmm though now that we're there,  it seems a drop-in
 79        # replace gives us errors. Should re-test in 3.11 as it seems
 80        # that typing_extensions handles it differently in that case)
 81        from typing import _GenericAlias  # type: ignore
 82        from typing import get_type_hints, get_args
 83
 84        sig = inspect.getfullargspec(call)
 85
 86        # The provided callable should be a method taking one 'msg' arg.
 87        expectedsig = ['self', 'msg']
 88        if sig.args != expectedsig:
 89            raise ValueError(
 90                f'Expected callable signature of {expectedsig};'
 91                f' got {sig.args}'
 92            )
 93
 94        # Check annotation types to determine what message types we handle.
 95        # Return-type annotation can be a Union, but we probably don't
 96        # have it available at runtime. Explicitly pull it in.
 97        # UPDATE: we've updated our pylint filter to where we should
 98        # have all annotations available.
 99        # anns = get_type_hints(call, localns={'Union': Union})
100        anns = get_type_hints(call)
101
102        msgtype = anns.get('msg')
103        if not isinstance(msgtype, type):
104            raise TypeError(
105                f'expected a type for "msg" annotation; got {type(msgtype)}.'
106            )
107        assert issubclass(msgtype, Message)
108
109        ret = anns.get('return')
110        responsetypes: tuple[type[Any] | None, ...]
111
112        # Return types can be a single type or a union of types.
113        if isinstance(ret, (_GenericAlias, types.UnionType)):
114            targs = get_args(ret)
115            if not all(isinstance(a, (type, type(None))) for a in targs):
116                raise TypeError(
117                    f'expected only types for "return" annotation;'
118                    f' got {targs}.'
119                )
120            responsetypes = targs
121        else:
122            if not isinstance(ret, (type, type(None))):
123                raise TypeError(
124                    f'expected one or more types for'
125                    f' "return" annotation; got a {type(ret)}.'
126                )
127            # This seems like maybe a mypy bug. Appeared after adding
128            # types.UnionType above.
129            responsetypes = (ret,)
130
131        # This will contain NoneType for empty return cases, but
132        # we expect it to be None.
133        # noinspection PyPep8
134        responsetypes = tuple(
135            None if r is type(None) else r for r in responsetypes
136        )
137
138        # Make sure our protocol has this message type registered and our
139        # return types exactly match. (Technically we could return a subset
140        # of the supported types; can allow this in the future if it makes
141        # sense).
142        registered_types = self.protocol.message_ids_by_type.keys()
143
144        if msgtype not in registered_types:
145            raise TypeError(
146                f'Message type {msgtype} is not registered'
147                f' in this Protocol.'
148            )
149
150        if msgtype in self._handlers:
151            raise TypeError(
152                f'Message type {msgtype} already has a registered handler.'
153            )
154
155        # Make sure the responses exactly matches what the message expects.
156        if set(responsetypes) != set(msgtype.get_response_types()):
157            raise TypeError(
158                f'Provided response types {responsetypes} do not'
159                f' match the set expected by message type {msgtype}: '
160                f'({msgtype.get_response_types()})'
161            )
162
163        # Ok; we're good!
164        self._handlers[msgtype] = call

Register a handler call.

The message type handled by the call is determined by its type annotation.

def decode_filter_method( self, call: Callable[[Any, dict, Message], NoneType]) -> Callable[[Any, dict, Message], NoneType]:
166    def decode_filter_method(
167        self, call: Callable[[Any, dict, Message], None]
168    ) -> Callable[[Any, dict, Message], None]:
169        """Function decorator for defining a decode filter.
170
171        Decode filters can be used to extract extra data from incoming
172        message dicts. This version will work for both handle_raw_message()
173        and handle_raw_message_async()
174        """
175        assert self._decode_filter_call is None
176        self._decode_filter_call = call
177        return call

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async()

def encode_filter_method( self, call: Callable[[Any, Message | None, Response | SysResponse, dict], NoneType]) -> Callable[[Any, Message | None, Response, dict], NoneType]:
179    def encode_filter_method(
180        self,
181        call: Callable[
182            [Any, Message | None, Response | SysResponse, dict], None
183        ],
184    ) -> Callable[[Any, Message | None, Response, dict], None]:
185        """Function decorator for defining an encode filter.
186
187        Encode filters can be used to add extra data to the message
188        dict before is is encoded to a string and sent out.
189        """
190        assert self._encode_filter_call is None
191        self._encode_filter_call = call
192        return call

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before is is encoded to a string and sent out.

def validate(self, log_only: bool = False) -> None:
194    def validate(self, log_only: bool = False) -> None:
195        """Check for handler completeness, valid types, etc."""
196        for msgtype in self.protocol.message_ids_by_type.keys():
197            if issubclass(msgtype, Response):
198                continue
199            if msgtype not in self._handlers:
200                msg = (
201                    f'Protocol message type {msgtype} is not handled'
202                    f' by receiver type {type(self)}.'
203                )
204                if log_only:
205                    logging.error(msg)
206                else:
207                    raise TypeError(msg)

Check for handler completeness, valid types, etc.

def encode_user_response( self, bound_obj: Any, message: Message, response: Response | None) -> str:
226    def encode_user_response(
227        self, bound_obj: Any, message: Message, response: Response | None
228    ) -> str:
229        """Encode a response provided by the user for sending."""
230
231        assert isinstance(response, Response | None)
232        # (user should never explicitly return error-responses)
233        assert (
234            response is None or type(response) in message.get_response_types()
235        )
236
237        # A return value of None equals EmptySysResponse.
238        out_response: Response | SysResponse
239        if response is None:
240            out_response = EmptySysResponse()
241        else:
242            out_response = response
243
244        response_dict = self.protocol.response_to_dict(out_response)
245        if self._encode_filter_call is not None:
246            self._encode_filter_call(
247                bound_obj, message, out_response, response_dict
248            )
249        return self.protocol.encode_dict(response_dict)

Encode a response provided by the user for sending.

def encode_error_response( self, bound_obj: Any, message: Message | None, exc: Exception) -> tuple[str, bool]:
251    def encode_error_response(
252        self, bound_obj: Any, message: Message | None, exc: Exception
253    ) -> tuple[str, bool]:
254        """Given an error, return sysresponse str and whether to log."""
255        response, dolog = self.protocol.error_to_response(exc)
256        response_dict = self.protocol.response_to_dict(response)
257        if self._encode_filter_call is not None:
258            self._encode_filter_call(
259                bound_obj, message, response, response_dict
260            )
261        return self.protocol.encode_dict(response_dict), dolog

Given an error, return sysresponse str and whether to log.

def handle_raw_message(self, bound_obj: Any, msg: str, raise_unregistered: bool = False) -> str:
263    def handle_raw_message(
264        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
265    ) -> str:
266        """Decode, handle, and return an response for a message.
267
268        if 'raise_unregistered' is True, will raise an
269        efro.message.UnregisteredMessageIDError for messages not handled by
270        the protocol. In all other cases local errors will translate to
271        error responses returned to the sender.
272        """
273        assert not self.is_async, "can't call sync handler on async receiver"
274        msg_decoded: Message | None = None
275        try:
276            msg_decoded = self._decode_incoming_message(bound_obj, msg)
277            msgtype = type(msg_decoded)
278            handler = self._handlers.get(msgtype)
279            if handler is None:
280                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
281            response = handler(bound_obj, msg_decoded)
282            assert isinstance(response, Response | None)
283            return self.encode_user_response(bound_obj, msg_decoded, response)
284
285        except Exception as exc:
286            if raise_unregistered and isinstance(
287                exc, UnregisteredMessageIDError
288            ):
289                raise
290            rstr, dolog = self.encode_error_response(
291                bound_obj, msg_decoded, exc
292            )
293            if dolog:
294                if msg_decoded is not None:
295                    msgtype = type(msg_decoded)
296                    logging.exception(
297                        'Error handling %s.%s message.',
298                        msgtype.__module__,
299                        msgtype.__qualname__,
300                    )
301                else:
302                    logging.exception(
303                        'Error handling raw efro.message'
304                        ' (likely a message format incompatibility): %s.',
305                        msg,
306                    )
307            return rstr

Decode, handle, and return an response for a message.

if 'raise_unregistered' is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.

def handle_raw_message_async( self, bound_obj: Any, msg: str, raise_unregistered: bool = False) -> Awaitable[str]:
309    def handle_raw_message_async(
310        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
311    ) -> Awaitable[str]:
312        """Should be called when the receiver gets a message.
313
314        The return value is the raw response to the message.
315        """
316
317        # Note: This call is synchronous so that the first part of it can
318        # happen synchronously. If the whole call were async we wouldn't be
319        # able to guarantee that messages handlers would be called in the
320        # order the messages were received.
321
322        assert self.is_async, "Can't call async handler on sync receiver."
323        msg_decoded: Message | None = None
324        try:
325            msg_decoded = self._decode_incoming_message(bound_obj, msg)
326            msgtype = type(msg_decoded)
327            handler = self._handlers.get(msgtype)
328            if handler is None:
329                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
330            handler_awaitable = handler(bound_obj, msg_decoded)
331
332        except Exception as exc:
333            if raise_unregistered and isinstance(
334                exc, UnregisteredMessageIDError
335            ):
336                raise
337            return self._handle_raw_message_async_error(
338                bound_obj, msg, msg_decoded, exc
339            )
340
341        # Return an awaitable to handle the rest asynchronously.
342        return self._handle_raw_message_async(
343            bound_obj, msg, msg_decoded, handler_awaitable
344        )

Should be called when the receiver gets a message.

The return value is the raw response to the message.

class BoundMessageReceiver:
400class BoundMessageReceiver:
401    """Base bound receiver class."""
402
403    def __init__(
404        self,
405        obj: Any,
406        receiver: MessageReceiver,
407    ) -> None:
408        assert obj is not None
409        self._obj = obj
410        self._receiver = receiver
411
412    @property
413    def protocol(self) -> MessageProtocol:
414        """Protocol associated with this receiver."""
415        return self._receiver.protocol
416
417    def encode_error_response(self, exc: Exception) -> str:
418        """Given an error, return a response ready to send.
419
420        This should be used for any errors that happen outside of
421        standard handle_raw_message calls. Any errors within those
422        calls will be automatically returned as encoded strings.
423        """
424        # Passing None for Message here; we would only have that available
425        # for things going wrong in the handler (which this is not for).
426        return self._receiver.encode_error_response(self._obj, None, exc)[0]

Base bound receiver class.

BoundMessageReceiver(obj: Any, receiver: MessageReceiver)
403    def __init__(
404        self,
405        obj: Any,
406        receiver: MessageReceiver,
407    ) -> None:
408        assert obj is not None
409        self._obj = obj
410        self._receiver = receiver
protocol: MessageProtocol
412    @property
413    def protocol(self) -> MessageProtocol:
414        """Protocol associated with this receiver."""
415        return self._receiver.protocol

Protocol associated with this receiver.

def encode_error_response(self, exc: Exception) -> str:
417    def encode_error_response(self, exc: Exception) -> str:
418        """Given an error, return a response ready to send.
419
420        This should be used for any errors that happen outside of
421        standard handle_raw_message calls. Any errors within those
422        calls will be automatically returned as encoded strings.
423        """
424        # Passing None for Message here; we would only have that available
425        # for things going wrong in the handler (which this is not for).
426        return self._receiver.encode_error_response(self._obj, None, exc)[0]

Given an error, return a response ready to send.

This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.

def create_sender_module( basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) -> str:
18def create_sender_module(
19    basename: str,
20    protocol_create_code: str,
21    enable_sync_sends: bool,
22    enable_async_sends: bool,
23    private: bool = False,
24    protocol_module_level_import_code: str | None = None,
25    build_time_protocol_create_code: str | None = None,
26) -> str:
27    """Create a Python module defining a MessageSender subclass.
28
29    This class is primarily for type checking and will contain overrides
30    for the varieties of send calls for message/response types defined
31    in the protocol.
32
33    Code passed for 'protocol_create_code' should import necessary
34    modules and assign an instance of the Protocol to a 'protocol'
35    variable.
36
37    Class names are based on basename; a basename 'FooSender' will
38    result in classes FooSender and BoundFooSender.
39
40    If 'private' is True, class-names will be prefixed with an '_'.
41
42    Note: output code may have long lines and should generally be run
43    through a formatter. We should perhaps move this functionality to
44    efrotools so we can include that functionality inline.
45    """
46    protocol = _protocol_from_code(
47        build_time_protocol_create_code
48        if build_time_protocol_create_code is not None
49        else protocol_create_code
50    )
51    return protocol.do_create_sender_module(
52        basename=basename,
53        protocol_create_code=protocol_create_code,
54        enable_sync_sends=enable_sync_sends,
55        enable_async_sends=enable_async_sends,
56        private=private,
57        protocol_module_level_import_code=protocol_module_level_import_code,
58    )

Create a Python module defining a MessageSender subclass.

This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.

Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.

Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.

If 'private' is True, class-names will be prefixed with an '_'.

Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.

def create_receiver_module( basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) -> str:
61def create_receiver_module(
62    basename: str,
63    protocol_create_code: str,
64    is_async: bool,
65    private: bool = False,
66    protocol_module_level_import_code: str | None = None,
67    build_time_protocol_create_code: str | None = None,
68) -> str:
69    """ "Create a Python module defining a MessageReceiver subclass.
70
71    This class is primarily for type checking and will contain overrides
72    for the register method for message/response types defined in
73    the protocol.
74
75    Class names are based on basename; a basename 'FooReceiver' will
76    result in FooReceiver and BoundFooReceiver.
77
78    If 'is_async' is True, handle_raw_message() will be an async method
79    and the @handler decorator will expect async methods.
80
81    If 'private' is True, class-names will be prefixed with an '_'.
82
83    Note that line lengths are not clipped, so output may need to be
84    run through a formatter to prevent lint warnings about excessive
85    line lengths.
86    """
87    protocol = _protocol_from_code(
88        build_time_protocol_create_code
89        if build_time_protocol_create_code is not None
90        else protocol_create_code
91    )
92    return protocol.do_create_receiver_module(
93        basename=basename,
94        protocol_create_code=protocol_create_code,
95        is_async=is_async,
96        private=private,
97        protocol_module_level_import_code=protocol_module_level_import_code,
98    )

"Create a Python module defining a MessageReceiver subclass.

This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.

Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.

If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.

If 'private' is True, class-names will be prefixed with an '_'.

Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.

class UnregisteredMessageIDError(builtins.Exception):
20class UnregisteredMessageIDError(Exception):
21    """A message or response id is not covered by our protocol."""

A message or response id is not covered by our protocol.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args