efro.message

Functionality for sending and responding to messages. Supports static typing for message types and possible return types.

 1# Released under the MIT License. See LICENSE for details.
 2#
 3"""Functionality for sending and responding to messages.
 4Supports static typing for message types and possible return types.
 5"""
 6
 7from __future__ import annotations
 8
 9from efro.util import set_canonical_module_names
10from efro.message._protocol import MessageProtocol
11from efro.message._sender import MessageSender, BoundMessageSender
12from efro.message._receiver import MessageReceiver, BoundMessageReceiver
13from efro.message._module import create_sender_module, create_receiver_module
14from efro.message._message import (
15    Message,
16    Response,
17    SysResponse,
18    EmptySysResponse,
19    ErrorSysResponse,
20    StringResponse,
21    BoolResponse,
22    UnregisteredMessageIDError,
23)
24
25__all__ = [
26    'Message',
27    'Response',
28    'SysResponse',
29    'EmptySysResponse',
30    'ErrorSysResponse',
31    'StringResponse',
32    'BoolResponse',
33    'MessageProtocol',
34    'MessageSender',
35    'BoundMessageSender',
36    'MessageReceiver',
37    'BoundMessageReceiver',
38    'create_sender_module',
39    'create_receiver_module',
40    'UnregisteredMessageIDError',
41]
42
43# Have these things present themselves cleanly as 'thismodule.SomeClass'
44# instead of 'thismodule._internalmodule.SomeClass'
45set_canonical_module_names(globals())
class Message:
24class Message:
25    """Base class for messages."""
26
27    @classmethod
28    def get_response_types(cls) -> list[type[Response] | None]:
29        """Return all Response types this Message can return when sent.
30
31        The default implementation specifies a None return type.
32        """
33        return [None]

Base class for messages.

@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
27    @classmethod
28    def get_response_types(cls) -> list[type[Response] | None]:
29        """Return all Response types this Message can return when sent.
30
31        The default implementation specifies a None return type.
32        """
33        return [None]

Return all Response types this Message can return when sent.

The default implementation specifies a None return type.

class Response:
36class Response:
37    """Base class for responses to messages."""

Base class for responses to messages.

class SysResponse:
40class SysResponse:
41    """Base class for system-responses to messages.
42
43    These are only sent/handled by the messaging system itself;
44    users of the api never see them.
45    """
46
47    def set_local_exception(self, exc: Exception) -> None:
48        """Attach a local exception to facilitate better logging/handling.
49
50        Be aware that this data does not get serialized and only
51        exists on the local object.
52        """
53        setattr(self, '_sr_local_exception', exc)
54
55    def get_local_exception(self) -> Exception | None:
56        """Fetch a local attached exception."""
57        value = getattr(self, '_sr_local_exception', None)
58        assert isinstance(value, Exception | None)
59        return value

Base class for system-responses to messages.

These are only sent/handled by the messaging system itself; users of the api never see them.

def set_local_exception(self, exc: Exception) -> None:
47    def set_local_exception(self, exc: Exception) -> None:
48        """Attach a local exception to facilitate better logging/handling.
49
50        Be aware that this data does not get serialized and only
51        exists on the local object.
52        """
53        setattr(self, '_sr_local_exception', exc)

Attach a local exception to facilitate better logging/handling.

Be aware that this data does not get serialized and only exists on the local object.

def get_local_exception(self) -> Exception | None:
55    def get_local_exception(self) -> Exception | None:
56        """Fetch a local attached exception."""
57        value = getattr(self, '_sr_local_exception', None)
58        assert isinstance(value, Exception | None)
59        return value

Fetch a local attached exception.

@ioprepped
@dataclass
class EmptySysResponse(efro.message.SysResponse):
86@ioprepped
87@dataclass
88class EmptySysResponse(SysResponse):
89    """The response equivalent of None."""

The response equivalent of None.

@ioprepped
@dataclass
class ErrorSysResponse(efro.message.SysResponse):
65@ioprepped
66@dataclass
67class ErrorSysResponse(SysResponse):
68    """SysResponse saying some error has occurred for the send.
69
70    This generally results in an Exception being raised for the caller.
71    """
72
73    class ErrorType(Enum):
74        """Type of error that occurred while sending a message."""
75
76        REMOTE = 0
77        REMOTE_CLEAN = 1
78        LOCAL = 2
79        COMMUNICATION = 3
80        REMOTE_COMMUNICATION = 4
81
82    error_message: Annotated[str, IOAttrs('m')]
83    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE

SysResponse saying some error has occurred for the send.

This generally results in an Exception being raised for the caller.

ErrorSysResponse(error_message: Annotated[str, IOAttrs('m')], error_type: Annotated[ErrorSysResponse.ErrorType, IOAttrs('e')] = <ErrorType.REMOTE: 0>)
error_message: Annotated[str, IOAttrs('m')]
error_type: Annotated[ErrorSysResponse.ErrorType, IOAttrs('e')] = <ErrorType.REMOTE: 0>
class ErrorSysResponse.ErrorType(enum.Enum):
73    class ErrorType(Enum):
74        """Type of error that occurred while sending a message."""
75
76        REMOTE = 0
77        REMOTE_CLEAN = 1
78        LOCAL = 2
79        COMMUNICATION = 3
80        REMOTE_COMMUNICATION = 4

Type of error that occurred while sending a message.

REMOTE = <ErrorType.REMOTE: 0>
REMOTE_CLEAN = <ErrorType.REMOTE_CLEAN: 1>
LOCAL = <ErrorType.LOCAL: 2>
COMMUNICATION = <ErrorType.COMMUNICATION: 3>
REMOTE_COMMUNICATION = <ErrorType.REMOTE_COMMUNICATION: 4>
Inherited Members
enum.Enum
name
value
@ioprepped
@dataclass
class StringResponse(efro.message.Response):
104@ioprepped
105@dataclass
106class StringResponse(Response):
107    """A simple string value response."""
108
109    value: Annotated[str, IOAttrs('v')]

A simple string value response.

StringResponse(value: Annotated[str, IOAttrs('v')])
value: Annotated[str, IOAttrs('v')]
@ioprepped
@dataclass
class BoolResponse(efro.message.Response):
 96@ioprepped
 97@dataclass
 98class BoolResponse(Response):
 99    """A simple bool value response."""
100
101    value: Annotated[bool, IOAttrs('v')]

A simple bool value response.

BoolResponse(value: Annotated[bool, IOAttrs('v')])
value: Annotated[bool, IOAttrs('v')]
class MessageProtocol:
 33class MessageProtocol:
 34    """Wrangles a set of message types, formats, and response types.
 35    Both endpoints must be using a compatible Protocol for communication
 36    to succeed. To maintain Protocol compatibility between revisions,
 37    all message types must retain the same id, message attr storage
 38    names must not change, newly added attrs must have default values,
 39    etc.
 40    """
 41
 42    def __init__(
 43        self,
 44        message_types: dict[int, type[Message]],
 45        response_types: dict[int, type[Response]],
 46        forward_communication_errors: bool = False,
 47        forward_clean_errors: bool = False,
 48        remote_errors_include_stack_traces: bool = False,
 49        log_remote_errors: bool = True,
 50    ) -> None:
 51        """Create a protocol with a given configuration.
 52
 53        If 'forward_communication_errors' is True,
 54        efro.error.CommunicationErrors raised on the receiver end will
 55        result in a matching error raised back on the sender. This can
 56        be useful if the receiver will be in some way forwarding
 57        messages along and the sender doesn't need to know where
 58        communication breakdowns occurred; only that they did.
 59
 60        If 'forward_clean_errors' is True, efro.error.CleanError
 61        exceptions raised on the receiver end will result in a matching
 62        CleanError raised back on the sender.
 63
 64        When an exception is not covered by the optional forwarding
 65        mechanisms above, it will come across as efro.error.RemoteError
 66        and the exception will be logged on the receiver
 67        end - at least by default (see details below).
 68
 69        If 'remote_errors_include_stack_traces' is True, stringified
 70        stack traces will be returned with efro.error.RemoteError
 71        exceptions. This is useful for debugging but should only be
 72        enabled in cases where the sender is trusted to see internal
 73        details of the receiver.
 74
 75        By default, when a message-handling exception will result in an
 76        efro.error.RemoteError being returned to the sender, the
 77        exception will be logged on the receiver. This is because the
 78        goal is usually to avoid returning opaque RemoteErrors and to
 79        instead return something meaningful as part of the expected
 80        response type (even if that value itself represents a logical
 81        error state). If 'log_remote_errors' is False, however, such
 82        exceptions will not be logged on the receiver. This can be
 83        useful in combination with 'remote_errors_include_stack_traces'
 84        and 'forward_clean_errors' in situations where all error
 85        logging/management will be happening on the sender end. Be
 86        aware, however, that in that case it may be possible for
 87        communication errors to prevent such error messages from
 88        ever being seen.
 89        """
 90        # pylint: disable=too-many-locals
 91        self.message_types_by_id: dict[int, type[Message]] = {}
 92        self.message_ids_by_type: dict[type[Message], int] = {}
 93        self.response_types_by_id: dict[
 94            int, type[Response] | type[SysResponse]
 95        ] = {}
 96        self.response_ids_by_type: dict[
 97            type[Response] | type[SysResponse], int
 98        ] = {}
 99        for m_id, m_type in message_types.items():
100            # Make sure only valid message types were passed and each
101            # id was assigned only once.
102            assert isinstance(m_id, int)
103            assert m_id >= 0
104            assert is_ioprepped_dataclass(m_type) and issubclass(
105                m_type, Message
106            )
107            assert self.message_types_by_id.get(m_id) is None
108            self.message_types_by_id[m_id] = m_type
109            self.message_ids_by_type[m_type] = m_id
110
111        for r_id, r_type in response_types.items():
112            assert isinstance(r_id, int)
113            assert r_id >= 0
114            assert is_ioprepped_dataclass(r_type) and issubclass(
115                r_type, Response
116            )
117            assert self.response_types_by_id.get(r_id) is None
118            self.response_types_by_id[r_id] = r_type
119            self.response_ids_by_type[r_type] = r_id
120
121        # Register our SysResponse types. These use negative
122        # IDs so as to never overlap with user Response types.
123        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
124            assert self.response_types_by_id.get(reg_id) is None
125            self.response_types_by_id[reg_id] = reg_tp
126            self.response_ids_by_type[reg_tp] = reg_id
127
128        _reg_sys(ErrorSysResponse, -1)
129        _reg_sys(EmptySysResponse, -2)
130
131        # Some extra-thorough validation in debug mode.
132        if __debug__:
133            # Make sure all Message types' return types are valid
134            # and have been assigned an ID as well.
135            all_response_types: set[type[Response] | None] = set()
136            for m_id, m_type in message_types.items():
137                m_rtypes = m_type.get_response_types()
138
139                assert isinstance(m_rtypes, list)
140                assert (
141                    m_rtypes
142                ), f'Message type {m_type} specifies no return types.'
143                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
144                for m_rtype in m_rtypes:
145                    all_response_types.add(m_rtype)
146            for cls in all_response_types:
147                if cls is None:
148                    continue
149                assert is_ioprepped_dataclass(cls)
150                assert issubclass(cls, Response)
151                if cls not in self.response_ids_by_type:
152                    raise ValueError(
153                        f'Possible response type {cls} needs to be included'
154                        f' in response_types for this protocol.'
155                    )
156
157            # Make sure all registered types have unique base names.
158            # We can take advantage of this to generate cleaner looking
159            # protocol modules. Can revisit if this is ever a problem.
160            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
161            if len(mtypenames) != len(message_types):
162                raise ValueError(
163                    'message_types contains duplicate __name__s;'
164                    ' all types are required to have unique names.'
165                )
166
167        self.forward_clean_errors = forward_clean_errors
168        self.forward_communication_errors = forward_communication_errors
169        self.remote_errors_include_stack_traces = (
170            remote_errors_include_stack_traces
171        )
172        self.log_remote_errors = log_remote_errors
173
174    @staticmethod
175    def encode_dict(obj: dict) -> str:
176        """Json-encode a provided dict to a compact (no-whitespace) string."""
177        return json.dumps(obj, separators=(',', ':'))
178
179    def message_to_dict(self, message: Message) -> dict:
180        """Encode a message to a json ready dict."""
181        return self._to_dict(message, self.message_ids_by_type, 'message')
182
183    def response_to_dict(self, response: Response | SysResponse) -> dict:
184        """Encode a response to a json ready dict."""
185        return self._to_dict(response, self.response_ids_by_type, 'response')
186
187    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
188        """Translate an Exception to a SysResponse.
189
190        Also returns whether the error should be logged if this happened
191        within handle_raw_message().
192        """
193
194        # If anything goes wrong, return a ErrorSysResponse instead.
195        # (either CLEAN or generic REMOTE)
196        if self.forward_clean_errors and isinstance(exc, CleanError):
197            return (
198                ErrorSysResponse(
199                    error_message=str(exc),
200                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
201                ),
202                False,
203            )
204        if self.forward_communication_errors and isinstance(
205            exc, CommunicationError
206        ):
207            return (
208                ErrorSysResponse(
209                    error_message=str(exc),
210                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
211                ),
212                False,
213            )
214        return (
215            ErrorSysResponse(
216                error_message=(
217                    traceback.format_exc()
218                    if self.remote_errors_include_stack_traces
219                    else 'An internal error has occurred.'
220                ),
221                error_type=ErrorSysResponse.ErrorType.REMOTE,
222            ),
223            self.log_remote_errors,
224        )
225
226    def _to_dict(
227        self, message: Any, ids_by_type: dict[type, int], opname: str
228    ) -> dict:
229        """Encode a message or response to a json-ready dict for transport."""
230
231        m_id: int | None = ids_by_type.get(type(message))
232        if m_id is None:
233            raise TypeError(
234                f'{opname} type is not registered in protocol:'
235                f' {type(message)}'
236            )
237        out = {'t': m_id, 'm': dataclass_to_dict(message)}
238        return out
239
240    @staticmethod
241    def decode_dict(data: str) -> dict:
242        """Decode a json string to a dict."""
243        out = json.loads(data)
244        assert isinstance(out, dict)
245        return out
246
247    def message_from_dict(self, data: dict) -> Message:
248        """Decode a message from a json-ready dict."""
249        out = self._from_dict(data, self.message_types_by_id, 'message')
250        assert isinstance(out, Message)
251        return out
252
253    def response_from_dict(self, data: dict) -> Response | SysResponse:
254        """Decode a response from a json-ready dict."""
255        out = self._from_dict(data, self.response_types_by_id, 'response')
256        assert isinstance(out, Response | SysResponse)
257        return out
258
259    # Weeeird; we get mypy errors returning dict[int, type] but
260    # dict[int, typing.Type] or dict[int, type[Any]] works..
261    def _from_dict(
262        self, data: dict, types_by_id: dict[int, type[Any]], opname: str
263    ) -> Any:
264        """Decode a message or response from a json-ready dict."""
265        msgdict: dict | None
266
267        m_id = data.get('t')
268        # Allow omitting the 'm' dict if it's empty.
269        msgdict = data.get('m', {})
270
271        assert isinstance(m_id, int)
272        assert isinstance(msgdict, dict)
273
274        # Look up the type registered for this id and decode into it.
275        msgtype = types_by_id.get(m_id)
276        if msgtype is None:
277            raise UnregisteredMessageIDError(
278                f'Got unregistered {opname} id of {m_id}.'
279            )
280        return dataclass_from_dict(msgtype, msgdict)
281
282    def _get_module_header(
283        self,
284        part: Literal['sender', 'receiver'],
285        extra_import_code: str | None,
286        enable_async_sends: bool,
287    ) -> str:
288        """Return common parts of generated modules."""
289        # pylint: disable=too-many-locals
290        # pylint: disable=too-many-branches
291        # pylint: disable=too-many-statements
292        import textwrap
293
294        tpimports: dict[str, list[str]] = {}
295        imports: dict[str, list[str]] = {}
296
297        single_message_type = len(self.message_ids_by_type) == 1
298
299        msgtypes = list(self.message_ids_by_type)
300        if part == 'sender':
301            msgtypes.append(Message)
302        for msgtype in msgtypes:
303            tpimports.setdefault(msgtype.__module__, []).append(
304                msgtype.__name__
305            )
306        rsptypes = list(self.response_ids_by_type)
307        if part == 'sender':
308            rsptypes.append(Response)
309        for rsp_tp in rsptypes:
310            # Skip these as they don't actually show up in code.
311            if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse:
312                continue
313            if (
314                single_message_type
315                and part == 'sender'
316                and rsp_tp is not Response
317            ):
318                # We need to cast to the single supported response type
319                # in this case so need response types at runtime.
320                imports.setdefault(rsp_tp.__module__, []).append(
321                    rsp_tp.__name__
322                )
323            else:
324                tpimports.setdefault(rsp_tp.__module__, []).append(
325                    rsp_tp.__name__
326                )
327
328        import_lines = ''
329        tpimport_lines = ''
330
331        for module, names in sorted(imports.items()):
332            jnames = ', '.join(names)
333            line = f'from {module} import {jnames}'
334            if len(line) > 80:
335                # Recreate in a wrapping-friendly form.
336                line = f'from {module} import ({jnames})'
337            import_lines += f'{line}\n'
338        for module, names in sorted(tpimports.items()):
339            jnames = ', '.join(names)
340            line = f'from {module} import {jnames}'
341            if len(line) > 75:  # Account for indent
342                # Recreate in a wrapping-friendly form.
343                line = f'from {module} import ({jnames})'
344            tpimport_lines += f'{line}\n'
345
346        if part == 'sender':
347            import_lines += (
348                'from efro.message import MessageSender, BoundMessageSender\n'
349            )
350            tpimport_typing_extras = ''
351        else:
352            if single_message_type:
353                import_lines += (
354                    'from efro.message import (MessageReceiver,'
355                    ' BoundMessageReceiver, Message, Response)\n'
356                )
357            else:
358                import_lines += (
359                    'from efro.message import MessageReceiver,'
360                    ' BoundMessageReceiver\n'
361                )
362            tpimport_typing_extras = ', Awaitable'
363
364        if extra_import_code is not None:
365            import_lines += extra_import_code
366
367        ovld = ', overload' if not single_message_type else ''
368        ovld2 = (
369            ', cast, Awaitable'
370            if (single_message_type and part == 'sender' and enable_async_sends)
371            else ''
372        )
373        tpimport_lines = textwrap.indent(tpimport_lines, '    ')
374
375        baseimps = ['Any']
376        if part == 'receiver':
377            baseimps.append('Callable')
378        if part == 'sender' and enable_async_sends:
379            baseimps.append('Awaitable')
380        baseimps_s = ', '.join(baseimps)
381        out = (
382            '# Released under the MIT License. See LICENSE for details.\n'
383            f'#\n'
384            f'"""Auto-generated {part} module. Do not edit by hand."""\n'
385            f'\n'
386            f'from __future__ import annotations\n'
387            f'\n'
388            f'from typing import TYPE_CHECKING{ovld}{ovld2}\n'
389            f'\n'
390            f'{import_lines}'
391            f'\n'
392            f'if TYPE_CHECKING:\n'
393            f'    from typing import {baseimps_s}'
394            f'{tpimport_typing_extras}\n'
395            f'{tpimport_lines}'
396            f'\n'
397            f'\n'
398        )
399        return out
400
401    def do_create_sender_module(
402        self,
403        basename: str,
404        protocol_create_code: str,
405        enable_sync_sends: bool,
406        enable_async_sends: bool,
407        private: bool = False,
408        protocol_module_level_import_code: str | None = None,
409    ) -> str:
410        """Used by create_sender_module(); do not call directly."""
411        # pylint: disable=too-many-locals
412        # pylint: disable=too-many-branches
413        import textwrap
414
415        msgtypes = list(self.message_ids_by_type.keys())
416
417        ppre = '_' if private else ''
418        out = self._get_module_header(
419            'sender',
420            extra_import_code=protocol_module_level_import_code,
421            enable_async_sends=enable_async_sends,
422        )
423        ccind = textwrap.indent(protocol_create_code, '        ')
424        out += (
425            f'class {ppre}{basename}(MessageSender):\n'
426            f'    """Protocol-specific sender."""\n'
427            f'\n'
428            f'    def __init__(self) -> None:\n'
429            f'{ccind}\n'
430            f'        super().__init__(protocol)\n'
431            f'\n'
432            f'    def __get__(\n'
433            f'        self, obj: Any, type_in: Any = None\n'
434            f'    ) -> {ppre}Bound{basename}:\n'
435            f'        return {ppre}Bound{basename}(obj, self)\n'
436            f'\n'
437            f'\n'
438            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
439            f'    """Protocol-specific bound sender."""\n'
440        )
441
442        def _filt_tp_name(rtype: type[Response] | None) -> str:
443            return 'None' if rtype is None else rtype.__name__
444
445        # Define send()/send_async() overloads for all registered message types.
446        if msgtypes:
447            for async_pass in False, True:
448                if async_pass and not enable_async_sends:
449                    continue
450                if not async_pass and not enable_sync_sends:
451                    continue
452                pfx = 'async ' if async_pass else ''
453                sfx = '_async' if async_pass else ''
454                # awt = 'await ' if async_pass else ''
455                awt = ''
456                how = 'asynchronously' if async_pass else 'synchronously'
457
458                if len(msgtypes) == 1:
459                    # Special case: with a single message type we don't
460                    # use overloads. NOTE(review): if the sole response type is None, the sync branch below emits 'assert isinstance(out, None)', which raises TypeError when run; confirm and special-case.
461                    msgtype = msgtypes[0]
462                    msgtypevar = msgtype.__name__
463                    rtypes = msgtype.get_response_types()
464                    if len(rtypes) > 1:
465                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
466                    else:
467                        rtypevar = _filt_tp_name(rtypes[0])
468                    if async_pass:
469                        rtypevar = f'Awaitable[{rtypevar}]'
470                    out += (
471                        f'\n'
472                        f'    def send{sfx}(self,'
473                        f' message: {msgtypevar})'
474                        f' -> {rtypevar}:\n'
475                        f'        """Send a message {how}."""\n'
476                        f'        out = {awt}self._sender.'
477                        f'send{sfx}(self._obj, message)\n'
478                    )
479                    if not async_pass:
480                        out += (
481                            f'        assert isinstance(out, {rtypevar})\n'
482                            '        return out\n'
483                        )
484                    else:
485                        out += f'        return cast({rtypevar}, out)\n'
486
487                else:
488                    for msgtype in msgtypes:
489                        msgtypevar = msgtype.__name__
490                        rtypes = msgtype.get_response_types()
491                        if len(rtypes) > 1:
492                            rtypevar = ' | '.join(
493                                _filt_tp_name(t) for t in rtypes
494                            )
495                        else:
496                            rtypevar = _filt_tp_name(rtypes[0])
497                        out += (
498                            f'\n'
499                            f'    @overload\n'
500                            f'    {pfx}def send{sfx}(self,'
501                            f' message: {msgtypevar})'
502                            f' -> {rtypevar}: ...\n'
503                        )
504                    rtypevar = 'Response | None'
505                    if async_pass:
506                        rtypevar = f'Awaitable[{rtypevar}]'
507                    out += (
508                        f'\n'
509                        f'    def send{sfx}(self, message: Message)'
510                        f' -> {rtypevar}:\n'
511                        f'        """Send a message {how}."""\n'
512                        f'        return {awt}self._sender.'
513                        f'send{sfx}(self._obj, message)\n'
514                    )
515
516        return out
517
518    def do_create_receiver_module(
519        self,
520        basename: str,
521        protocol_create_code: str,
522        is_async: bool,
523        private: bool = False,
524        protocol_module_level_import_code: str | None = None,
525    ) -> str:
526        """Used by create_receiver_module(); do not call directly."""
527        # pylint: disable=too-many-locals
528        import textwrap
529
530        desc = 'asynchronous' if is_async else 'synchronous'
531        ppre = '_' if private else ''
532        msgtypes = list(self.message_ids_by_type.keys())
533        out = self._get_module_header(
534            'receiver',
535            extra_import_code=protocol_module_level_import_code,
536            enable_async_sends=False,
537        )
538        ccind = textwrap.indent(protocol_create_code, '        ')
539        out += (
540            f'class {ppre}{basename}(MessageReceiver):\n'
541            f'    """Protocol-specific {desc} receiver."""\n'
542            f'\n'
543            f'    is_async = {is_async}\n'
544            f'\n'
545            f'    def __init__(self) -> None:\n'
546            f'{ccind}\n'
547            f'        super().__init__(protocol)\n'
548            f'\n'
549            f'    def __get__(\n'
550            f'        self,\n'
551            f'        obj: Any,\n'
552            f'        type_in: Any = None,\n'
553            f'    ) -> {ppre}Bound{basename}:\n'
554            f'        return {ppre}Bound{basename}('
555            f'obj, self)\n'
556        )
557
558        # Define handler() overloads for all registered message types.
559
560        def _filt_tp_name(rtype: type[Response] | None) -> str:
561            return 'None' if rtype is None else rtype.__name__
562
563        if msgtypes:
564            cbgn = 'Awaitable[' if is_async else ''
565            cend = ']' if is_async else ''
566            if len(msgtypes) == 1:
567                # Special case: when we have a single message type we don't
568                # use overloads.
569                msgtype = msgtypes[0]
570                msgtypevar = msgtype.__name__
571                rtypes = msgtype.get_response_types()
572                if len(rtypes) > 1:
573                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
574                else:
575                    rtypevar = _filt_tp_name(rtypes[0])
576                rtypevar = f'{cbgn}{rtypevar}{cend}'
577                out += (
578                    f'\n'
579                    f'    def handler(\n'
580                    f'        self,\n'
581                    f'        call: Callable[[Any, {msgtypevar}], '
582                    f'{rtypevar}],\n'
583                    f'    )'
584                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
585                    f'        """Decorator to register message handlers."""\n'
586                    f'        from typing import cast, Callable, Any\n'
587                    f'\n'
588                    f'        self.register_handler(cast(Callable'
589                    f'[[Any, Message], Response], call))\n'
590                    f'        return call\n'
591                )
592            else:
593                for msgtype in msgtypes:
594                    msgtypevar = msgtype.__name__
595                    rtypes = msgtype.get_response_types()
596                    if len(rtypes) > 1:
597                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
598                    else:
599                        rtypevar = _filt_tp_name(rtypes[0])
600                    rtypevar = f'{cbgn}{rtypevar}{cend}'
601                    out += (
602                        f'\n'
603                        f'    @overload\n'
604                        f'    def handler(\n'
605                        f'        self,\n'
606                        f'        call: Callable[[Any, {msgtypevar}], '
607                        f'{rtypevar}],\n'
608                        f'    )'
609                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
610                    )
611                out += (
612                    '\n'
613                    '    def handler(self, call: Callable) -> Callable:\n'
614                    '        """Decorator to register message handlers."""\n'
615                    '        self.register_handler(call)\n'
616                    '        return call\n'
617                )
618
619        out += (
620            f'\n'
621            f'\n'
622            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
623            f'    """Protocol-specific bound receiver."""\n'
624        )
625        if is_async:
626            out += (
627                '\n'
628                '    def handle_raw_message(\n'
629                '        self, message: str, raise_unregistered: bool = False\n'
630                '    ) -> Awaitable[str]:\n'
631                '        """Asynchronously handle a raw incoming message."""\n'
632                '        return self._receiver.'
633                'handle_raw_message_async(\n'
634                '            self._obj, message, raise_unregistered\n'
635                '        )\n'
636            )
637
638        else:
639            out += (
640                '\n'
641                '    def handle_raw_message(\n'
642                '        self, message: str, raise_unregistered: bool = False\n'
643                '    ) -> str:\n'
644                '        """Synchronously handle a raw incoming message."""\n'
645                '        return self._receiver.handle_raw_message(\n'
646                '            self._obj, message, raise_unregistered\n'
647                '        )\n'
648            )
649
650        return out

Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.

MessageProtocol( message_types: dict[int, type[Message]], response_types: dict[int, type[Response]], forward_communication_errors: bool = False, forward_clean_errors: bool = False, remote_errors_include_stack_traces: bool = False, log_remote_errors: bool = True)
 42    def __init__(
 43        self,
 44        message_types: dict[int, type[Message]],
 45        response_types: dict[int, type[Response]],
 46        forward_communication_errors: bool = False,
 47        forward_clean_errors: bool = False,
 48        remote_errors_include_stack_traces: bool = False,
 49        log_remote_errors: bool = True,
 50    ) -> None:
 51        """Create a protocol with a given configuration.
 52
 53        If 'forward_communication_errors' is True,
 54        efro.error.CommunicationErrors raised on the receiver end will
 55        result in a matching error raised back on the sender. This can
 56        be useful if the receiver will be in some way forwarding
 57        messages along and the sender doesn't need to know where
 58        communication breakdowns occurred; only that they did.
 59
 60        If 'forward_clean_errors' is True, efro.error.CleanError
 61        exceptions raised on the receiver end will result in a matching
 62        CleanError raised back on the sender.
 63
 64        When an exception is not covered by the optional forwarding
 65        mechanisms above, it will come across as efro.error.RemoteError
 66        and the exception will be logged on the receiver
 67        end - at least by default (see details below).
 68
 69        If 'remote_errors_include_stack_traces' is True, stringified
 70        stack traces will be returned with efro.error.RemoteError
 71        exceptions. This is useful for debugging but should only be
 72        enabled in cases where the sender is trusted to see internal
 73        details of the receiver.
 74
 75        By default, when a message-handling exception will result in an
 76        efro.error.RemoteError being returned to the sender, the
 77        exception will be logged on the receiver. This is because the
 78        goal is usually to avoid returning opaque RemoteErrors and to
 79        instead return something meaningful as part of the expected
 80        response type (even if that value itself represents a logical
 81        error state). If 'log_remote_errors' is False, however, such
 82        exceptions will not be logged on the receiver. This can be
 83        useful in combination with 'remote_errors_include_stack_traces'
 84        and 'forward_clean_errors' in situations where all error
 85        logging/management will be happening on the sender end. Be
 86        aware, however, that in that case it may be possible for
 87        communication errors to prevent such error messages from
 88        ever being seen.
 89        """
 90        # pylint: disable=too-many-locals
 91        self.message_types_by_id: dict[int, type[Message]] = {}
 92        self.message_ids_by_type: dict[type[Message], int] = {}
 93        self.response_types_by_id: dict[
 94            int, type[Response] | type[SysResponse]
 95        ] = {}
 96        self.response_ids_by_type: dict[
 97            type[Response] | type[SysResponse], int
 98        ] = {}
 99        for m_id, m_type in message_types.items():
100            # Make sure only valid message types were passed and each
101            # id was assigned only once.
102            assert isinstance(m_id, int)
103            assert m_id >= 0
104            assert is_ioprepped_dataclass(m_type) and issubclass(
105                m_type, Message
106            )
107            assert self.message_types_by_id.get(m_id) is None
108            self.message_types_by_id[m_id] = m_type
109            self.message_ids_by_type[m_type] = m_id
110
111        for r_id, r_type in response_types.items():
112            assert isinstance(r_id, int)
113            assert r_id >= 0
114            assert is_ioprepped_dataclass(r_type) and issubclass(
115                r_type, Response
116            )
117            assert self.response_types_by_id.get(r_id) is None
118            self.response_types_by_id[r_id] = r_type
119            self.response_ids_by_type[r_type] = r_id
120
121        # Register our SysResponse types. These use negative
122        # IDs so as to never overlap with user Response types.
123        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
124            assert self.response_types_by_id.get(reg_id) is None
125            self.response_types_by_id[reg_id] = reg_tp
126            self.response_ids_by_type[reg_tp] = reg_id
127
128        _reg_sys(ErrorSysResponse, -1)
129        _reg_sys(EmptySysResponse, -2)
130
131        # Some extra-thorough validation in debug mode.
132        if __debug__:
133            # Make sure all Message types' return types are valid
134            # and have been assigned an ID as well.
135            all_response_types: set[type[Response] | None] = set()
136            for m_id, m_type in message_types.items():
137                m_rtypes = m_type.get_response_types()
138
139                assert isinstance(m_rtypes, list)
140                assert (
141                    m_rtypes
142                ), f'Message type {m_type} specifies no return types.'
143                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
144                for m_rtype in m_rtypes:
145                    all_response_types.add(m_rtype)
146            for cls in all_response_types:
147                if cls is None:
148                    continue
149                assert is_ioprepped_dataclass(cls)
150                assert issubclass(cls, Response)
151                if cls not in self.response_ids_by_type:
152                    raise ValueError(
153                        f'Possible response type {cls} needs to be included'
154                        f' in response_types for this protocol.'
155                    )
156
157            # Make sure all registered types have unique base names.
158            # We can take advantage of this to generate cleaner looking
159            # protocol modules. Can revisit if this is ever a problem.
160            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
161            if len(mtypenames) != len(message_types):
162                raise ValueError(
163                    'message_types contains duplicate __name__s;'
164                    ' all types are required to have unique names.'
165                )
166
167        self.forward_clean_errors = forward_clean_errors
168        self.forward_communication_errors = forward_communication_errors
169        self.remote_errors_include_stack_traces = (
170            remote_errors_include_stack_traces
171        )
172        self.log_remote_errors = log_remote_errors

Create a protocol with a given configuration.

If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.

If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.

When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).

If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.

By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_remote_errors' is False, however, such exceptions will not be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent such error messages from ever being seen.

message_types_by_id: dict[int, type[Message]]
message_ids_by_type: dict[type[Message], int]
response_types_by_id: dict[int, type[Response] | type[SysResponse]]
response_ids_by_type: dict[type[Response] | type[SysResponse], int]
forward_clean_errors
forward_communication_errors
remote_errors_include_stack_traces
log_remote_errors
@staticmethod
def encode_dict(obj: dict) -> str:
174    @staticmethod
175    def encode_dict(obj: dict) -> str:
176        """Json-encode a provided dict."""
177        return json.dumps(obj, separators=(',', ':'))

Json-encode a provided dict.

def message_to_dict(self, message: Message) -> dict:
179    def message_to_dict(self, message: Message) -> dict:
180        """Encode a message to a json ready dict."""
181        return self._to_dict(message, self.message_ids_by_type, 'message')

Encode a message to a json ready dict.

def response_to_dict( self, response: Response | SysResponse) -> dict:
183    def response_to_dict(self, response: Response | SysResponse) -> dict:
184        """Encode a response to a json ready dict."""
185        return self._to_dict(response, self.response_ids_by_type, 'response')

Encode a response to a json ready dict.

def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
187    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
188        """Translate an Exception to a SysResponse.
189
190        Also returns whether the error should be logged if this happened
191        within handle_raw_message().
192        """
193
194        # If anything goes wrong, return a ErrorSysResponse instead.
195        # (either CLEAN or generic REMOTE)
196        if self.forward_clean_errors and isinstance(exc, CleanError):
197            return (
198                ErrorSysResponse(
199                    error_message=str(exc),
200                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
201                ),
202                False,
203            )
204        if self.forward_communication_errors and isinstance(
205            exc, CommunicationError
206        ):
207            return (
208                ErrorSysResponse(
209                    error_message=str(exc),
210                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
211                ),
212                False,
213            )
214        return (
215            ErrorSysResponse(
216                error_message=(
217                    traceback.format_exc()
218                    if self.remote_errors_include_stack_traces
219                    else 'An internal error has occurred.'
220                ),
221                error_type=ErrorSysResponse.ErrorType.REMOTE,
222            ),
223            self.log_remote_errors,
224        )

Translate an Exception to a SysResponse.

Also returns whether the error should be logged if this happened within handle_raw_message().

@staticmethod
def decode_dict(data: str) -> dict:
240    @staticmethod
241    def decode_dict(data: str) -> dict:
242        """Decode data to a dict."""
243        out = json.loads(data)
244        assert isinstance(out, dict)
245        return out

Decode data to a dict.

def message_from_dict(self, data: dict) -> Message:
247    def message_from_dict(self, data: dict) -> Message:
248        """Decode a message from a json string."""
249        out = self._from_dict(data, self.message_types_by_id, 'message')
250        assert isinstance(out, Message)
251        return out

Decode a message from a json-ready dict.

def response_from_dict( self, data: dict) -> Response | SysResponse:
253    def response_from_dict(self, data: dict) -> Response | SysResponse:
254        """Decode a response from a json string."""
255        out = self._from_dict(data, self.response_types_by_id, 'response')
256        assert isinstance(out, Response | SysResponse)
257        return out

Decode a response from a json-ready dict.

def do_create_sender_module( self, basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None) -> str:
401    def do_create_sender_module(
402        self,
403        basename: str,
404        protocol_create_code: str,
405        enable_sync_sends: bool,
406        enable_async_sends: bool,
407        private: bool = False,
408        protocol_module_level_import_code: str | None = None,
409    ) -> str:
410        """Used by create_sender_module(); do not call directly."""
411        # pylint: disable=too-many-locals
412        # pylint: disable=too-many-branches
413        import textwrap
414
415        msgtypes = list(self.message_ids_by_type.keys())
416
417        ppre = '_' if private else ''
418        out = self._get_module_header(
419            'sender',
420            extra_import_code=protocol_module_level_import_code,
421            enable_async_sends=enable_async_sends,
422        )
423        ccind = textwrap.indent(protocol_create_code, '        ')
424        out += (
425            f'class {ppre}{basename}(MessageSender):\n'
426            f'    """Protocol-specific sender."""\n'
427            f'\n'
428            f'    def __init__(self) -> None:\n'
429            f'{ccind}\n'
430            f'        super().__init__(protocol)\n'
431            f'\n'
432            f'    def __get__(\n'
433            f'        self, obj: Any, type_in: Any = None\n'
434            f'    ) -> {ppre}Bound{basename}:\n'
435            f'        return {ppre}Bound{basename}(obj, self)\n'
436            f'\n'
437            f'\n'
438            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
439            f'    """Protocol-specific bound sender."""\n'
440        )
441
442        def _filt_tp_name(rtype: type[Response] | None) -> str:
443            return 'None' if rtype is None else rtype.__name__
444
445        # Define handler() overloads for all registered message types.
446        if msgtypes:
447            for async_pass in False, True:
448                if async_pass and not enable_async_sends:
449                    continue
450                if not async_pass and not enable_sync_sends:
451                    continue
452                pfx = 'async ' if async_pass else ''
453                sfx = '_async' if async_pass else ''
454                # awt = 'await ' if async_pass else ''
455                awt = ''
456                how = 'asynchronously' if async_pass else 'synchronously'
457
458                if len(msgtypes) == 1:
459                    # Special case: with a single message types we don't
460                    # use overloads.
461                    msgtype = msgtypes[0]
462                    msgtypevar = msgtype.__name__
463                    rtypes = msgtype.get_response_types()
464                    if len(rtypes) > 1:
465                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
466                    else:
467                        rtypevar = _filt_tp_name(rtypes[0])
468                    if async_pass:
469                        rtypevar = f'Awaitable[{rtypevar}]'
470                    out += (
471                        f'\n'
472                        f'    def send{sfx}(self,'
473                        f' message: {msgtypevar})'
474                        f' -> {rtypevar}:\n'
475                        f'        """Send a message {how}."""\n'
476                        f'        out = {awt}self._sender.'
477                        f'send{sfx}(self._obj, message)\n'
478                    )
479                    if not async_pass:
480                        out += (
481                            f'        assert isinstance(out, {rtypevar})\n'
482                            '        return out\n'
483                        )
484                    else:
485                        out += f'        return cast({rtypevar}, out)\n'
486
487                else:
488                    for msgtype in msgtypes:
489                        msgtypevar = msgtype.__name__
490                        rtypes = msgtype.get_response_types()
491                        if len(rtypes) > 1:
492                            rtypevar = ' | '.join(
493                                _filt_tp_name(t) for t in rtypes
494                            )
495                        else:
496                            rtypevar = _filt_tp_name(rtypes[0])
497                        out += (
498                            f'\n'
499                            f'    @overload\n'
500                            f'    {pfx}def send{sfx}(self,'
501                            f' message: {msgtypevar})'
502                            f' -> {rtypevar}: ...\n'
503                        )
504                    rtypevar = 'Response | None'
505                    if async_pass:
506                        rtypevar = f'Awaitable[{rtypevar}]'
507                    out += (
508                        f'\n'
509                        f'    def send{sfx}(self, message: Message)'
510                        f' -> {rtypevar}:\n'
511                        f'        """Send a message {how}."""\n'
512                        f'        return {awt}self._sender.'
513                        f'send{sfx}(self._obj, message)\n'
514                    )
515
516        return out

Used by create_sender_module(); do not call directly.

def do_create_receiver_module( self, basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None) -> str:
518    def do_create_receiver_module(
519        self,
520        basename: str,
521        protocol_create_code: str,
522        is_async: bool,
523        private: bool = False,
524        protocol_module_level_import_code: str | None = None,
525    ) -> str:
526        """Used by create_receiver_module(); do not call directly."""
527        # pylint: disable=too-many-locals
528        import textwrap
529
530        desc = 'asynchronous' if is_async else 'synchronous'
531        ppre = '_' if private else ''
532        msgtypes = list(self.message_ids_by_type.keys())
533        out = self._get_module_header(
534            'receiver',
535            extra_import_code=protocol_module_level_import_code,
536            enable_async_sends=False,
537        )
538        ccind = textwrap.indent(protocol_create_code, '        ')
539        out += (
540            f'class {ppre}{basename}(MessageReceiver):\n'
541            f'    """Protocol-specific {desc} receiver."""\n'
542            f'\n'
543            f'    is_async = {is_async}\n'
544            f'\n'
545            f'    def __init__(self) -> None:\n'
546            f'{ccind}\n'
547            f'        super().__init__(protocol)\n'
548            f'\n'
549            f'    def __get__(\n'
550            f'        self,\n'
551            f'        obj: Any,\n'
552            f'        type_in: Any = None,\n'
553            f'    ) -> {ppre}Bound{basename}:\n'
554            f'        return {ppre}Bound{basename}('
555            f'obj, self)\n'
556        )
557
558        # Define handler() overloads for all registered message types.
559
560        def _filt_tp_name(rtype: type[Response] | None) -> str:
561            return 'None' if rtype is None else rtype.__name__
562
563        if msgtypes:
564            cbgn = 'Awaitable[' if is_async else ''
565            cend = ']' if is_async else ''
566            if len(msgtypes) == 1:
567                # Special case: when we have a single message type we don't
568                # use overloads.
569                msgtype = msgtypes[0]
570                msgtypevar = msgtype.__name__
571                rtypes = msgtype.get_response_types()
572                if len(rtypes) > 1:
573                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
574                else:
575                    rtypevar = _filt_tp_name(rtypes[0])
576                rtypevar = f'{cbgn}{rtypevar}{cend}'
577                out += (
578                    f'\n'
579                    f'    def handler(\n'
580                    f'        self,\n'
581                    f'        call: Callable[[Any, {msgtypevar}], '
582                    f'{rtypevar}],\n'
583                    f'    )'
584                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
585                    f'        """Decorator to register message handlers."""\n'
586                    f'        from typing import cast, Callable, Any\n'
587                    f'\n'
588                    f'        self.register_handler(cast(Callable'
589                    f'[[Any, Message], Response], call))\n'
590                    f'        return call\n'
591                )
592            else:
593                for msgtype in msgtypes:
594                    msgtypevar = msgtype.__name__
595                    rtypes = msgtype.get_response_types()
596                    if len(rtypes) > 1:
597                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
598                    else:
599                        rtypevar = _filt_tp_name(rtypes[0])
600                    rtypevar = f'{cbgn}{rtypevar}{cend}'
601                    out += (
602                        f'\n'
603                        f'    @overload\n'
604                        f'    def handler(\n'
605                        f'        self,\n'
606                        f'        call: Callable[[Any, {msgtypevar}], '
607                        f'{rtypevar}],\n'
608                        f'    )'
609                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
610                    )
611                out += (
612                    '\n'
613                    '    def handler(self, call: Callable) -> Callable:\n'
614                    '        """Decorator to register message handlers."""\n'
615                    '        self.register_handler(call)\n'
616                    '        return call\n'
617                )
618
619        out += (
620            f'\n'
621            f'\n'
622            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
623            f'    """Protocol-specific bound receiver."""\n'
624        )
625        if is_async:
626            out += (
627                '\n'
628                '    def handle_raw_message(\n'
629                '        self, message: str, raise_unregistered: bool = False\n'
630                '    ) -> Awaitable[str]:\n'
631                '        """Asynchronously handle a raw incoming message."""\n'
632                '        return self._receiver.'
633                'handle_raw_message_async(\n'
634                '            self._obj, message, raise_unregistered\n'
635                '        )\n'
636            )
637
638        else:
639            out += (
640                '\n'
641                '    def handle_raw_message(\n'
642                '        self, message: str, raise_unregistered: bool = False\n'
643                '    ) -> str:\n'
644                '        """Synchronously handle a raw incoming message."""\n'
645                '        return self._receiver.handle_raw_message(\n'
646                '            self._obj, message, raise_unregistered\n'
647                '        )\n'
648            )
649
650        return out

Used by create_receiver_module(); do not call directly.

class MessageSender:
 22class MessageSender:
 23    """Facilitates sending messages to a target and receiving responses.
 24    This is instantiated at the class level and used to register unbound
 25    class methods to handle raw message sending.
 26
 27    Example:
 28
 29    class MyClass:
 30        msg = MyMessageSender(some_protocol)
 31
 32        @msg.send_method
 33        def send_raw_message(self, message: str) -> str:
 34            # Actually send the message here.
 35
 36    # MyMessageSender class should provide overloads for send(), send_async(),
 37    # etc. to ensure all sending happens with valid types.
 38    obj = MyClass()
 39    obj.msg.send(SomeMessageType())
 40    """
 41
 42    def __init__(self, protocol: MessageProtocol) -> None:
 43        self.protocol = protocol
 44        self._send_raw_message_call: Callable[[Any, str], str] | None = None
 45        self._send_async_raw_message_call: (
 46            Callable[[Any, str], Awaitable[str]] | None
 47        ) = None
 48        self._send_async_raw_message_ex_call: (
 49            Callable[[Any, str, Message], Awaitable[str]] | None
 50        ) = None
 51        self._encode_filter_call: (
 52            Callable[[Any, Message, dict], None] | None
 53        ) = None
 54        self._decode_filter_call: (
 55            Callable[[Any, Message, dict, Response | SysResponse], None] | None
 56        ) = None
 57        self._peer_desc_call: Callable[[Any], str] | None = None
 58
 59    def send_method(
 60        self, call: Callable[[Any, str], str]
 61    ) -> Callable[[Any, str], str]:
 62        """Function decorator for setting raw send method.
 63
 64        Send methods take strings and should return strings.
 65        CommunicationErrors raised here will be returned to the sender
 66        as such; all other exceptions will result in a RuntimeError for
 67        the sender.
 68        """
 69        assert self._send_raw_message_call is None
 70        self._send_raw_message_call = call
 71        return call
 72
 73    def send_async_method(
 74        self, call: Callable[[Any, str], Awaitable[str]]
 75    ) -> Callable[[Any, str], Awaitable[str]]:
 76        """Function decorator for setting raw send-async method.
 77
 78        Send methods take strings and should return strings.
 79        CommunicationErrors raised here will be returned to the sender
 80        as such; all other exceptions will result in a RuntimeError for
 81        the sender.
 82
 83        IMPORTANT: Generally async send methods should not be implemented
 84        as 'async' methods, but instead should be regular methods that
 85        return awaitable objects. This way it can be guaranteed that
 86        outgoing messages are synchronously enqueued in the correct
 87        order, and then async calls can be returned which finish each
 88        send. If the entire call is async, they may be enqueued out of
 89        order in rare cases.
 90        """
 91        assert self._send_async_raw_message_call is None
 92        self._send_async_raw_message_call = call
 93        return call
 94
 95    def send_async_ex_method(
 96        self, call: Callable[[Any, str, Message], Awaitable[str]]
 97    ) -> Callable[[Any, str, Message], Awaitable[str]]:
 98        """Function decorator for extended send-async method.
 99
100        Version of send_async_method which is also is passed the original
101        unencoded message; can be useful for cases where metadata is sent
102        along with messages referring to their payloads/etc.
103        """
104        assert self._send_async_raw_message_ex_call is None
105        self._send_async_raw_message_ex_call = call
106        return call
107
108    def encode_filter_method(
109        self, call: Callable[[Any, Message, dict], None]
110    ) -> Callable[[Any, Message, dict], None]:
111        """Function decorator for defining an encode filter.
112
113        Encode filters can be used to add extra data to the message
114        dict before is is encoded to a string and sent out.
115        """
116        assert self._encode_filter_call is None
117        self._encode_filter_call = call
118        return call
119
120    def decode_filter_method(
121        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
122    ) -> Callable[[Any, Message, dict, Response], None]:
123        """Function decorator for defining a decode filter.
124
125        Decode filters can be used to extract extra data from incoming
126        message dicts.
127        """
128        assert self._decode_filter_call is None
129        self._decode_filter_call = call
130        return call
131
132    def peer_desc_method(
133        self, call: Callable[[Any], str]
134    ) -> Callable[[Any], str]:
135        """Function decorator for defining peer descriptions.
136
137        These are included in error messages or other diagnostics.
138        """
139        assert self._peer_desc_call is None
140        self._peer_desc_call = call
141        return call
142
143    def send(self, bound_obj: Any, message: Message) -> Response | None:
144        """Send a message synchronously."""
145        return self.unpack_raw_response(
146            bound_obj=bound_obj,
147            message=message,
148            raw_response=self.fetch_raw_response(
149                bound_obj=bound_obj,
150                message=message,
151            ),
152        )
153
154    def send_async(
155        self, bound_obj: Any, message: Message
156    ) -> Awaitable[Response | None]:
157        """Send a message asynchronously."""
158
159        # Note: This call is synchronous so that the first part of it can
160        # happen synchronously. If the whole call were async we wouldn't be
161        # able to guarantee that messages sent in order would actually go
162        # out in order.
163        raw_response_awaitable = self.fetch_raw_response_async(
164            bound_obj=bound_obj,
165            message=message,
166        )
167        # Now return an awaitable that will finish the send.
168        return self._send_async_awaitable(
169            bound_obj, message, raw_response_awaitable
170        )
171
172    async def _send_async_awaitable(
173        self,
174        bound_obj: Any,
175        message: Message,
176        raw_response_awaitable: Awaitable[Response | SysResponse],
177    ) -> Response | None:
178        return self.unpack_raw_response(
179            bound_obj=bound_obj,
180            message=message,
181            raw_response=await raw_response_awaitable,
182        )
183
184    def fetch_raw_response(
185        self, bound_obj: Any, message: Message
186    ) -> Response | SysResponse:
187        """Send a message synchronously.
188
189        Generally you can just call send(); these split versions are
190        for when message sending and response handling need to happen
191        in different contexts/threads.
192        """
193        if self._send_raw_message_call is None:
194            raise RuntimeError('send() is unimplemented for this type.')
195
196        msg_encoded = self._encode_message(bound_obj, message)
197        try:
198            response_encoded = self._send_raw_message_call(
199                bound_obj, msg_encoded
200            )
201        except Exception as exc:
202            response = ErrorSysResponse(
203                error_message='Error in MessageSender @send_method.',
204                error_type=(
205                    ErrorSysResponse.ErrorType.COMMUNICATION
206                    if isinstance(exc, CommunicationError)
207                    else ErrorSysResponse.ErrorType.LOCAL
208                ),
209            )
210            # Can include the actual exception since we'll be looking at
211            # this locally; might be helpful.
212            response.set_local_exception(exc)
213            return response
214        return self._decode_raw_response(bound_obj, message, response_encoded)
215
216    def fetch_raw_response_async(
217        self, bound_obj: Any, message: Message
218    ) -> Awaitable[Response | SysResponse]:
219        """Fetch a raw message response awaitable.
220
221        The result of this should be awaited and then passed to
222        unpack_raw_response() to produce the final message result.
223
224        Generally you can just call send(); calling fetch and unpack
225        manually is for when message sending and response handling need
226        to happen in different contexts/threads.
227        """
228
229        # Note: This call is synchronous so that the first part of it can
230        # happen synchronously. If the whole call were async we wouldn't be
231        # able to guarantee that messages sent in order would actually go
232        # out in order.
233        if (
234            self._send_async_raw_message_call is None
235            and self._send_async_raw_message_ex_call is None
236        ):
237            raise RuntimeError('send_async() is unimplemented for this type.')
238
239        msg_encoded = self._encode_message(bound_obj, message)
240        try:
241            if self._send_async_raw_message_ex_call is not None:
242                send_awaitable = self._send_async_raw_message_ex_call(
243                    bound_obj, msg_encoded, message
244                )
245            else:
246                assert self._send_async_raw_message_call is not None
247                send_awaitable = self._send_async_raw_message_call(
248                    bound_obj, msg_encoded
249                )
250        except Exception as exc:
251            return self._error_awaitable(exc)
252
253        # Now return an awaitable to finish the job.
254        return self._fetch_raw_response_awaitable(
255            bound_obj, message, send_awaitable
256        )
257
258    async def _error_awaitable(self, exc: Exception) -> SysResponse:
259        response = ErrorSysResponse(
260            error_message='Error in MessageSender @send_async_method.',
261            error_type=(
262                ErrorSysResponse.ErrorType.COMMUNICATION
263                if isinstance(exc, CommunicationError)
264                else ErrorSysResponse.ErrorType.LOCAL
265            ),
266        )
267        # Can include the actual exception since we'll be looking at
268        # this locally; might be helpful.
269        response.set_local_exception(exc)
270        return response
271
272    async def _fetch_raw_response_awaitable(
273        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
274    ) -> Response | SysResponse:
275        try:
276            response_encoded = await send_awaitable
277        except Exception as exc:
278            response = ErrorSysResponse(
279                error_message='Error in MessageSender @send_async_method.',
280                error_type=(
281                    ErrorSysResponse.ErrorType.COMMUNICATION
282                    if isinstance(exc, CommunicationError)
283                    else ErrorSysResponse.ErrorType.LOCAL
284                ),
285            )
286            # Can include the actual exception since we'll be looking at
287            # this locally; might be helpful.
288            response.set_local_exception(exc)
289            return response
290        return self._decode_raw_response(bound_obj, message, response_encoded)
291
292    def unpack_raw_response(
293        self,
294        bound_obj: Any,
295        message: Message,
296        raw_response: Response | SysResponse,
297    ) -> Response | None:
298        """Convert a raw fetched response into a final response/error/etc.
299
300        Generally you can just call send(); calling fetch and unpack
301        manually is for when message sending and response handling need
302        to happen in different contexts/threads.
303        """
304        response = self._unpack_raw_response(bound_obj, raw_response)
305        assert (
306            response is None
307            or type(response) in type(message).get_response_types()
308        )
309        return response
310
311    def _encode_message(self, bound_obj: Any, message: Message) -> str:
312        """Encode a message for sending."""
313        msg_dict = self.protocol.message_to_dict(message)
314        if self._encode_filter_call is not None:
315            self._encode_filter_call(bound_obj, message, msg_dict)
316        return self.protocol.encode_dict(msg_dict)
317
318    def _decode_raw_response(
319        self, bound_obj: Any, message: Message, response_encoded: str
320    ) -> Response | SysResponse:
321        """Create a Response from returned data.
322
323        These Responses may encapsulate things like remote errors and
324        should not be handed directly to users. _unpack_raw_response()
325        should be used to translate to special values like None or raise
326        Exceptions. This function itself should never raise Exceptions.
327        """
328        response: Response | SysResponse
329        try:
330            response_dict = self.protocol.decode_dict(response_encoded)
331            response = self.protocol.response_from_dict(response_dict)
332            if self._decode_filter_call is not None:
333                self._decode_filter_call(
334                    bound_obj, message, response_dict, response
335                )
336        except Exception as exc:
337            response = ErrorSysResponse(
338                error_message='Error decoding raw response.',
339                error_type=ErrorSysResponse.ErrorType.LOCAL,
340            )
341            # Since we'll be looking at this locally, we can include
342            # extra info for logging/etc.
343            response.set_local_exception(exc)
344        return response
345
346    def _unpack_raw_response(
347        self, bound_obj: Any, raw_response: Response | SysResponse
348    ) -> Response | None:
349        """Given a raw Response, unpacks to special values or Exceptions.
350
351        The result of this call is what should be passed to users.
352        For complex messaging situations such as response callbacks
353        operating across different threads, this last stage should be
354        run such that any raised Exception is active when the callback
355        fires; not on the thread where the message was sent.
356        """
357        # EmptySysResponse translates to None
358        if isinstance(raw_response, EmptySysResponse):
359            return None
360
361        # Some error occurred. Raise a local Exception for it.
362        if isinstance(raw_response, ErrorSysResponse):
363            # Errors that happened locally can attach their exceptions
364            # here for extra logging goodness.
365            local_exception = raw_response.get_local_exception()
366
367            if (
368                raw_response.error_type
369                is ErrorSysResponse.ErrorType.COMMUNICATION
370            ):
371                raise CommunicationError(
372                    raw_response.error_message
373                ) from local_exception
374
375            # If something went wrong on *our* end of the connection,
376            # don't say it was a remote error.
377            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
378                raise RuntimeError(
379                    raw_response.error_message
380                ) from local_exception
381
382            # If they want to support clean errors, do those.
383            if (
384                self.protocol.forward_clean_errors
385                and raw_response.error_type
386                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
387            ):
388                raise CleanError(
389                    raw_response.error_message
390                ) from local_exception
391
392            if (
393                self.protocol.forward_communication_errors
394                and raw_response.error_type
395                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
396            ):
397                raise CommunicationError(
398                    raw_response.error_message
399                ) from local_exception
400
401            # Everything else gets lumped in as a remote error.
402            raise RemoteError(
403                raw_response.error_message,
404                peer_desc=(
405                    'peer'
406                    if self._peer_desc_call is None
407                    else self._peer_desc_call(bound_obj)
408                ),
409            ) from local_exception
410
411        assert isinstance(raw_response, Response)
412        return raw_response

Facilitates sending messages to a target and receiving responses. This is instantiated at the class level and used to register unbound class methods to handle raw message sending.

Example:

class MyClass:
    msg = MyMessageSender(some_protocol)

    @msg.send_method
    def send_raw_message(self, message: str) -> str:
        # Actually send the message here.

# MyMessageSender class should provide overloads for send(), send_async(),
# etc. to ensure all sending happens with valid types.
obj = MyClass()
obj.msg.send(SomeMessageType())

MessageSender(protocol: MessageProtocol)
42    def __init__(self, protocol: MessageProtocol) -> None:
43        self.protocol = protocol
44        self._send_raw_message_call: Callable[[Any, str], str] | None = None
45        self._send_async_raw_message_call: (
46            Callable[[Any, str], Awaitable[str]] | None
47        ) = None
48        self._send_async_raw_message_ex_call: (
49            Callable[[Any, str, Message], Awaitable[str]] | None
50        ) = None
51        self._encode_filter_call: (
52            Callable[[Any, Message, dict], None] | None
53        ) = None
54        self._decode_filter_call: (
55            Callable[[Any, Message, dict, Response | SysResponse], None] | None
56        ) = None
57        self._peer_desc_call: Callable[[Any], str] | None = None
protocol
def send_method(self, call: Callable[[Any, str], str]) -> Callable[[Any, str], str]:
59    def send_method(
60        self, call: Callable[[Any, str], str]
61    ) -> Callable[[Any, str], str]:
62        """Function decorator for setting raw send method.
63
64        Send methods take strings and should return strings.
65        CommunicationErrors raised here will be returned to the sender
66        as such; all other exceptions will result in a RuntimeError for
67        the sender.
68        """
69        assert self._send_raw_message_call is None
70        self._send_raw_message_call = call
71        return call

Function decorator for setting raw send method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

def send_async_method( self, call: Callable[[Any, str], Awaitable[str]]) -> Callable[[Any, str], Awaitable[str]]:
73    def send_async_method(
74        self, call: Callable[[Any, str], Awaitable[str]]
75    ) -> Callable[[Any, str], Awaitable[str]]:
76        """Function decorator for setting raw send-async method.
77
78        Send methods take strings and should return strings.
79        CommunicationErrors raised here will be returned to the sender
80        as such; all other exceptions will result in a RuntimeError for
81        the sender.
82
83        IMPORTANT: Generally async send methods should not be implemented
84        as 'async' methods, but instead should be regular methods that
85        return awaitable objects. This way it can be guaranteed that
86        outgoing messages are synchronously enqueued in the correct
87        order, and then async calls can be returned which finish each
88        send. If the entire call is async, they may be enqueued out of
89        order in rare cases.
90        """
91        assert self._send_async_raw_message_call is None
92        self._send_async_raw_message_call = call
93        return call

Function decorator for setting raw send-async method.

Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.

IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.

def send_async_ex_method( self, call: Callable[[Any, str, Message], Awaitable[str]]) -> Callable[[Any, str, Message], Awaitable[str]]:
 95    def send_async_ex_method(
 96        self, call: Callable[[Any, str, Message], Awaitable[str]]
 97    ) -> Callable[[Any, str, Message], Awaitable[str]]:
 98        """Function decorator for extended send-async method.
 99
100        Version of send_async_method which is also is passed the original
101        unencoded message; can be useful for cases where metadata is sent
102        along with messages referring to their payloads/etc.
103        """
104        assert self._send_async_raw_message_ex_call is None
105        self._send_async_raw_message_ex_call = call
106        return call

Function decorator for extended send-async method.

Version of send_async_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.

def encode_filter_method( self, call: Callable[[Any, Message, dict], NoneType]) -> Callable[[Any, Message, dict], NoneType]:
108    def encode_filter_method(
109        self, call: Callable[[Any, Message, dict], None]
110    ) -> Callable[[Any, Message, dict], None]:
111        """Function decorator for defining an encode filter.
112
113        Encode filters can be used to add extra data to the message
114        dict before is is encoded to a string and sent out.
115        """
116        assert self._encode_filter_call is None
117        self._encode_filter_call = call
118        return call

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.

def decode_filter_method( self, call: Callable[[Any, Message, dict, Response | SysResponse], NoneType]) -> Callable[[Any, Message, dict, Response], NoneType]:
120    def decode_filter_method(
121        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
122    ) -> Callable[[Any, Message, dict, Response], None]:
123        """Function decorator for defining a decode filter.
124
125        Decode filters can be used to extract extra data from incoming
126        message dicts.
127        """
128        assert self._decode_filter_call is None
129        self._decode_filter_call = call
130        return call

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts.

def peer_desc_method(self, call: Callable[[Any], str]) -> Callable[[Any], str]:
132    def peer_desc_method(
133        self, call: Callable[[Any], str]
134    ) -> Callable[[Any], str]:
135        """Function decorator for defining peer descriptions.
136
137        These are included in error messages or other diagnostics.
138        """
139        assert self._peer_desc_call is None
140        self._peer_desc_call = call
141        return call

Function decorator for defining peer descriptions.

These are included in error messages or other diagnostics.

def send( self, bound_obj: Any, message: Message) -> Response | None:
143    def send(self, bound_obj: Any, message: Message) -> Response | None:
144        """Send a message synchronously."""
145        return self.unpack_raw_response(
146            bound_obj=bound_obj,
147            message=message,
148            raw_response=self.fetch_raw_response(
149                bound_obj=bound_obj,
150                message=message,
151            ),
152        )

Send a message synchronously.

def send_async( self, bound_obj: Any, message: Message) -> Awaitable[Response | None]:
154    def send_async(
155        self, bound_obj: Any, message: Message
156    ) -> Awaitable[Response | None]:
157        """Send a message asynchronously."""
158
159        # Note: This call is synchronous so that the first part of it can
160        # happen synchronously. If the whole call were async we wouldn't be
161        # able to guarantee that messages sent in order would actually go
162        # out in order.
163        raw_response_awaitable = self.fetch_raw_response_async(
164            bound_obj=bound_obj,
165            message=message,
166        )
167        # Now return an awaitable that will finish the send.
168        return self._send_async_awaitable(
169            bound_obj, message, raw_response_awaitable
170        )

Send a message asynchronously.

def fetch_raw_response( self, bound_obj: Any, message: Message) -> Response | SysResponse:
184    def fetch_raw_response(
185        self, bound_obj: Any, message: Message
186    ) -> Response | SysResponse:
187        """Send a message synchronously.
188
189        Generally you can just call send(); these split versions are
190        for when message sending and response handling need to happen
191        in different contexts/threads.
192        """
193        if self._send_raw_message_call is None:
194            raise RuntimeError('send() is unimplemented for this type.')
195
196        msg_encoded = self._encode_message(bound_obj, message)
197        try:
198            response_encoded = self._send_raw_message_call(
199                bound_obj, msg_encoded
200            )
201        except Exception as exc:
202            response = ErrorSysResponse(
203                error_message='Error in MessageSender @send_method.',
204                error_type=(
205                    ErrorSysResponse.ErrorType.COMMUNICATION
206                    if isinstance(exc, CommunicationError)
207                    else ErrorSysResponse.ErrorType.LOCAL
208                ),
209            )
210            # Can include the actual exception since we'll be looking at
211            # this locally; might be helpful.
212            response.set_local_exception(exc)
213            return response
214        return self._decode_raw_response(bound_obj, message, response_encoded)

Send a message synchronously and return the raw response.

Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.

def fetch_raw_response_async( self, bound_obj: Any, message: Message) -> Awaitable[Response | SysResponse]:
216    def fetch_raw_response_async(
217        self, bound_obj: Any, message: Message
218    ) -> Awaitable[Response | SysResponse]:
219        """Fetch a raw message response awaitable.
220
221        The result of this should be awaited and then passed to
222        unpack_raw_response() to produce the final message result.
223
224        Generally you can just call send(); calling fetch and unpack
225        manually is for when message sending and response handling need
226        to happen in different contexts/threads.
227        """
228
229        # Note: This call is synchronous so that the first part of it can
230        # happen synchronously. If the whole call were async we wouldn't be
231        # able to guarantee that messages sent in order would actually go
232        # out in order.
233        if (
234            self._send_async_raw_message_call is None
235            and self._send_async_raw_message_ex_call is None
236        ):
237            raise RuntimeError('send_async() is unimplemented for this type.')
238
239        msg_encoded = self._encode_message(bound_obj, message)
240        try:
241            if self._send_async_raw_message_ex_call is not None:
242                send_awaitable = self._send_async_raw_message_ex_call(
243                    bound_obj, msg_encoded, message
244                )
245            else:
246                assert self._send_async_raw_message_call is not None
247                send_awaitable = self._send_async_raw_message_call(
248                    bound_obj, msg_encoded
249                )
250        except Exception as exc:
251            return self._error_awaitable(exc)
252
253        # Now return an awaitable to finish the job.
254        return self._fetch_raw_response_awaitable(
255            bound_obj, message, send_awaitable
256        )

Fetch a raw message response awaitable.

The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

def unpack_raw_response( self, bound_obj: Any, message: Message, raw_response: Response | SysResponse) -> Response | None:
292    def unpack_raw_response(
293        self,
294        bound_obj: Any,
295        message: Message,
296        raw_response: Response | SysResponse,
297    ) -> Response | None:
298        """Convert a raw fetched response into a final response/error/etc.
299
300        Generally you can just call send(); calling fetch and unpack
301        manually is for when message sending and response handling need
302        to happen in different contexts/threads.
303        """
304        response = self._unpack_raw_response(bound_obj, raw_response)
305        assert (
306            response is None
307            or type(response) in type(message).get_response_types()
308        )
309        return response

Convert a raw fetched response into a final response/error/etc.

Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.

class BoundMessageSender:
class BoundMessageSender:
    """Base class for bound senders.

    Pairs a MessageSender with the object it is bound to and forwards
    each call to the sender with that object filled in.
    """

    def __init__(self, obj: Any, sender: MessageSender) -> None:
        """Bind the given sender to the given object.

        obj may be None (see the note below); in that case only the
        protocol property is usable.
        """
        # Note: not checking obj here since we want to support
        # at least our protocol property when accessed via type.
        self._obj = obj
        self._sender = sender

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this sender."""
        return self._sender.protocol

    def send_untyped(self, message: Message) -> Response | None:
        """Send a message synchronously.

        Whenever possible, use the send() call provided by generated
        subclasses instead of this; it will provide better type safety.
        """
        assert self._obj is not None
        return self._sender.send(bound_obj=self._obj, message=message)

    def send_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously.

        Whenever possible, use the send_async() call provided by generated
        subclasses instead of this; it will provide better type safety.
        """
        assert self._obj is not None
        return self._sender.send_async(bound_obj=self._obj, message=message)

    def fetch_raw_response_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Split send (part 1 of 2).

        Starts the send and returns an awaitable for the raw response;
        await it and pass the result to unpack_raw_response_untyped().
        """
        assert self._obj is not None
        return self._sender.fetch_raw_response_async(
            bound_obj=self._obj, message=message
        )

    def unpack_raw_response_untyped(
        self, message: Message, raw_response: Response | SysResponse
    ) -> Response | None:
        """Split send (part 2 of 2).

        Converts a raw response obtained in part 1 into the final
        response value (or raises the exception it represents).
        """
        # Same guard as the other bound operations: a bound object is
        # required for anything beyond the protocol property.
        assert self._obj is not None
        return self._sender.unpack_raw_response(
            bound_obj=self._obj, message=message, raw_response=raw_response
        )

Base class for bound senders.

BoundMessageSender(obj: Any, sender: MessageSender)
418    def __init__(self, obj: Any, sender: MessageSender) -> None:
419        # Note: not checking obj here since we want to support
420        # at least our protocol property when accessed via type.
421        self._obj = obj
422        self._sender = sender
protocol: MessageProtocol
424    @property
425    def protocol(self) -> MessageProtocol:
426        """Protocol associated with this sender."""
427        return self._sender.protocol

Protocol associated with this sender.

def send_untyped( self, message: Message) -> Response | None:
429    def send_untyped(self, message: Message) -> Response | None:
430        """Send a message synchronously.
431
432        Whenever possible, use the send() call provided by generated
433        subclasses instead of this; it will provide better type safety.
434        """
435        assert self._obj is not None
436        return self._sender.send(bound_obj=self._obj, message=message)

Send a message synchronously.

Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.

def send_async_untyped( self, message: Message) -> Awaitable[Response | None]:
438    def send_async_untyped(
439        self, message: Message
440    ) -> Awaitable[Response | None]:
441        """Send a message asynchronously.
442
443        Whenever possible, use the send_async() call provided by generated
444        subclasses instead of this; it will provide better type safety.
445        """
446        assert self._obj is not None
447        return self._sender.send_async(bound_obj=self._obj, message=message)

Send a message asynchronously.

Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.

def fetch_raw_response_async_untyped( self, message: Message) -> Awaitable[Response | SysResponse]:
449    def fetch_raw_response_async_untyped(
450        self, message: Message
451    ) -> Awaitable[Response | SysResponse]:
452        """Split send (part 1 of 2)."""
453        assert self._obj is not None
454        return self._sender.fetch_raw_response_async(
455            bound_obj=self._obj, message=message
456        )

Split send (part 1 of 2).

def unpack_raw_response_untyped( self, message: Message, raw_response: Response | SysResponse) -> Response | None:
458    def unpack_raw_response_untyped(
459        self, message: Message, raw_response: Response | SysResponse
460    ) -> Response | None:
461        """Split send (part 2 of 2)."""
462        return self._sender.unpack_raw_response(
463            bound_obj=self._obj, message=message, raw_response=raw_response
464        )

Split send (part 2 of 2).

class MessageReceiver:
 29class MessageReceiver:
 30    """Facilitates receiving & responding to messages from a remote source.
 31
 32    This is instantiated at the class level with unbound methods registered
 33    as handlers for different message types in the protocol.
 34
 35    Example:
 36
 37    class MyClass:
 38        receiver = MyMessageReceiver()
 39
 40        # MyMessageReceiver fills out handler() overloads to ensure all
 41        # registered handlers have valid types/return-types.
 42        @receiver.handler
 43        def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
 44            # Deal with this message type here.
 45
 46    # This will trigger the registered handler being called.
 47    obj = MyClass()
 48    obj.receiver.handle_raw_message(some_raw_data)
 49
 50    Any unhandled Exception occurring during message handling will result in
 51    an Exception being raised on the sending end.
 52    """
 53
 54    is_async = False
 55
 56    def __init__(self, protocol: MessageProtocol) -> None:
 57        self.protocol = protocol
 58        self._handlers: dict[type[Message], Callable] = {}
 59        self._decode_filter_call: (
 60            Callable[[Any, dict, Message], None] | None
 61        ) = None
 62        self._encode_filter_call: (
 63            Callable[[Any, Message | None, Response | SysResponse, dict], None]
 64            | None
 65        ) = None
 66
 67    # noinspection PyProtectedMember
 68    def register_handler(
 69        self, call: Callable[[Any, Message], Response | None]
 70    ) -> None:
 71        """Register a handler call.
 72
 73        The message type handled by the call is determined by its
 74        type annotation.
 75        """
 76        # TODO: can use types.GenericAlias in 3.9.
 77        # (hmm though now that we're there,  it seems a drop-in
 78        # replace gives us errors. Should re-test in 3.11 as it seems
 79        # that typing_extensions handles it differently in that case)
 80        from typing import _GenericAlias  # type: ignore
 81        from typing import get_type_hints, get_args
 82
 83        sig = inspect.getfullargspec(call)
 84
 85        # The provided callable should be a method taking one 'msg' arg.
 86        expectedsig = ['self', 'msg']
 87        if sig.args != expectedsig:
 88            raise ValueError(
 89                f'Expected callable signature of {expectedsig};'
 90                f' got {sig.args}'
 91            )
 92
 93        # Make sure we are only given async methods if we are an async handler
 94        # and sync ones otherwise.
 95        # UPDATE - can't do this anymore since we now sometimes use
 96        # regular functions which return awaitables instead of having
 97        # the entire function be async.
 98        # is_async = inspect.iscoroutinefunction(call)
 99        # if self.is_async != is_async:
100        #     msg = (
101        #         'Expected a sync method; found an async one.'
102        #         if is_async
103        #         else 'Expected an async method; found a sync one.'
104        #     )
105        #     raise ValueError(msg)
106
107        # Check annotation types to determine what message types we handle.
108        # Return-type annotation can be a Union, but we probably don't
109        # have it available at runtime. Explicitly pull it in.
110        # UPDATE: we've updated our pylint filter to where we should
111        # have all annotations available.
112        # anns = get_type_hints(call, localns={'Union': Union})
113        anns = get_type_hints(call)
114
115        msgtype = anns.get('msg')
116        if not isinstance(msgtype, type):
117            raise TypeError(
118                f'expected a type for "msg" annotation; got {type(msgtype)}.'
119            )
120        assert issubclass(msgtype, Message)
121
122        ret = anns.get('return')
123        responsetypes: tuple[type[Any] | None, ...]
124
125        # Return types can be a single type or a union of types.
126        if isinstance(ret, (_GenericAlias, types.UnionType)):
127            targs = get_args(ret)
128            if not all(isinstance(a, (type, type(None))) for a in targs):
129                raise TypeError(
130                    f'expected only types for "return" annotation;'
131                    f' got {targs}.'
132                )
133            responsetypes = targs
134        else:
135            if not isinstance(ret, (type, type(None))):
136                raise TypeError(
137                    f'expected one or more types for'
138                    f' "return" annotation; got a {type(ret)}.'
139                )
140            # This seems like maybe a mypy bug. Appeared after adding
141            # types.UnionType above.
142            responsetypes = (ret,)
143
144        # This will contain NoneType for empty return cases, but
145        # we expect it to be None.
146        # noinspection PyPep8
147        responsetypes = tuple(
148            None if r is type(None) else r for r in responsetypes
149        )
150
151        # Make sure our protocol has this message type registered and our
152        # return types exactly match. (Technically we could return a subset
153        # of the supported types; can allow this in the future if it makes
154        # sense).
155        registered_types = self.protocol.message_ids_by_type.keys()
156
157        if msgtype not in registered_types:
158            raise TypeError(
159                f'Message type {msgtype} is not registered'
160                f' in this Protocol.'
161            )
162
163        if msgtype in self._handlers:
164            raise TypeError(
165                f'Message type {msgtype} already has a registered' f' handler.'
166            )
167
168        # Make sure the responses exactly matches what the message expects.
169        if set(responsetypes) != set(msgtype.get_response_types()):
170            raise TypeError(
171                f'Provided response types {responsetypes} do not'
172                f' match the set expected by message type {msgtype}: '
173                f'({msgtype.get_response_types()})'
174            )
175
176        # Ok; we're good!
177        self._handlers[msgtype] = call
178
179    def decode_filter_method(
180        self, call: Callable[[Any, dict, Message], None]
181    ) -> Callable[[Any, dict, Message], None]:
182        """Function decorator for defining a decode filter.
183
184        Decode filters can be used to extract extra data from incoming
185        message dicts. This version will work for both handle_raw_message()
186        and handle_raw_message_async()
187        """
188        assert self._decode_filter_call is None
189        self._decode_filter_call = call
190        return call
191
192    def encode_filter_method(
193        self,
194        call: Callable[
195            [Any, Message | None, Response | SysResponse, dict], None
196        ],
197    ) -> Callable[[Any, Message | None, Response, dict], None]:
198        """Function decorator for defining an encode filter.
199
200        Encode filters can be used to add extra data to the message
201        dict before is is encoded to a string and sent out.
202        """
203        assert self._encode_filter_call is None
204        self._encode_filter_call = call
205        return call
206
207    def validate(self, log_only: bool = False) -> None:
208        """Check for handler completeness, valid types, etc."""
209        for msgtype in self.protocol.message_ids_by_type.keys():
210            if issubclass(msgtype, Response):
211                continue
212            if msgtype not in self._handlers:
213                msg = (
214                    f'Protocol message type {msgtype} is not handled'
215                    f' by receiver type {type(self)}.'
216                )
217                if log_only:
218                    logging.error(msg)
219                else:
220                    raise TypeError(msg)
221
222    def _decode_incoming_message_base(
223        self, bound_obj: Any, msg: str
224    ) -> tuple[Any, dict, Message]:
225        # Decode the incoming message.
226        msg_dict = self.protocol.decode_dict(msg)
227        msg_decoded = self.protocol.message_from_dict(msg_dict)
228        assert isinstance(msg_decoded, Message)
229        if self._decode_filter_call is not None:
230            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
231        return bound_obj, msg_dict, msg_decoded
232
233    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
234        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
235            bound_obj=bound_obj, msg=msg
236        )
237        return msg_decoded
238
239    def encode_user_response(
240        self, bound_obj: Any, message: Message, response: Response | None
241    ) -> str:
242        """Encode a response provided by the user for sending."""
243
244        assert isinstance(response, Response | None)
245        # (user should never explicitly return error-responses)
246        assert (
247            response is None or type(response) in message.get_response_types()
248        )
249
250        # A return value of None equals EmptySysResponse.
251        out_response: Response | SysResponse
252        if response is None:
253            out_response = EmptySysResponse()
254        else:
255            out_response = response
256
257        response_dict = self.protocol.response_to_dict(out_response)
258        if self._encode_filter_call is not None:
259            self._encode_filter_call(
260                bound_obj, message, out_response, response_dict
261            )
262        return self.protocol.encode_dict(response_dict)
263
264    def encode_error_response(
265        self, bound_obj: Any, message: Message | None, exc: Exception
266    ) -> tuple[str, bool]:
267        """Given an error, return sysresponse str and whether to log."""
268        response, dolog = self.protocol.error_to_response(exc)
269        response_dict = self.protocol.response_to_dict(response)
270        if self._encode_filter_call is not None:
271            self._encode_filter_call(
272                bound_obj, message, response, response_dict
273            )
274        return self.protocol.encode_dict(response_dict), dolog
275
276    def handle_raw_message(
277        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
278    ) -> str:
279        """Decode, handle, and return an response for a message.
280
281        if 'raise_unregistered' is True, will raise an
282        efro.message.UnregisteredMessageIDError for messages not handled by
283        the protocol. In all other cases local errors will translate to
284        error responses returned to the sender.
285        """
286        assert not self.is_async, "can't call sync handler on async receiver"
287        msg_decoded: Message | None = None
288        msgtype: type[Message] | None = None
289        try:
290            msg_decoded = self._decode_incoming_message(bound_obj, msg)
291            msgtype = type(msg_decoded)
292            handler = self._handlers.get(msgtype)
293            if handler is None:
294                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
295            response = handler(bound_obj, msg_decoded)
296            assert isinstance(response, Response | None)
297            return self.encode_user_response(bound_obj, msg_decoded, response)
298
299        except Exception as exc:
300            if raise_unregistered and isinstance(
301                exc, UnregisteredMessageIDError
302            ):
303                raise
304            rstr, dolog = self.encode_error_response(
305                bound_obj, msg_decoded, exc
306            )
307            if dolog:
308                if msgtype is not None:
309                    logging.exception(
310                        'Error handling %s.%s message.',
311                        msgtype.__module__,
312                        msgtype.__qualname__,
313                    )
314                else:
315                    logging.exception(
316                        'Error handling raw efro.message. msg=%s', msg
317                    )
318            return rstr
319
320    def handle_raw_message_async(
321        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
322    ) -> Awaitable[str]:
323        """Should be called when the receiver gets a message.
324
325        The return value is the raw response to the message.
326        """
327
328        # Note: This call is synchronous so that the first part of it can
329        # happen synchronously. If the whole call were async we wouldn't be
330        # able to guarantee that messages handlers would be called in the
331        # order the messages were received.
332
333        assert self.is_async, "can't call async handler on sync receiver"
334        msg_decoded: Message | None = None
335        msgtype: type[Message] | None = None
336        try:
337            msg_decoded = self._decode_incoming_message(bound_obj, msg)
338            msgtype = type(msg_decoded)
339            handler = self._handlers.get(msgtype)
340            if handler is None:
341                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
342            handler_awaitable = handler(bound_obj, msg_decoded)
343
344        except Exception as exc:
345            if raise_unregistered and isinstance(
346                exc, UnregisteredMessageIDError
347            ):
348                raise
349            return self._handle_raw_message_async_error(
350                bound_obj, msg_decoded, msgtype, exc
351            )
352
353        # Return an awaitable to handle the rest asynchronously.
354        return self._handle_raw_message_async(
355            bound_obj, msg_decoded, msgtype, handler_awaitable
356        )
357
358    async def _handle_raw_message_async_error(
359        self,
360        bound_obj: Any,
361        msg_decoded: Message | None,
362        msgtype: type[Message] | None,
363        exc: Exception,
364    ) -> str:
365        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
366        if dolog:
367            if msgtype is not None:
368                logging.exception(
369                    'Error handling %s.%s message.',
370                    msgtype.__module__,
371                    msgtype.__qualname__,
372                )
373            else:
374                logging.exception(
375                    'Error handling raw async efro.message.'
376                    ' msgtype=%s msg_decoded=%s.',
377                    msgtype,
378                    msg_decoded,
379                )
380        return rstr
381
382    async def _handle_raw_message_async(
383        self,
384        bound_obj: Any,
385        msg_decoded: Message,
386        msgtype: type[Message] | None,
387        handler_awaitable: Awaitable[Response | None],
388    ) -> str:
389        """Should be called when the receiver gets a message.
390
391        The return value is the raw response to the message.
392        """
393        try:
394            response = await handler_awaitable
395            assert isinstance(response, Response | None)
396            return self.encode_user_response(bound_obj, msg_decoded, response)
397
398        except Exception as exc:
399            return await self._handle_raw_message_async_error(
400                bound_obj, msg_decoded, msgtype, exc
401            )

Facilitates receiving & responding to messages from a remote source.

This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.

Example:

class MyClass:
    receiver = MyMessageReceiver()

# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.
@receiver.handler
def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
    # Deal with this message type here.

This will trigger the registered handler being called.

obj = MyClass()
obj.receiver.handle_raw_message(some_raw_data)

Any unhandled Exception occurring during message handling will result in an Exception being raised on the sending end.

MessageReceiver(protocol: MessageProtocol)
56    def __init__(self, protocol: MessageProtocol) -> None:
57        self.protocol = protocol
58        self._handlers: dict[type[Message], Callable] = {}
59        self._decode_filter_call: (
60            Callable[[Any, dict, Message], None] | None
61        ) = None
62        self._encode_filter_call: (
63            Callable[[Any, Message | None, Response | SysResponse, dict], None]
64            | None
65        ) = None
is_async = False
protocol
def register_handler( self, call: Callable[[Any, Message], Response | None]) -> None:
 68    def register_handler(
 69        self, call: Callable[[Any, Message], Response | None]
 70    ) -> None:
 71        """Register a handler call.
 72
 73        The message type handled by the call is determined by its
 74        type annotation.
 75        """
 76        # TODO: can use types.GenericAlias in 3.9.
 77        # (hmm though now that we're there,  it seems a drop-in
 78        # replace gives us errors. Should re-test in 3.11 as it seems
 79        # that typing_extensions handles it differently in that case)
 80        from typing import _GenericAlias  # type: ignore
 81        from typing import get_type_hints, get_args
 82
 83        sig = inspect.getfullargspec(call)
 84
 85        # The provided callable should be a method taking one 'msg' arg.
 86        expectedsig = ['self', 'msg']
 87        if sig.args != expectedsig:
 88            raise ValueError(
 89                f'Expected callable signature of {expectedsig};'
 90                f' got {sig.args}'
 91            )
 92
 93        # Make sure we are only given async methods if we are an async handler
 94        # and sync ones otherwise.
 95        # UPDATE - can't do this anymore since we now sometimes use
 96        # regular functions which return awaitables instead of having
 97        # the entire function be async.
 98        # is_async = inspect.iscoroutinefunction(call)
 99        # if self.is_async != is_async:
100        #     msg = (
101        #         'Expected a sync method; found an async one.'
102        #         if is_async
103        #         else 'Expected an async method; found a sync one.'
104        #     )
105        #     raise ValueError(msg)
106
107        # Check annotation types to determine what message types we handle.
108        # Return-type annotation can be a Union, but we probably don't
109        # have it available at runtime. Explicitly pull it in.
110        # UPDATE: we've updated our pylint filter to where we should
111        # have all annotations available.
112        # anns = get_type_hints(call, localns={'Union': Union})
113        anns = get_type_hints(call)
114
115        msgtype = anns.get('msg')
116        if not isinstance(msgtype, type):
117            raise TypeError(
118                f'expected a type for "msg" annotation; got {type(msgtype)}.'
119            )
120        assert issubclass(msgtype, Message)
121
122        ret = anns.get('return')
123        responsetypes: tuple[type[Any] | None, ...]
124
125        # Return types can be a single type or a union of types.
126        if isinstance(ret, (_GenericAlias, types.UnionType)):
127            targs = get_args(ret)
128            if not all(isinstance(a, (type, type(None))) for a in targs):
129                raise TypeError(
130                    f'expected only types for "return" annotation;'
131                    f' got {targs}.'
132                )
133            responsetypes = targs
134        else:
135            if not isinstance(ret, (type, type(None))):
136                raise TypeError(
137                    f'expected one or more types for'
138                    f' "return" annotation; got a {type(ret)}.'
139                )
140            # This seems like maybe a mypy bug. Appeared after adding
141            # types.UnionType above.
142            responsetypes = (ret,)
143
144        # This will contain NoneType for empty return cases, but
145        # we expect it to be None.
146        # noinspection PyPep8
147        responsetypes = tuple(
148            None if r is type(None) else r for r in responsetypes
149        )
150
151        # Make sure our protocol has this message type registered and our
152        # return types exactly match. (Technically we could return a subset
153        # of the supported types; can allow this in the future if it makes
154        # sense).
155        registered_types = self.protocol.message_ids_by_type.keys()
156
157        if msgtype not in registered_types:
158            raise TypeError(
159                f'Message type {msgtype} is not registered'
160                f' in this Protocol.'
161            )
162
163        if msgtype in self._handlers:
164            raise TypeError(
165                f'Message type {msgtype} already has a registered' f' handler.'
166            )
167
168        # Make sure the responses exactly matches what the message expects.
169        if set(responsetypes) != set(msgtype.get_response_types()):
170            raise TypeError(
171                f'Provided response types {responsetypes} do not'
172                f' match the set expected by message type {msgtype}: '
173                f'({msgtype.get_response_types()})'
174            )
175
176        # Ok; we're good!
177        self._handlers[msgtype] = call

Register a handler call.

The message type handled by the call is determined by its type annotation.

def decode_filter_method( self, call: Callable[[Any, dict, Message], NoneType]) -> Callable[[Any, dict, Message], NoneType]:
179    def decode_filter_method(
180        self, call: Callable[[Any, dict, Message], None]
181    ) -> Callable[[Any, dict, Message], None]:
182        """Function decorator for defining a decode filter.
183
184        Decode filters can be used to extract extra data from incoming
185        message dicts. This version will work for both handle_raw_message()
186        and handle_raw_message_async()
187        """
188        assert self._decode_filter_call is None
189        self._decode_filter_call = call
190        return call

Function decorator for defining a decode filter.

Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async()

def encode_filter_method( self, call: Callable[[Any, Message | None, Response | SysResponse, dict], NoneType]) -> Callable[[Any, Message | None, Response, dict], NoneType]:
192    def encode_filter_method(
193        self,
194        call: Callable[
195            [Any, Message | None, Response | SysResponse, dict], None
196        ],
197    ) -> Callable[[Any, Message | None, Response, dict], None]:
198        """Function decorator for defining an encode filter.
199
200        Encode filters can be used to add extra data to the message
201        dict before is is encoded to a string and sent out.
202        """
203        assert self._encode_filter_call is None
204        self._encode_filter_call = call
205        return call

Function decorator for defining an encode filter.

Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.

def validate(self, log_only: bool = False) -> None:
207    def validate(self, log_only: bool = False) -> None:
208        """Check for handler completeness, valid types, etc."""
209        for msgtype in self.protocol.message_ids_by_type.keys():
210            if issubclass(msgtype, Response):
211                continue
212            if msgtype not in self._handlers:
213                msg = (
214                    f'Protocol message type {msgtype} is not handled'
215                    f' by receiver type {type(self)}.'
216                )
217                if log_only:
218                    logging.error(msg)
219                else:
220                    raise TypeError(msg)

Check for handler completeness, valid types, etc.

def encode_user_response( self, bound_obj: Any, message: Message, response: Response | None) -> str:
239    def encode_user_response(
240        self, bound_obj: Any, message: Message, response: Response | None
241    ) -> str:
242        """Encode a response provided by the user for sending."""
243
244        assert isinstance(response, Response | None)
245        # (user should never explicitly return error-responses)
246        assert (
247            response is None or type(response) in message.get_response_types()
248        )
249
250        # A return value of None equals EmptySysResponse.
251        out_response: Response | SysResponse
252        if response is None:
253            out_response = EmptySysResponse()
254        else:
255            out_response = response
256
257        response_dict = self.protocol.response_to_dict(out_response)
258        if self._encode_filter_call is not None:
259            self._encode_filter_call(
260                bound_obj, message, out_response, response_dict
261            )
262        return self.protocol.encode_dict(response_dict)

Encode a response provided by the user for sending.

def encode_error_response( self, bound_obj: Any, message: Message | None, exc: Exception) -> tuple[str, bool]:
264    def encode_error_response(
265        self, bound_obj: Any, message: Message | None, exc: Exception
266    ) -> tuple[str, bool]:
267        """Given an error, return sysresponse str and whether to log."""
268        response, dolog = self.protocol.error_to_response(exc)
269        response_dict = self.protocol.response_to_dict(response)
270        if self._encode_filter_call is not None:
271            self._encode_filter_call(
272                bound_obj, message, response, response_dict
273            )
274        return self.protocol.encode_dict(response_dict), dolog

Given an error, return sysresponse str and whether to log.

def handle_raw_message(self, bound_obj: Any, msg: str, raise_unregistered: bool = False) -> str:
276    def handle_raw_message(
277        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
278    ) -> str:
279        """Decode, handle, and return an response for a message.
280
281        if 'raise_unregistered' is True, will raise an
282        efro.message.UnregisteredMessageIDError for messages not handled by
283        the protocol. In all other cases local errors will translate to
284        error responses returned to the sender.
285        """
286        assert not self.is_async, "can't call sync handler on async receiver"
287        msg_decoded: Message | None = None
288        msgtype: type[Message] | None = None
289        try:
290            msg_decoded = self._decode_incoming_message(bound_obj, msg)
291            msgtype = type(msg_decoded)
292            handler = self._handlers.get(msgtype)
293            if handler is None:
294                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
295            response = handler(bound_obj, msg_decoded)
296            assert isinstance(response, Response | None)
297            return self.encode_user_response(bound_obj, msg_decoded, response)
298
299        except Exception as exc:
300            if raise_unregistered and isinstance(
301                exc, UnregisteredMessageIDError
302            ):
303                raise
304            rstr, dolog = self.encode_error_response(
305                bound_obj, msg_decoded, exc
306            )
307            if dolog:
308                if msgtype is not None:
309                    logging.exception(
310                        'Error handling %s.%s message.',
311                        msgtype.__module__,
312                        msgtype.__qualname__,
313                    )
314                else:
315                    logging.exception(
316                        'Error handling raw efro.message. msg=%s', msg
317                    )
318            return rstr

Decode, handle, and return a response for a message.

If 'raise_unregistered' is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.

def handle_raw_message_async( self, bound_obj: Any, msg: str, raise_unregistered: bool = False) -> Awaitable[str]:
320    def handle_raw_message_async(
321        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
322    ) -> Awaitable[str]:
323        """Should be called when the receiver gets a message.
324
325        The return value is the raw response to the message.
326        """
327
328        # Note: This call is synchronous so that the first part of it can
329        # happen synchronously. If the whole call were async we wouldn't be
330        # able to guarantee that messages handlers would be called in the
331        # order the messages were received.
332
333        assert self.is_async, "can't call async handler on sync receiver"
334        msg_decoded: Message | None = None
335        msgtype: type[Message] | None = None
336        try:
337            msg_decoded = self._decode_incoming_message(bound_obj, msg)
338            msgtype = type(msg_decoded)
339            handler = self._handlers.get(msgtype)
340            if handler is None:
341                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
342            handler_awaitable = handler(bound_obj, msg_decoded)
343
344        except Exception as exc:
345            if raise_unregistered and isinstance(
346                exc, UnregisteredMessageIDError
347            ):
348                raise
349            return self._handle_raw_message_async_error(
350                bound_obj, msg_decoded, msgtype, exc
351            )
352
353        # Return an awaitable to handle the rest asynchronously.
354        return self._handle_raw_message_async(
355            bound_obj, msg_decoded, msgtype, handler_awaitable
356        )

Should be called when the receiver gets a message.

The return value is the raw response to the message.

class BoundMessageReceiver:
404class BoundMessageReceiver:
405    """Base bound receiver class."""
406
407    def __init__(
408        self,
409        obj: Any,
410        receiver: MessageReceiver,
411    ) -> None:
412        assert obj is not None
413        self._obj = obj
414        self._receiver = receiver
415
416    @property
417    def protocol(self) -> MessageProtocol:
418        """Protocol associated with this receiver."""
419        return self._receiver.protocol
420
421    def encode_error_response(self, exc: Exception) -> str:
422        """Given an error, return a response ready to send.
423
424        This should be used for any errors that happen outside of
425        standard handle_raw_message calls. Any errors within those
426        calls will be automatically returned as encoded strings.
427        """
428        # Passing None for Message here; we would only have that available
429        # for things going wrong in the handler (which this is not for).
430        return self._receiver.encode_error_response(self._obj, None, exc)[0]

Base bound receiver class.

BoundMessageReceiver(obj: Any, receiver: MessageReceiver)
407    def __init__(
408        self,
409        obj: Any,
410        receiver: MessageReceiver,
411    ) -> None:
412        assert obj is not None
413        self._obj = obj
414        self._receiver = receiver
protocol: MessageProtocol
416    @property
417    def protocol(self) -> MessageProtocol:
418        """Protocol associated with this receiver."""
419        return self._receiver.protocol

Protocol associated with this receiver.

def encode_error_response(self, exc: Exception) -> str:
421    def encode_error_response(self, exc: Exception) -> str:
422        """Given an error, return a response ready to send.
423
424        This should be used for any errors that happen outside of
425        standard handle_raw_message calls. Any errors within those
426        calls will be automatically returned as encoded strings.
427        """
428        # Passing None for Message here; we would only have that available
429        # for things going wrong in the handler (which this is not for).
430        return self._receiver.encode_error_response(self._obj, None, exc)[0]

Given an error, return a response ready to send.

This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.

def create_sender_module( basename: str, protocol_create_code: str, enable_sync_sends: bool, enable_async_sends: bool, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) -> str:
18def create_sender_module(
19    basename: str,
20    protocol_create_code: str,
21    enable_sync_sends: bool,
22    enable_async_sends: bool,
23    private: bool = False,
24    protocol_module_level_import_code: str | None = None,
25    build_time_protocol_create_code: str | None = None,
26) -> str:
27    """Create a Python module defining a MessageSender subclass.
28
29    This class is primarily for type checking and will contain overrides
30    for the varieties of send calls for message/response types defined
31    in the protocol.
32
33    Code passed for 'protocol_create_code' should import necessary
34    modules and assign an instance of the Protocol to a 'protocol'
35    variable.
36
37    Class names are based on basename; a basename 'FooSender' will
38    result in classes FooSender and BoundFooSender.
39
40    If 'private' is True, class-names will be prefixed with an '_'.
41
42    Note: output code may have long lines and should generally be run
43    through a formatter. We should perhaps move this functionality to
44    efrotools so we can include that functionality inline.
45    """
46    protocol = _protocol_from_code(
47        build_time_protocol_create_code
48        if build_time_protocol_create_code is not None
49        else protocol_create_code
50    )
51    return protocol.do_create_sender_module(
52        basename=basename,
53        protocol_create_code=protocol_create_code,
54        enable_sync_sends=enable_sync_sends,
55        enable_async_sends=enable_async_sends,
56        private=private,
57        protocol_module_level_import_code=protocol_module_level_import_code,
58    )

Create a Python module defining a MessageSender subclass.

This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.

Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.

Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.

If 'private' is True, class-names will be prefixed with an '_'.

Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.

def create_receiver_module( basename: str, protocol_create_code: str, is_async: bool, private: bool = False, protocol_module_level_import_code: str | None = None, build_time_protocol_create_code: str | None = None) -> str:
61def create_receiver_module(
62    basename: str,
63    protocol_create_code: str,
64    is_async: bool,
65    private: bool = False,
66    protocol_module_level_import_code: str | None = None,
67    build_time_protocol_create_code: str | None = None,
68) -> str:
69    """Create a Python module defining a MessageReceiver subclass.
70
71    This class is primarily for type checking and will contain overrides
72    for the register method for message/response types defined in
73    the protocol.
74
75    Class names are based on basename; a basename 'FooReceiver' will
76    result in FooReceiver and BoundFooReceiver.
77
78    If 'is_async' is True, handle_raw_message() will be an async method
79    and the @handler decorator will expect async methods.
80
81    If 'private' is True, class-names will be prefixed with an '_'.
82
83    Note that line lengths are not clipped, so output may need to be
84    run through a formatter to prevent lint warnings about excessive
85    line lengths.
86    """
87    protocol = _protocol_from_code(
88        build_time_protocol_create_code
89        if build_time_protocol_create_code is not None
90        else protocol_create_code
91    )
92    return protocol.do_create_receiver_module(
93        basename=basename,
94        protocol_create_code=protocol_create_code,
95        is_async=is_async,
96        private=private,
97        protocol_module_level_import_code=protocol_module_level_import_code,
98    )

Create a Python module defining a MessageReceiver subclass.

This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.

Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.

If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.

If 'private' is True, class-names will be prefixed with an '_'.

Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.

class UnregisteredMessageIDError(builtins.Exception):
20class UnregisteredMessageIDError(Exception):
21    """A message or response id is not covered by our protocol."""

A message or response id is not covered by our protocol.

Inherited Members
builtins.Exception
Exception
builtins.BaseException
with_traceback
add_note
args