efro.message
Functionality for sending and responding to messages. Supports static typing for message types and possible return types.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Functionality for sending and responding to messages. 4Supports static typing for message types and possible return types. 5""" 6 7from __future__ import annotations 8 9from efro.util import set_canonical_module_names 10from efro.message._protocol import MessageProtocol 11from efro.message._sender import MessageSender, BoundMessageSender 12from efro.message._receiver import MessageReceiver, BoundMessageReceiver 13from efro.message._module import create_sender_module, create_receiver_module 14from efro.message._message import ( 15 Message, 16 Response, 17 SysResponse, 18 EmptySysResponse, 19 ErrorSysResponse, 20 StringResponse, 21 BoolResponse, 22 UnregisteredMessageIDError, 23) 24 25__all__ = [ 26 'Message', 27 'Response', 28 'SysResponse', 29 'EmptySysResponse', 30 'ErrorSysResponse', 31 'StringResponse', 32 'BoolResponse', 33 'MessageProtocol', 34 'MessageSender', 35 'BoundMessageSender', 36 'MessageReceiver', 37 'BoundMessageReceiver', 38 'create_sender_module', 39 'create_receiver_module', 40 'UnregisteredMessageIDError', 41] 42 43# Have these things present themselves cleanly as 'thismodule.SomeClass' 44# instead of 'thismodule._internalmodule.SomeClass' 45set_canonical_module_names(globals())
class Message:
    """Root type that every sendable message derives from."""

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """List every Response type that sending this Message may yield.

        Unless a subclass overrides this, the list holds a single None
        entry, i.e. the only declared return type is None.
        """
        none_only: list[type[Response] | None] = [None]
        return none_only
Base class for messages.
27 @classmethod 28 def get_response_types(cls) -> list[type[Response] | None]: 29 """Return all Response types this Message can return when sent. 30 31 The default implementation specifies a None return type. 32 """ 33 return [None]
Return all Response types this Message can return when sent.
The default implementation specifies a None return type.
Base class for responses to messages.
40class SysResponse: 41 """Base class for system-responses to messages. 42 43 These are only sent/handled by the messaging system itself; 44 users of the api never see them. 45 """ 46 47 def set_local_exception(self, exc: Exception) -> None: 48 """Attach a local exception to facilitate better logging/handling. 49 50 Be aware that this data does not get serialized and only 51 exists on the local object. 52 """ 53 setattr(self, '_sr_local_exception', exc) 54 55 def get_local_exception(self) -> Exception | None: 56 """Fetch a local attached exception.""" 57 value = getattr(self, '_sr_local_exception', None) 58 assert isinstance(value, Exception | None) 59 return value
Base class for system-responses to messages.
These are only sent/handled by the messaging system itself; users of the api never see them.
def set_local_exception(self, exc: Exception) -> None:
    """Remember an exception on this object for logging/handling.

    The exception is kept in a plain instance attribute. It is not
    serialized, so it is only ever present on this local object.
    """
    attr_name = '_sr_local_exception'
    setattr(self, attr_name, exc)
Attach a local exception to facilitate better logging/handling.
Be aware that this data does not get serialized and only exists on the local object.
@ioprepped
@dataclass
class EmptySysResponse(SysResponse):
    """System response with no fields; the response form of None."""
The response equivalent of None.
Inherited Members
@ioprepped
@dataclass
class ErrorSysResponse(SysResponse):
    """System response reporting that a send ran into some error.

    Receiving one generally leads to an Exception being raised for
    whoever made the send call.
    """

    class ErrorType(Enum):
        """Categories of error that can occur while sending a message."""

        REMOTE = 0
        REMOTE_CLEAN = 1
        LOCAL = 2
        COMMUNICATION = 3
        REMOTE_COMMUNICATION = 4

    # NOTE(review): 'm' and 'e' are presumably the compact storage names
    # used for these fields when serialized - confirm against IOAttrs.
    error_message: Annotated[str, IOAttrs('m')]

    # Falls back to the generic REMOTE category when not given.
    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE
SysResponse saying some error has occurred for the send.
This generally results in an Exception being raised for the caller.
Inherited Members
class ErrorType(Enum):
    """Categories of error that can occur while sending a message."""

    REMOTE = 0
    REMOTE_CLEAN = 1
    LOCAL = 2
    COMMUNICATION = 3
    REMOTE_COMMUNICATION = 4
Type of error that occurred while sending a message.
Inherited Members
- enum.Enum
- name
- value
@ioprepped
@dataclass
class StringResponse(Response):
    """Ready-made Response wrapping one str value."""

    # NOTE(review): 'v' is presumably the compact storage name used for
    # this field when serialized - confirm against IOAttrs.
    value: Annotated[str, IOAttrs('v')]
A simple string value response.
@ioprepped
@dataclass
class BoolResponse(Response):
    """Ready-made Response wrapping one bool value."""

    # NOTE(review): 'v' is presumably the compact storage name used for
    # this field when serialized - confirm against IOAttrs.
    value: Annotated[bool, IOAttrs('v')]
A simple bool value response.
class MessageProtocol:
    """Wrangles a set of message types, formats, and response types.

    Both endpoints must be using a compatible Protocol for communication
    to succeed. To maintain Protocol compatibility between revisions,
    all message types must retain the same id, message attr storage
    names must not change, newly added attrs must have default values,
    etc.
    """

    def __init__(
        self,
        message_types: dict[int, type[Message]],
        response_types: dict[int, type[Response]],
        forward_communication_errors: bool = False,
        forward_clean_errors: bool = False,
        remote_errors_include_stack_traces: bool = False,
        log_remote_errors: bool = True,
    ) -> None:
        """Create a protocol with a given configuration.

        'message_types' and 'response_types' map non-negative integer
        ids to ioprepped dataclass types derived from Message and
        Response respectively.

        If 'forward_communication_errors' is True,
        efro.error.CommunicationErrors raised on the receiver end will
        result in a matching error raised back on the sender. This can
        be useful if the receiver will be in some way forwarding
        messages along and the sender doesn't need to know where
        communication breakdowns occurred; only that they did.

        If 'forward_clean_errors' is True, efro.error.CleanError
        exceptions raised on the receiver end will result in a matching
        CleanError raised back on the sender.

        When an exception is not covered by the optional forwarding
        mechanisms above, it will come across as efro.error.RemoteError
        and the exception will be logged on the receiver end - at least
        by default (see details below).

        If 'remote_errors_include_stack_traces' is True, stringified
        stack traces will be returned with efro.error.RemoteError
        exceptions. This is useful for debugging but should only be
        enabled in cases where the sender is trusted to see internal
        details of the receiver.

        By default, when a message-handling exception will result in an
        efro.error.RemoteError being returned to the sender, the
        exception will be logged on the receiver. This is because the
        goal is usually to avoid returning opaque RemoteErrors and to
        instead return something meaningful as part of the expected
        response type (even if that value itself represents a logical
        error state). If 'log_remote_errors' is False, however, such
        exceptions will not be logged on the receiver. This can be
        useful in combination with 'remote_errors_include_stack_traces'
        and 'forward_clean_errors' in situations where all error
        logging/management will be happening on the sender end. Be
        aware, however, that in that case it may be possible for
        communication errors to prevent such error messages from ever
        being seen.

        Raises ValueError (in debug mode only) if a message declares a
        response type missing from 'response_types' or if two message
        types share a __name__.
        """
        # pylint: disable=too-many-locals
        self.message_types_by_id: dict[int, type[Message]] = {}
        self.message_ids_by_type: dict[type[Message], int] = {}
        self.response_types_by_id: dict[
            int, type[Response] | type[SysResponse]
        ] = {}
        self.response_ids_by_type: dict[
            type[Response] | type[SysResponse], int
        ] = {}

        for m_id, m_type in message_types.items():
            # Make sure only valid message types were passed and each
            # id was assigned only once.
            assert isinstance(m_id, int)
            assert m_id >= 0
            assert is_ioprepped_dataclass(m_type) and issubclass(
                m_type, Message
            )
            assert self.message_types_by_id.get(m_id) is None
            self.message_types_by_id[m_id] = m_type
            self.message_ids_by_type[m_type] = m_id

        for r_id, r_type in response_types.items():
            assert isinstance(r_id, int)
            assert r_id >= 0
            assert is_ioprepped_dataclass(r_type) and issubclass(
                r_type, Response
            )
            assert self.response_types_by_id.get(r_id) is None
            self.response_types_by_id[r_id] = r_type
            self.response_ids_by_type[r_type] = r_id

        # Register our SysResponse types. These use negative
        # IDs so as to never overlap with user Response types.
        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
            assert self.response_types_by_id.get(reg_id) is None
            self.response_types_by_id[reg_id] = reg_tp
            self.response_ids_by_type[reg_tp] = reg_id

        _reg_sys(ErrorSysResponse, -1)
        _reg_sys(EmptySysResponse, -2)

        # Some extra-thorough validation in debug mode.
        if __debug__:
            # Make sure all Message types' return types are valid
            # and have been assigned an ID as well.
            all_response_types: set[type[Response] | None] = set()
            for m_type in message_types.values():
                m_rtypes = m_type.get_response_types()

                assert isinstance(m_rtypes, list)
                assert (
                    m_rtypes
                ), f'Message type {m_type} specifies no return types.'
                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
                all_response_types.update(m_rtypes)
            for cls in all_response_types:
                if cls is None:
                    continue
                assert is_ioprepped_dataclass(cls)
                assert issubclass(cls, Response)
                if cls not in self.response_ids_by_type:
                    raise ValueError(
                        f'Possible response type {cls} needs to be included'
                        f' in response_types for this protocol.'
                    )

            # Make sure all registered types have unique base names.
            # We can take advantage of this to generate cleaner looking
            # protocol modules. Can revisit if this is ever a problem.
            mtypenames = {tp.__name__ for tp in self.message_ids_by_type}
            if len(mtypenames) != len(message_types):
                raise ValueError(
                    'message_types contains duplicate __name__s;'
                    ' all types are required to have unique names.'
                )

        self.forward_clean_errors = forward_clean_errors
        self.forward_communication_errors = forward_communication_errors
        self.remote_errors_include_stack_traces = (
            remote_errors_include_stack_traces
        )
        self.log_remote_errors = log_remote_errors

    @staticmethod
    def encode_dict(obj: dict) -> str:
        """Json-encode a provided dict (compact separators, no spaces)."""
        return json.dumps(obj, separators=(',', ':'))

    def message_to_dict(self, message: Message) -> dict:
        """Encode a message to a json ready dict."""
        return self._to_dict(message, self.message_ids_by_type, 'message')

    def response_to_dict(self, response: Response | SysResponse) -> dict:
        """Encode a response to a json ready dict."""
        return self._to_dict(response, self.response_ids_by_type, 'response')

    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
        """Translate an Exception to a SysResponse.

        Also returns whether the error should be logged if this happened
        within handle_raw_message().
        """

        # Forwarded error categories carry the exception's own message
        # and are never flagged for logging.
        if self.forward_clean_errors and isinstance(exc, CleanError):
            return (
                ErrorSysResponse(
                    error_message=str(exc),
                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
                ),
                False,
            )
        if self.forward_communication_errors and isinstance(
            exc, CommunicationError
        ):
            return (
                ErrorSysResponse(
                    error_message=str(exc),
                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
                ),
                False,
            )

        # Everything else becomes a generic REMOTE error.
        if self.remote_errors_include_stack_traces:
            # Format the exception we were actually handed rather than
            # whatever exception happens to be active; the result is the
            # same inside the matching 'except' block and still
            # meaningful when called anywhere else.
            error_message = ''.join(traceback.format_exception(exc))
        else:
            error_message = 'An internal error has occurred.'
        return (
            ErrorSysResponse(
                error_message=error_message,
                error_type=ErrorSysResponse.ErrorType.REMOTE,
            ),
            self.log_remote_errors,
        )

    def _to_dict(
        self, message: Any, ids_by_type: dict[type, int], opname: str
    ) -> dict:
        """Wrap an object as {'t': <registered type id>, 'm': <fields>}.

        Raises TypeError if the object's exact type has no registered id.
        'opname' is only used in that error text.
        """

        m_id: int | None = ids_by_type.get(type(message))
        if m_id is None:
            raise TypeError(
                f'{opname} type is not registered in protocol:'
                f' {type(message)}'
            )
        out = {'t': m_id, 'm': dataclass_to_dict(message)}
        return out

    @staticmethod
    def decode_dict(data: str) -> dict:
        """Decode json data to a dict."""
        out = json.loads(data)
        assert isinstance(out, dict)
        return out

    def message_from_dict(self, data: dict) -> Message:
        """Decode a message from a dict made by message_to_dict()."""
        out = self._from_dict(data, self.message_types_by_id, 'message')
        assert isinstance(out, Message)
        return out

    def response_from_dict(self, data: dict) -> Response | SysResponse:
        """Decode a response from a dict made by response_to_dict()."""
        out = self._from_dict(data, self.response_types_by_id, 'response')
        assert isinstance(out, Response | SysResponse)
        return out

    # Weeeird; we get mypy errors returning dict[int, type] but
    # dict[int, typing.Type] or dict[int, type[Any]] works..
    def _from_dict(
        self, data: dict, types_by_id: dict[int, type[Any]], opname: str
    ) -> Any:
        """Rebuild an object from a {'t': id, 'm': fields} dict.

        Raises UnregisteredMessageIDError if the id is not present in
        'types_by_id'. 'opname' is only used in that error text.
        """
        msgdict: dict | None

        m_id = data.get('t')
        # Allow omitting 'm' dict if it's empty.
        msgdict = data.get('m', {})

        assert isinstance(m_id, int)
        assert isinstance(msgdict, dict)

        # Decode this particular type.
        msgtype = types_by_id.get(m_id)
        if msgtype is None:
            raise UnregisteredMessageIDError(
                f'Got unregistered {opname} id of {m_id}.'
            )
        return dataclass_from_dict(msgtype, msgdict)

    def _get_module_header(
        self,
        part: Literal['sender', 'receiver'],
        extra_import_code: str | None,
        enable_async_sends: bool,
    ) -> str:
        """Return common parts of generated modules.

        Produces the license line, module docstring and import sections
        (runtime imports plus a TYPE_CHECKING-only section) for a
        generated sender or receiver module.
        """
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        import textwrap

        # NOTE(review): leading whitespace inside the generated-source
        # literals in this method was reconstructed as standard 4-space
        # Python indentation - confirm against the canonical file.

        # Module name -> type names; 'tpimports' end up under
        # TYPE_CHECKING while 'imports' are emitted as runtime imports.
        tpimports: dict[str, list[str]] = {}
        imports: dict[str, list[str]] = {}

        single_message_type = len(self.message_ids_by_type) == 1

        msgtypes = list(self.message_ids_by_type)
        if part == 'sender':
            msgtypes.append(Message)
        for msgtype in msgtypes:
            tpimports.setdefault(msgtype.__module__, []).append(
                msgtype.__name__
            )
        rsptypes = list(self.response_ids_by_type)
        if part == 'sender':
            rsptypes.append(Response)
        for rsp_tp in rsptypes:
            # Skip these as they don't actually show up in code.
            if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse:
                continue
            if (
                single_message_type
                and part == 'sender'
                and rsp_tp is not Response
            ):
                # We need to cast to the single supported response type
                # in this case so need response types at runtime.
                imports.setdefault(rsp_tp.__module__, []).append(
                    rsp_tp.__name__
                )
            else:
                tpimports.setdefault(rsp_tp.__module__, []).append(
                    rsp_tp.__name__
                )

        import_lines = ''
        tpimport_lines = ''

        for module, names in sorted(imports.items()):
            jnames = ', '.join(names)
            line = f'from {module} import {jnames}'
            if len(line) > 80:
                # Recreate in a wrapping-friendly form.
                line = f'from {module} import ({jnames})'
            import_lines += f'{line}\n'
        for module, names in sorted(tpimports.items()):
            jnames = ', '.join(names)
            line = f'from {module} import {jnames}'
            if len(line) > 75:  # Account for indent
                # Recreate in a wrapping-friendly form.
                line = f'from {module} import ({jnames})'
            tpimport_lines += f'{line}\n'

        if part == 'sender':
            import_lines += (
                'from efro.message import MessageSender, BoundMessageSender'
            )
            tpimport_typing_extras = ''
        else:
            if single_message_type:
                import_lines += (
                    'from efro.message import (MessageReceiver,'
                    ' BoundMessageReceiver, Message, Response)'
                )
            else:
                import_lines += (
                    'from efro.message import MessageReceiver,'
                    ' BoundMessageReceiver'
                )
            tpimport_typing_extras = ', Awaitable'

        if extra_import_code is not None:
            import_lines += f'\n{extra_import_code}\n'

        ovld = ', overload' if not single_message_type else ''
        ovld2 = (
            ', cast, Awaitable'
            if (single_message_type and part == 'sender' and enable_async_sends)
            else ''
        )
        tpimport_lines = textwrap.indent(tpimport_lines, '    ')

        baseimps = ['Any']
        if part == 'receiver':
            baseimps.append('Callable')
        if part == 'sender' and enable_async_sends:
            baseimps.append('Awaitable')
        baseimps_s = ', '.join(baseimps)
        out = (
            '# Released under the MIT License. See LICENSE for details.\n'
            f'#\n'
            f'"""Auto-generated {part} module. Do not edit by hand."""\n'
            f'\n'
            f'from __future__ import annotations\n'
            f'\n'
            f'from typing import TYPE_CHECKING{ovld}{ovld2}\n'
            f'\n'
            f'{import_lines}\n'
            f'\n'
            f'if TYPE_CHECKING:\n'
            f'    from typing import {baseimps_s}'
            f'{tpimport_typing_extras}\n'
            f'{tpimport_lines}'
            f'\n'
            f'\n'
        )
        return out

    def do_create_sender_module(
        self,
        basename: str,
        protocol_create_code: str,
        enable_sync_sends: bool,
        enable_async_sends: bool,
        private: bool = False,
        protocol_module_level_import_code: str | None = None,
    ) -> str:
        """Used by create_sender_module(); do not call directly.

        Returns source text for a module defining a MessageSender
        subclass named 'basename' and a matching 'Bound<basename>'
        class with typed send()/send_async() methods. Both names get a
        leading underscore when 'private' is True.
        """
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        import textwrap

        # NOTE(review): leading whitespace inside the generated-source
        # literals in this method was reconstructed as standard 4-space
        # Python indentation - confirm against the canonical file.

        msgtypes = list(self.message_ids_by_type.keys())

        ppre = '_' if private else ''
        out = self._get_module_header(
            'sender',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=enable_async_sends,
        )
        ccind = textwrap.indent(protocol_create_code, '        ')
        out += (
            f'class {ppre}{basename}(MessageSender):\n'
            f'    """Protocol-specific sender."""\n'
            f'\n'
            f'    def __init__(self) -> None:\n'
            f'{ccind}\n'
            f'        super().__init__(protocol)\n'
            f'\n'
            f'    def __get__(\n'
            f'        self, obj: Any, type_in: Any = None\n'
            f'    ) -> {ppre}Bound{basename}:\n'
            f'        return {ppre}Bound{basename}(obj, self)\n'
            f'\n'
            f'\n'
            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
            f'    """Protocol-specific bound sender."""\n'
        )

        def _filt_tp_name(rtype: type[Response] | None) -> str:
            return 'None' if rtype is None else rtype.__name__

        # Define send() overloads for all registered message types.
        if msgtypes:
            for async_pass in False, True:
                if async_pass and not enable_async_sends:
                    continue
                if not async_pass and not enable_sync_sends:
                    continue
                pfx = 'async ' if async_pass else ''
                sfx = '_async' if async_pass else ''
                # The async variants hand back the awaitable as-is, so
                # no 'await' is emitted in either pass.
                awt = ''
                how = 'asynchronously' if async_pass else 'synchronously'

                if len(msgtypes) == 1:
                    # Special case: with a single message type we don't
                    # use overloads.
                    msgtype = msgtypes[0]
                    msgtypevar = msgtype.__name__
                    rtypes = msgtype.get_response_types()
                    if len(rtypes) > 1:
                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                    else:
                        rtypevar = _filt_tp_name(rtypes[0])
                    # Runtime check target; captured before any
                    # Awaitable[] wrapping is applied.
                    checktype = rtypevar
                    if async_pass:
                        rtypevar = f'Awaitable[{rtypevar}]'
                    out += (
                        f'\n'
                        f'    def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}:\n'
                        f'        """Send a message {how}."""\n'
                        f'        out = {awt}self._sender.'
                        f'send{sfx}(self._obj, message)\n'
                    )
                    if not async_pass:
                        if checktype == 'None':
                            # isinstance() rejects a bare None as its
                            # second argument (TypeError), so a None-only
                            # response needs an identity check instead.
                            out += '        assert out is None\n'
                        else:
                            out += (
                                f'        assert isinstance(out, {checktype})\n'
                            )
                        out += '        return out\n'
                    else:
                        out += f'        return cast({rtypevar}, out)\n'

                else:
                    for msgtype in msgtypes:
                        msgtypevar = msgtype.__name__
                        rtypes = msgtype.get_response_types()
                        if len(rtypes) > 1:
                            rtypevar = ' | '.join(
                                _filt_tp_name(t) for t in rtypes
                            )
                        else:
                            rtypevar = _filt_tp_name(rtypes[0])
                        out += (
                            f'\n'
                            f'    @overload\n'
                            f'    {pfx}def send{sfx}(self,'
                            f' message: {msgtypevar})'
                            f' -> {rtypevar}:\n'
                            f'        ...\n'
                        )
                    rtypevar = 'Response | None'
                    if async_pass:
                        rtypevar = f'Awaitable[{rtypevar}]'
                    out += (
                        f'\n'
                        f'    def send{sfx}(self, message: Message)'
                        f' -> {rtypevar}:\n'
                        f'        """Send a message {how}."""\n'
                        f'        return {awt}self._sender.'
                        f'send{sfx}(self._obj, message)\n'
                    )

        return out

    def do_create_receiver_module(
        self,
        basename: str,
        protocol_create_code: str,
        is_async: bool,
        private: bool = False,
        protocol_module_level_import_code: str | None = None,
    ) -> str:
        """Used by create_receiver_module(); do not call directly.

        Returns source text for a module defining a MessageReceiver
        subclass named 'basename' (with typed handler() decorators) and
        a matching 'Bound<basename>' class exposing
        handle_raw_message(). Both names get a leading underscore when
        'private' is True.
        """
        # pylint: disable=too-many-locals
        import textwrap

        # NOTE(review): leading whitespace inside the generated-source
        # literals in this method was reconstructed as standard 4-space
        # Python indentation - confirm against the canonical file.

        desc = 'asynchronous' if is_async else 'synchronous'
        ppre = '_' if private else ''
        msgtypes = list(self.message_ids_by_type.keys())
        out = self._get_module_header(
            'receiver',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=False,
        )
        ccind = textwrap.indent(protocol_create_code, '        ')
        out += (
            f'class {ppre}{basename}(MessageReceiver):\n'
            f'    """Protocol-specific {desc} receiver."""\n'
            f'\n'
            f'    is_async = {is_async}\n'
            f'\n'
            f'    def __init__(self) -> None:\n'
            f'{ccind}\n'
            f'        super().__init__(protocol)\n'
            f'\n'
            f'    def __get__(\n'
            f'        self,\n'
            f'        obj: Any,\n'
            f'        type_in: Any = None,\n'
            f'    ) -> {ppre}Bound{basename}:\n'
            f'        return {ppre}Bound{basename}('
            f'obj, self)\n'
        )

        # Define handler() overloads for all registered message types.

        def _filt_tp_name(rtype: type[Response] | None) -> str:
            return 'None' if rtype is None else rtype.__name__

        if msgtypes:
            # Async receivers' handlers return Awaitable[...] results.
            cbgn = 'Awaitable[' if is_async else ''
            cend = ']' if is_async else ''
            if len(msgtypes) == 1:
                # Special case: when we have a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypes = msgtype.get_response_types()
                if len(rtypes) > 1:
                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                else:
                    rtypevar = _filt_tp_name(rtypes[0])
                rtypevar = f'{cbgn}{rtypevar}{cend}'
                out += (
                    f'\n'
                    f'    def handler(\n'
                    f'        self,\n'
                    f'        call: Callable[[Any, {msgtypevar}], '
                    f'{rtypevar}],\n'
                    f'    )'
                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                    f'        """Decorator to register message handlers."""\n'
                    f'        from typing import cast, Callable, Any\n'
                    f'\n'
                    f'        self.register_handler(cast(Callable'
                    f'[[Any, Message], Response], call))\n'
                    f'        return call\n'
                )
            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypes = msgtype.get_response_types()
                    if len(rtypes) > 1:
                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                    else:
                        rtypevar = _filt_tp_name(rtypes[0])
                    rtypevar = f'{cbgn}{rtypevar}{cend}'
                    out += (
                        f'\n'
                        f'    @overload\n'
                        f'    def handler(\n'
                        f'        self,\n'
                        f'        call: Callable[[Any, {msgtypevar}], '
                        f'{rtypevar}],\n'
                        f'    )'
                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                        f'        ...\n'
                    )
                out += (
                    '\n'
                    '    def handler(self, call: Callable) -> Callable:\n'
                    '        """Decorator to register message handlers."""\n'
                    '        self.register_handler(call)\n'
                    '        return call\n'
                )

        out += (
            f'\n'
            f'\n'
            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
            f'    """Protocol-specific bound receiver."""\n'
        )
        if is_async:
            out += (
                '\n'
                '    def handle_raw_message(\n'
                '        self, message: str, raise_unregistered: bool = False\n'
                '    ) -> Awaitable[str]:\n'
                '        """Asynchronously handle a raw incoming message."""\n'
                '        return self._receiver.'
                'handle_raw_message_async(\n'
                '            self._obj, message, raise_unregistered\n'
                '        )\n'
            )

        else:
            out += (
                '\n'
                '    def handle_raw_message(\n'
                '        self, message: str, raise_unregistered: bool = False\n'
                '    ) -> str:\n'
                '        """Synchronously handle a raw incoming message."""\n'
                '        return self._receiver.handle_raw_message(\n'
                '            self._obj, message, raise_unregistered\n'
                '        )\n'
            )

        return out
Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.
def __init__(
    self,
    message_types: dict[int, type[Message]],
    response_types: dict[int, type[Response]],
    forward_communication_errors: bool = False,
    forward_clean_errors: bool = False,
    remote_errors_include_stack_traces: bool = False,
    log_remote_errors: bool = True,
) -> None:
    """Set up a protocol from id tables and error-handling options.

    'message_types' and 'response_types' map non-negative integer ids
    to ioprepped dataclass types derived from Message and Response
    respectively.

    forward_communication_errors: when True, an
    efro.error.CommunicationError raised on the receiver end results
    in a matching error raised back on the sender. Handy when the
    receiver is itself forwarding messages along and the sender only
    needs to know that communication broke down, not where.

    forward_clean_errors: when True, an efro.error.CleanError raised
    on the receiver end results in a matching CleanError raised back
    on the sender.

    Exceptions not covered by the forwarding options above come across
    as efro.error.RemoteError and, by default, are logged on the
    receiver end.

    remote_errors_include_stack_traces: when True, stringified stack
    traces are returned with efro.error.RemoteError exceptions. Useful
    for debugging, but should only be enabled when the sender is
    trusted to see internal details of the receiver.

    log_remote_errors: logging on the receiver is the default because
    the goal is usually to avoid returning opaque RemoteErrors and to
    instead return something meaningful as part of the expected
    response type (even if that value itself represents a logical
    error state). Pass False to skip that logging; this can be useful
    together with 'remote_errors_include_stack_traces' and
    'forward_clean_errors' when all error logging/management happens
    on the sender end. Be aware that communication errors may then
    prevent such error messages from ever being seen.

    Raises ValueError (in debug mode only) if a message declares a
    response type missing from 'response_types' or if two message
    types share a __name__.
    """
    # pylint: disable=too-many-locals
    self.message_types_by_id: dict[int, type[Message]] = {}
    self.message_ids_by_type: dict[type[Message], int] = {}
    self.response_types_by_id: dict[
        int, type[Response] | type[SysResponse]
    ] = {}
    self.response_ids_by_type: dict[
        type[Response] | type[SysResponse], int
    ] = {}

    # Build both lookup directions for messages, checking that only
    # valid types were passed and that no id gets used twice.
    for msg_id, msg_tp in message_types.items():
        assert isinstance(msg_id, int)
        assert msg_id >= 0
        assert is_ioprepped_dataclass(msg_tp) and issubclass(msg_tp, Message)
        assert self.message_types_by_id.get(msg_id) is None
        self.message_types_by_id[msg_id] = msg_tp
        self.message_ids_by_type[msg_tp] = msg_id

    # Same treatment for user-supplied responses.
    for rsp_id, rsp_tp in response_types.items():
        assert isinstance(rsp_id, int)
        assert rsp_id >= 0
        assert is_ioprepped_dataclass(rsp_tp) and issubclass(rsp_tp, Response)
        assert self.response_types_by_id.get(rsp_id) is None
        self.response_types_by_id[rsp_id] = rsp_tp
        self.response_ids_by_type[rsp_tp] = rsp_id

    # System responses live at negative ids so they can never collide
    # with user Response types (which must be >= 0).
    sys_registrations: tuple[tuple[type[SysResponse], int], ...] = (
        (ErrorSysResponse, -1),
        (EmptySysResponse, -2),
    )
    for sys_tp, sys_id in sys_registrations:
        assert self.response_types_by_id.get(sys_id) is None
        self.response_types_by_id[sys_id] = sys_tp
        self.response_ids_by_type[sys_tp] = sys_id

    # Extra-thorough validation that only runs in debug mode.
    if __debug__:
        # Gather every response type any message says it can return,
        # then verify each is valid and has been given an id.
        declared: set[type[Response] | None] = set()
        for msg_tp in message_types.values():
            possible = msg_tp.get_response_types()

            assert isinstance(possible, list)
            assert (
                possible
            ), f'Message type {msg_tp} specifies no return types.'
            assert len(set(possible)) == len(possible)  # check dups
            declared.update(possible)
        for rtype in declared:
            if rtype is not None:
                assert is_ioprepped_dataclass(rtype)
                assert issubclass(rtype, Response)
                if rtype not in self.response_ids_by_type:
                    raise ValueError(
                        f'Possible response type {rtype} needs to be included'
                        f' in response_types for this protocol.'
                    )

        # Registered message types must have unique base names; module
        # generation relies on that for cleaner looking output.
        unique_names = {tp.__name__ for tp in self.message_ids_by_type}
        if len(unique_names) != len(message_types):
            raise ValueError(
                'message_types contains duplicate __name__s;'
                ' all types are required to have unique names.'
            )

    self.forward_clean_errors = forward_clean_errors
    self.forward_communication_errors = forward_communication_errors
    self.remote_errors_include_stack_traces = (
        remote_errors_include_stack_traces
    )
    self.log_remote_errors = log_remote_errors
Create a protocol with a given configuration.
If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.
If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.
When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).
If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.
By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_remote_errors' is False, however, such exceptions will not be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent such error messages from ever being seen.
174 @staticmethod 175 def encode_dict(obj: dict) -> str: 176 """Json-encode a provided dict.""" 177 return json.dumps(obj, separators=(',', ':'))
Json-encode a provided dict.
def message_to_dict(self, message: Message) -> dict:
    """Convert a message into a dict that is ready for json encoding.

    Delegates to _to_dict(), handing it this protocol's
    message-type-to-id table and the label 'message'.
    """
    id_table = self.message_ids_by_type
    return self._to_dict(message, id_table, 'message')
Encode a message to a json ready dict.
def response_to_dict(self, response: Response | SysResponse) -> dict:
    """Convert a response into a dict that is ready for json encoding.

    Delegates to _to_dict(), handing it this protocol's
    response-type-to-id table and the label 'response'.
    """
    id_table = self.response_ids_by_type
    return self._to_dict(response, id_table, 'response')
Encode a response to a json ready dict.
def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
    """Translate an Exception to a SysResponse.

    Returns a (response, should_log) pair. The bool tells whether the
    error should be logged if this happened within handle_raw_message().

    CleanErrors and CommunicationErrors are only reported as such when
    the matching forward_* option is enabled on this protocol, and are
    never flagged for logging. Everything else becomes a generic REMOTE
    error whose logging is governed by log_remote_errors.
    """

    if self.forward_clean_errors and isinstance(exc, CleanError):
        return (
            ErrorSysResponse(
                error_message=str(exc),
                error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
            ),
            False,
        )
    if self.forward_communication_errors and isinstance(
        exc, CommunicationError
    ):
        return (
            ErrorSysResponse(
                error_message=str(exc),
                error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
            ),
            False,
        )

    if self.remote_errors_include_stack_traces:
        # Format the exception we were actually handed instead of
        # relying on traceback.format_exc(), which reads the
        # interpreter's 'currently handled exception' state and yields
        # 'NoneType: None' when called outside of an except block.
        error_message = ''.join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    else:
        error_message = 'An internal error has occurred.'
    return (
        ErrorSysResponse(
            error_message=error_message,
            error_type=ErrorSysResponse.ErrorType.REMOTE,
        ),
        self.log_remote_errors,
    )
Translate an Exception to a SysResponse.
Also returns whether the error should be logged if this happened within handle_raw_message().
240 @staticmethod 241 def decode_dict(data: str) -> dict: 242 """Decode data to a dict.""" 243 out = json.loads(data) 244 assert isinstance(out, dict) 245 return out
Decode data to a dict.
def message_from_dict(self, data: dict) -> Message:
    """Build a Message instance from its json-ready dict form.

    Delegates to _from_dict() using this protocol's id-to-message-type
    table and sanity-checks that the result is a Message.
    """
    decoded = self._from_dict(data, self.message_types_by_id, 'message')
    assert isinstance(decoded, Message)
    return decoded
Decode a message from a json-ready dict.
def response_from_dict(self, data: dict) -> Response | SysResponse:
    """Build a Response or SysResponse from its json-ready dict form.

    Delegates to _from_dict() using this protocol's id-to-response-type
    table and sanity-checks the type of the result.
    """
    decoded = self._from_dict(data, self.response_types_by_id, 'response')
    assert isinstance(decoded, Response | SysResponse)
    return decoded
Decode a response from a json-ready dict.
def do_create_sender_module(
    self,
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_sender_module(); do not call directly.

    Returns Python source text defining a MessageSender subclass named
    `basename` and a matching 'Bound' + `basename` BoundMessageSender
    subclass (both prefixed with '_' when `private` is True).
    `protocol_create_code` is pasted into the generated __init__ and is
    expected to assign a variable named 'protocol'. Typed send() and/or
    send_async() methods are emitted per registered message type
    according to `enable_sync_sends` / `enable_async_sends`.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    import textwrap

    msgtypes = list(self.message_ids_by_type.keys())

    ppre = '_' if private else ''
    out = self._get_module_header(
        'sender',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=enable_async_sends,
    )
    # NOTE(review): generated code uses 4-space indent levels (class
    # members at 4, method bodies at 8); the emitted module cannot parse
    # with the levels collapsed, so keep these literals exact.
    ccind = textwrap.indent(protocol_create_code, '        ')
    out += (
        f'class {ppre}{basename}(MessageSender):\n'
        f'    """Protocol-specific sender."""\n'
        f'\n'
        f'    def __init__(self) -> None:\n'
        f'{ccind}\n'
        f'        super().__init__(protocol)\n'
        f'\n'
        f'    def __get__(\n'
        f'        self, obj: Any, type_in: Any = None\n'
        f'    ) -> {ppre}Bound{basename}:\n'
        f'        return {ppre}Bound{basename}(obj, self)\n'
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageSender):\n'
        f'    """Protocol-specific bound sender."""\n'
    )

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        return 'None' if rtype is None else rtype.__name__

    def _rtypes_expr(msgtype: Any) -> str:
        # Type expression covering every response type of a message.
        return ' | '.join(
            _filt_tp_name(t) for t in msgtype.get_response_types()
        )

    # Define send() overloads for all registered message types.
    if msgtypes:
        for async_pass in False, True:
            if async_pass and not enable_async_sends:
                continue
            if not async_pass and not enable_sync_sends:
                continue
            pfx = 'async ' if async_pass else ''
            sfx = '_async' if async_pass else ''
            how = 'asynchronously' if async_pass else 'synchronously'

            if len(msgtypes) == 1:
                # Special case: with a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypevar = _rtypes_expr(msgtype)
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f'    def send{sfx}(self,'
                    f' message: {msgtypevar})'
                    f' -> {rtypevar}:\n'
                    f'        """Send a message {how}."""\n'
                    f'        out = self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )
                if not async_pass:
                    # isinstance() rejects a bare None as its second
                    # argument (TypeError), so a None-only response
                    # needs an identity check instead.
                    check = (
                        'out is None'
                        if rtypevar == 'None'
                        else f'isinstance(out, {rtypevar})'
                    )
                    out += (
                        f'        assert {check}\n'
                        '        return out\n'
                    )
                else:
                    out += f'        return cast({rtypevar}, out)\n'

            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypevar = _rtypes_expr(msgtype)
                    out += (
                        f'\n'
                        f'    @overload\n'
                        f'    {pfx}def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}:\n'
                        f'        ...\n'
                    )
                # The real implementation behind the overloads accepts
                # any Message.
                rtypevar = 'Response | None'
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f'    def send{sfx}(self, message: Message)'
                    f' -> {rtypevar}:\n'
                    f'        """Send a message {how}."""\n'
                    f'        return self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )

    return out
Used by create_sender_module(); do not call directly.
def do_create_receiver_module(
    self,
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_receiver_module(); do not call directly.

    Returns Python source text defining a MessageReceiver subclass named
    `basename` and a matching 'Bound' + `basename` BoundMessageReceiver
    subclass (both prefixed with '_' when `private` is True).
    `protocol_create_code` is pasted into the generated __init__ and is
    expected to assign a variable named 'protocol'. When `is_async` is
    True, handler return types are wrapped in Awaitable[] and the bound
    class forwards to handle_raw_message_async().
    """
    # pylint: disable=too-many-locals
    import textwrap

    desc = 'asynchronous' if is_async else 'synchronous'
    ppre = '_' if private else ''
    msgtypes = list(self.message_ids_by_type.keys())
    out = self._get_module_header(
        'receiver',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=False,
    )
    # NOTE(review): generated code uses 4-space indent levels (class
    # members at 4, method bodies at 8); the emitted module cannot parse
    # with the levels collapsed, so keep these literals exact.
    ccind = textwrap.indent(protocol_create_code, '        ')
    out += (
        f'class {ppre}{basename}(MessageReceiver):\n'
        f'    """Protocol-specific {desc} receiver."""\n'
        f'\n'
        f'    is_async = {is_async}\n'
        f'\n'
        f'    def __init__(self) -> None:\n'
        f'{ccind}\n'
        f'        super().__init__(protocol)\n'
        f'\n'
        f'    def __get__(\n'
        f'        self,\n'
        f'        obj: Any,\n'
        f'        type_in: Any = None,\n'
        f'    ) -> {ppre}Bound{basename}:\n'
        f'        return {ppre}Bound{basename}('
        f'obj, self)\n'
    )

    # Define handler() overloads for all registered message types.

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        return 'None' if rtype is None else rtype.__name__

    cbgn = 'Awaitable[' if is_async else ''
    cend = ']' if is_async else ''

    def _handler_rtype(msgtype: Any) -> str:
        # Return-type expression for a handler of the given message.
        names = ' | '.join(
            _filt_tp_name(t) for t in msgtype.get_response_types()
        )
        return f'{cbgn}{names}{cend}'

    if msgtypes:
        if len(msgtypes) == 1:
            # Special case: when we have a single message type we don't
            # use overloads.
            msgtype = msgtypes[0]
            msgtypevar = msgtype.__name__
            rtypevar = _handler_rtype(msgtype)
            out += (
                f'\n'
                f'    def handler(\n'
                f'        self,\n'
                f'        call: Callable[[Any, {msgtypevar}], '
                f'{rtypevar}],\n'
                f'    )'
                f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                f'        """Decorator to register message handlers."""\n'
                f'        from typing import cast, Callable, Any\n'
                f'\n'
                f'        self.register_handler(cast(Callable'
                f'[[Any, Message], Response], call))\n'
                f'        return call\n'
            )
        else:
            for msgtype in msgtypes:
                msgtypevar = msgtype.__name__
                rtypevar = _handler_rtype(msgtype)
                out += (
                    f'\n'
                    f'    @overload\n'
                    f'    def handler(\n'
                    f'        self,\n'
                    f'        call: Callable[[Any, {msgtypevar}], '
                    f'{rtypevar}],\n'
                    f'    )'
                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                    f'        ...\n'
                )
            # The real implementation behind the overloads.
            out += (
                '\n'
                '    def handler(self, call: Callable) -> Callable:\n'
                '        """Decorator to register message handlers."""\n'
                '        self.register_handler(call)\n'
                '        return call\n'
            )

    out += (
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
        f'    """Protocol-specific bound receiver."""\n'
    )
    if is_async:
        out += (
            '\n'
            '    def handle_raw_message(\n'
            '        self, message: str, raise_unregistered: bool = False\n'
            '    ) -> Awaitable[str]:\n'
            '        """Asynchronously handle a raw incoming message."""\n'
            '        return self._receiver.'
            'handle_raw_message_async(\n'
            '            self._obj, message, raise_unregistered\n'
            '        )\n'
        )

    else:
        out += (
            '\n'
            '    def handle_raw_message(\n'
            '        self, message: str, raise_unregistered: bool = False\n'
            '    ) -> str:\n'
            '        """Synchronously handle a raw incoming message."""\n'
            '        return self._receiver.handle_raw_message(\n'
            '            self._obj, message, raise_unregistered\n'
            '        )\n'
        )

    return out
Used by create_receiver_module(); do not call directly.
class MessageSender:
    """Facilitates sending messages to a target and receiving responses.

    This is instantiated at the class level and used to register unbound
    class methods to handle raw message sending.

    Example:

    class MyClass:
        msg = MyMessageSender(some_protocol)

        @msg.send_method
        def send_raw_message(self, message: str) -> str:
            # Actually send the message here.

    # MyMessageSender class should provide overloads for send(), send_async(),
    # etc. to ensure all sending happens with valid types.
    obj = MyClass()
    obj.msg.send(SomeMessageType())
    """

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        # Calls registered through the decorator methods below; each
        # stays None until its decorator has been applied.
        self._send_raw_message_call: Callable[[Any, str], str] | None = None
        self._send_async_raw_message_call: Callable[
            [Any, str], Awaitable[str]
        ] | None = None
        self._send_async_raw_message_ex_call: Callable[
            [Any, str, Message], Awaitable[str]
        ] | None = None
        self._encode_filter_call: Callable[
            [Any, Message, dict], None
        ] | None = None
        self._decode_filter_call: Callable[
            [Any, Message, dict, Response | SysResponse], None
        ] | None = None
        self._peer_desc_call: Callable[[Any], str] | None = None

    def send_method(
        self, call: Callable[[Any, str], str]
    ) -> Callable[[Any, str], str]:
        """Function decorator for setting raw send method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.
        """
        assert self._send_raw_message_call is None
        self._send_raw_message_call = call
        return call

    def send_async_method(
        self, call: Callable[[Any, str], Awaitable[str]]
    ) -> Callable[[Any, str], Awaitable[str]]:
        """Function decorator for setting raw send-async method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.

        IMPORTANT: Generally async send methods should not be implemented
        as 'async' methods, but instead should be regular methods that
        return awaitable objects. This way it can be guaranteed that
        outgoing messages are synchronously enqueued in the correct
        order, and then async calls can be returned which finish each
        send. If the entire call is async, they may be enqueued out of
        order in rare cases.
        """
        assert self._send_async_raw_message_call is None
        self._send_async_raw_message_call = call
        return call

    def send_async_ex_method(
        self, call: Callable[[Any, str, Message], Awaitable[str]]
    ) -> Callable[[Any, str, Message], Awaitable[str]]:
        """Function decorator for extended send-async method.

        Version of send_async_method which is also passed the original
        unencoded message; can be useful for cases where metadata is sent
        along with messages referring to their payloads/etc.
        """
        assert self._send_async_raw_message_ex_call is None
        self._send_async_raw_message_ex_call = call
        return call

    def encode_filter_method(
        self, call: Callable[[Any, Message, dict], None]
    ) -> Callable[[Any, Message, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def decode_filter_method(
        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
    ) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts.
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def peer_desc_method(
        self, call: Callable[[Any], str]
    ) -> Callable[[Any], str]:
        """Function decorator for defining peer descriptions.

        These are included in error messages or other diagnostics.
        """
        assert self._peer_desc_call is None
        self._peer_desc_call = call
        return call

    def send(self, bound_obj: Any, message: Message) -> Response | None:
        """Send a message synchronously."""
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=self.fetch_raw_response(
                bound_obj=bound_obj,
                message=message,
            ),
        )

    def send_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously."""

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        raw_response_awaitable = self.fetch_raw_response_async(
            bound_obj=bound_obj,
            message=message,
        )
        # Now return an awaitable that will finish the send.
        return self._send_async_awaitable(
            bound_obj, message, raw_response_awaitable
        )

    async def _send_async_awaitable(
        self,
        bound_obj: Any,
        message: Message,
        raw_response_awaitable: Awaitable[Response | SysResponse],
    ) -> Response | None:
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=await raw_response_awaitable,
        )

    @staticmethod
    def _local_error_response(
        exc: Exception, error_message: str
    ) -> ErrorSysResponse:
        """Wrap an exception raised by a raw send call in an error response.

        CommunicationErrors are tagged COMMUNICATION; anything else is
        tagged LOCAL.
        """
        response = ErrorSysResponse(
            error_message=error_message,
            error_type=(
                ErrorSysResponse.ErrorType.COMMUNICATION
                if isinstance(exc, CommunicationError)
                else ErrorSysResponse.ErrorType.LOCAL
            ),
        )
        # Can include the actual exception since we'll be looking at
        # this locally; might be helpful.
        response.set_local_exception(exc)
        return response

    def fetch_raw_response(
        self, bound_obj: Any, message: Message
    ) -> Response | SysResponse:
        """Send a message synchronously and return the raw response.

        Generally you can just call send(); these split versions are
        for when message sending and response handling need to happen
        in different contexts/threads.

        Raises RuntimeError if no @send_method has been registered.
        Exceptions raised by the send method itself are not propagated;
        they come back wrapped in an ErrorSysResponse.
        """
        if self._send_raw_message_call is None:
            raise RuntimeError('send() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            response_encoded = self._send_raw_message_call(
                bound_obj, msg_encoded
            )
        except Exception as exc:
            return self._local_error_response(
                exc, 'Error in MessageSender @send_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def fetch_raw_response_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Fetch a raw message response awaitable.

        The result of this should be awaited and then passed to
        unpack_raw_response() to produce the final message result.

        Generally you can just call send_async(); calling fetch and
        unpack manually is for when message sending and response
        handling need to happen in different contexts/threads.

        Raises RuntimeError if neither @send_async_method nor
        @send_async_ex_method has been registered.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        if (
            self._send_async_raw_message_call is None
            and self._send_async_raw_message_ex_call is None
        ):
            raise RuntimeError('send_async() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            # The extended call takes priority when both are registered.
            if self._send_async_raw_message_ex_call is not None:
                send_awaitable = self._send_async_raw_message_ex_call(
                    bound_obj, msg_encoded, message
                )
            else:
                assert self._send_async_raw_message_call is not None
                send_awaitable = self._send_async_raw_message_call(
                    bound_obj, msg_encoded
                )
        except Exception as exc:
            return self._error_awaitable(exc)

        # Now return an awaitable to finish the job.
        return self._fetch_raw_response_awaitable(
            bound_obj, message, send_awaitable
        )

    async def _error_awaitable(self, exc: Exception) -> SysResponse:
        return self._local_error_response(
            exc, 'Error in MessageSender @send_async_method.'
        )

    async def _fetch_raw_response_awaitable(
        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
    ) -> Response | SysResponse:
        try:
            response_encoded = await send_awaitable
        except Exception as exc:
            return self._local_error_response(
                exc, 'Error in MessageSender @send_async_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def unpack_raw_response(
        self,
        bound_obj: Any,
        message: Message,
        raw_response: Response | SysResponse,
    ) -> Response | None:
        """Convert a raw fetched response into a final response/error/etc.

        Generally you can just call send(); calling fetch and unpack
        manually is for when message sending and response handling need
        to happen in different contexts/threads.
        """
        response = self._unpack_raw_response(bound_obj, raw_response)
        # Sanity check: anything we hand back must be one of the types
        # the message declared.
        assert (
            response is None
            or type(response) in type(message).get_response_types()
        )
        return response

    def _encode_message(self, bound_obj: Any, message: Message) -> str:
        """Encode a message for sending."""
        msg_dict = self.protocol.message_to_dict(message)
        if self._encode_filter_call is not None:
            self._encode_filter_call(bound_obj, message, msg_dict)
        return self.protocol.encode_dict(msg_dict)

    def _decode_raw_response(
        self, bound_obj: Any, message: Message, response_encoded: str
    ) -> Response | SysResponse:
        """Create a Response from returned data.

        These Responses may encapsulate things like remote errors and
        should not be handed directly to users. _unpack_raw_response()
        should be used to translate to special values like None or raise
        Exceptions. This function itself should never raise Exceptions.
        """
        response: Response | SysResponse
        try:
            response_dict = self.protocol.decode_dict(response_encoded)
            response = self.protocol.response_from_dict(response_dict)
            if self._decode_filter_call is not None:
                self._decode_filter_call(
                    bound_obj, message, response_dict, response
                )
        except Exception as exc:
            response = ErrorSysResponse(
                error_message='Error decoding raw response.',
                error_type=ErrorSysResponse.ErrorType.LOCAL,
            )
            # Since we'll be looking at this locally, we can include
            # extra info for logging/etc.
            response.set_local_exception(exc)
        return response

    def _unpack_raw_response(
        self, bound_obj: Any, raw_response: Response | SysResponse
    ) -> Response | None:
        """Given a raw Response, unpacks to special values or Exceptions.

        The result of this call is what should be passed to users.
        For complex messaging situations such as response callbacks
        operating across different threads, this last stage should be
        run such that any raised Exception is active when the callback
        fires; not on the thread where the message was sent.
        """
        # EmptySysResponse translates to None
        if isinstance(raw_response, EmptySysResponse):
            return None

        # Some error occurred. Raise a local Exception for it.
        if isinstance(raw_response, ErrorSysResponse):
            # Errors that happened locally can attach their exceptions
            # here for extra logging goodness.
            local_exception = raw_response.get_local_exception()

            if (
                raw_response.error_type
                is ErrorSysResponse.ErrorType.COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # If something went wrong on *our* end of the connection,
            # don't say it was a remote error.
            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
                raise RuntimeError(
                    raw_response.error_message
                ) from local_exception

            # If they want to support clean errors, do those.
            if (
                self.protocol.forward_clean_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
            ):
                raise CleanError(
                    raw_response.error_message
                ) from local_exception

            if (
                self.protocol.forward_communication_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # Everything else gets lumped in as a remote error.
            raise RemoteError(
                raw_response.error_message,
                peer_desc=(
                    'peer'
                    if self._peer_desc_call is None
                    else self._peer_desc_call(bound_obj)
                ),
            ) from local_exception

        assert isinstance(raw_response, Response)
        return raw_response
Facilitates sending messages to a target and receiving responses. This is instantiated at the class level and used to register unbound class methods to handle raw message sending.
Example:
class MyClass: msg = MyMessageSender(some_protocol)
@msg.send_method
def send_raw_message(self, message: str) -> str:
# Actually send the message here.
MyMessageSender class should provide overloads for send(), send_async(),
etc. to ensure all sending happens with valid types.
obj = MyClass() obj.msg.send(SomeMessageType())
42 def __init__(self, protocol: MessageProtocol) -> None: 43 self.protocol = protocol 44 self._send_raw_message_call: Callable[[Any, str], str] | None = None 45 self._send_async_raw_message_call: Callable[ 46 [Any, str], Awaitable[str] 47 ] | None = None 48 self._send_async_raw_message_ex_call: Callable[ 49 [Any, str, Message], Awaitable[str] 50 ] | None = None 51 self._encode_filter_call: Callable[ 52 [Any, Message, dict], None 53 ] | None = None 54 self._decode_filter_call: Callable[ 55 [Any, Message, dict, Response | SysResponse], None 56 ] | None = None 57 self._peer_desc_call: Callable[[Any], str] | None = None
def send_method(
    self, call: Callable[[Any, str], str]
) -> Callable[[Any, str], str]:
    """Decorator registering the raw synchronous send call.

    The call receives an encoded message string and should return the
    encoded response string. A CommunicationError raised by it reaches
    the sender as a CommunicationError; any other exception reaches the
    sender as a RuntimeError.

    May only be applied once per sender; the call is returned unchanged.
    """
    # Registering twice is a programming error.
    assert self._send_raw_message_call is None
    self._send_raw_message_call = call
    return call
Function decorator for setting raw send method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
def send_async_method(
    self, call: Callable[[Any, str], Awaitable[str]]
) -> Callable[[Any, str], Awaitable[str]]:
    """Decorator registering the raw asynchronous send call.

    The call receives an encoded message string and should return an
    awaitable producing the encoded response string. A
    CommunicationError raised by it reaches the sender as a
    CommunicationError; any other exception reaches the sender as a
    RuntimeError.

    IMPORTANT: prefer a regular (non-'async') method that returns an
    awaitable. That way each outgoing message is enqueued synchronously,
    in call order, and only the remainder of the send is awaited. A
    fully 'async' method may, in rare cases, enqueue messages out of
    order.

    May only be applied once per sender; the call is returned unchanged.
    """
    # Registering twice is a programming error.
    assert self._send_async_raw_message_call is None
    self._send_async_raw_message_call = call
    return call
Function decorator for setting raw send-async method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.
def send_async_ex_method(
    self, call: Callable[[Any, str, Message], Awaitable[str]]
) -> Callable[[Any, str, Message], Awaitable[str]]:
    """Decorator registering the extended raw asynchronous send call.

    Like send_async_method, except the call is additionally handed the
    original unencoded message; handy when metadata referring to the
    payload needs to travel along with it.

    May only be applied once per sender; the call is returned unchanged.
    """
    # Registering twice is a programming error.
    assert self._send_async_raw_message_ex_call is None
    self._send_async_raw_message_ex_call = call
    return call
Function decorator for extended send-async method.
Version of send_async_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
def encode_filter_method(
    self, call: Callable[[Any, Message, dict], None]
) -> Callable[[Any, Message, dict], None]:
    """Decorator registering an encode filter.

    An encode filter gets a chance to add extra data to the message
    dict before it is turned into a string and sent.

    May only be applied once per sender; the call is returned unchanged.
    """
    # Registering twice is a programming error.
    assert self._encode_filter_call is None
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def decode_filter_method(
    self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
    """Function decorator for defining a decode filter.

    Decode filters can be used to extract extra data from incoming
    message dicts.

    The filter is returned unchanged, so the declared return type
    matches the parameter type exactly (filters receive SysResponses as
    well as regular Responses). May only be applied once per sender.
    """
    # Registering twice is a programming error.
    assert self._decode_filter_call is None
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts.
def peer_desc_method(
    self, call: Callable[[Any], str]
) -> Callable[[Any], str]:
    """Decorator registering a peer description provider.

    The description it returns shows up in error messages and other
    diagnostics.

    May only be applied once per sender; the call is returned unchanged.
    """
    # Registering twice is a programming error.
    assert self._peer_desc_call is None
    self._peer_desc_call = call
    return call
Function decorator for defining peer descriptions.
These are included in error messages or other diagnostics.
def send(self, bound_obj: Any, message: Message) -> Response | None:
    """Send a message and block until its final response is available.

    This is simply fetch_raw_response() followed by
    unpack_raw_response().
    """
    raw = self.fetch_raw_response(bound_obj=bound_obj, message=message)
    return self.unpack_raw_response(
        bound_obj=bound_obj, message=message, raw_response=raw
    )
Send a message synchronously.
def send_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | None]:
    """Begin sending a message; return an awaitable for its response."""

    # This is deliberately a plain (non-async) method: the first half of
    # the send runs right here, synchronously, which is what lets us
    # promise that messages sent in order also go out in order. A fully
    # async method could make no such promise.
    pending_raw = self.fetch_raw_response_async(
        bound_obj=bound_obj, message=message
    )

    # The rest of the send happens when the caller awaits this.
    return self._send_async_awaitable(bound_obj, message, pending_raw)
Send a message asynchronously.
def fetch_raw_response(
    self, bound_obj: Any, message: Message
) -> Response | SysResponse:
    """Synchronously send a message and return its raw response.

    send() is normally all you need; the split fetch/unpack calls exist
    for cases where sending and response handling must happen in
    different contexts/threads.

    Raises RuntimeError when no @send_method has been registered. An
    exception raised by the send method itself is not propagated; it is
    returned wrapped in an ErrorSysResponse.
    """
    raw_send = self._send_raw_message_call
    if raw_send is None:
        raise RuntimeError('send() is unimplemented for this type.')

    encoded = self._encode_message(bound_obj, message)
    try:
        raw_reply = raw_send(bound_obj, encoded)
    except Exception as exc:
        if isinstance(exc, CommunicationError):
            etype = ErrorSysResponse.ErrorType.COMMUNICATION
        else:
            etype = ErrorSysResponse.ErrorType.LOCAL
        failure = ErrorSysResponse(
            error_message='Error in MessageSender @send_method.',
            error_type=etype,
        )
        # This response never leaves the local process, so attaching
        # the real exception is safe and may aid debugging.
        failure.set_local_exception(exc)
        return failure

    return self._decode_raw_response(bound_obj, message, raw_reply)
Send a message synchronously.
Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.
def fetch_raw_response_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | SysResponse]:
    """Begin a send and return an awaitable for the raw response.

    Await the result and hand it to unpack_raw_response() to obtain the
    final message result.

    Generally you can just call send(); fetching and unpacking by hand
    is for cases where sending and response handling must happen in
    different contexts/threads.

    Raises RuntimeError when neither @send_async_method nor
    @send_async_ex_method has been registered.
    """

    # Deliberately a plain (non-async) method: encoding and handing the
    # message to the raw send call happen right here, synchronously,
    # which is what guarantees that messages sent in order go out in
    # order.
    basic_call = self._send_async_raw_message_call
    extended_call = self._send_async_raw_message_ex_call
    if basic_call is None and extended_call is None:
        raise RuntimeError('send_async() is unimplemented for this type.')

    encoded = self._encode_message(bound_obj, message)
    try:
        # The extended call wins when both are registered.
        if extended_call is not None:
            pending = extended_call(bound_obj, encoded, message)
        else:
            assert basic_call is not None
            pending = basic_call(bound_obj, encoded)
    except Exception as exc:
        return self._error_awaitable(exc)

    # Whatever remains of the send happens when this is awaited.
    return self._fetch_raw_response_awaitable(bound_obj, message, pending)
Fetch a raw message response awaitable.
The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
292 def unpack_raw_response( 293 self, 294 bound_obj: Any, 295 message: Message, 296 raw_response: Response | SysResponse, 297 ) -> Response | None: 298 """Convert a raw fetched response into a final response/error/etc. 299 300 Generally you can just call send(); calling fetch and unpack 301 manually is for when message sending and response handling need 302 to happen in different contexts/threads. 303 """ 304 response = self._unpack_raw_response(bound_obj, raw_response) 305 assert ( 306 response is None 307 or type(response) in type(message).get_response_types() 308 ) 309 return response
Convert a raw fetched response into a final response/error/etc.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
class BoundMessageSender:
    """Base class for bound senders.

    Pairs a MessageSender with the object it is being accessed through
    and forwards each call to the sender with that object supplied as
    'bound_obj'.
    """

    def __init__(self, obj: Any, sender: MessageSender) -> None:
        # 'obj' is intentionally left unchecked here: it may be None
        # when accessed via the type, and at least the protocol
        # property should keep working in that case.
        self._sender = sender
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this sender."""
        sender = self._sender
        return sender.protocol

    def send_untyped(self, message: Message) -> Response | None:
        """Send a message synchronously.

        Prefer the send() call provided by generated subclasses
        whenever possible; it offers better type safety.
        """
        assert self._obj is not None
        sender = self._sender
        return sender.send(bound_obj=self._obj, message=message)

    def send_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously.

        Prefer the send_async() call provided by generated subclasses
        whenever possible; it offers better type safety.
        """
        assert self._obj is not None
        sender = self._sender
        return sender.send_async(bound_obj=self._obj, message=message)

    def fetch_raw_response_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Split send (part 1 of 2)."""
        assert self._obj is not None
        sender = self._sender
        return sender.fetch_raw_response_async(
            bound_obj=self._obj, message=message
        )

    def unpack_raw_response_untyped(
        self, message: Message, raw_response: Response | SysResponse
    ) -> Response | None:
        """Split send (part 2 of 2)."""
        sender = self._sender
        return sender.unpack_raw_response(
            bound_obj=self._obj, message=message, raw_response=raw_response
        )
Base class for bound senders.
def send_untyped(self, message: Message) -> Response | None:
    """Send a message synchronously.

    Prefer the send() call provided by generated subclasses whenever
    possible; it offers better type safety.
    """
    assert self._obj is not None
    sender = self._sender
    return sender.send(bound_obj=self._obj, message=message)
Send a message synchronously.
Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.
def send_async_untyped(
    self, message: Message
) -> Awaitable[Response | None]:
    """Send a message asynchronously.

    Prefer the send_async() call provided by generated subclasses
    whenever possible; it offers better type safety.
    """
    assert self._obj is not None
    sender = self._sender
    return sender.send_async(bound_obj=self._obj, message=message)
Send a message asynchronously.
Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.
def fetch_raw_response_async_untyped(
    self, message: Message
) -> Awaitable[Response | SysResponse]:
    """Split send (part 1 of 2).

    Forwards to the sender's fetch_raw_response_async() with the bound
    object supplied.
    """
    assert self._obj is not None
    sender = self._sender
    return sender.fetch_raw_response_async(
        bound_obj=self._obj, message=message
    )
Split send (part 1 of 2).
def unpack_raw_response_untyped(
    self, message: Message, raw_response: Response | SysResponse
) -> Response | None:
    """Split send (part 2 of 2).

    Forwards to the sender's unpack_raw_response() with the bound
    object supplied.
    """
    sender = self._sender
    return sender.unpack_raw_response(
        bound_obj=self._obj, message=message, raw_response=raw_response
    )
Split send (part 2 of 2).
class MessageReceiver:
    """Facilitates receiving & responding to messages from a remote source.

    This is instantiated at the class level with unbound methods registered
    as handlers for different message types in the protocol.

    Example:

    class MyClass:
        receiver = MyMessageReceiver()

        # MyMessageReceiver fills out handler() overloads to ensure all
        # registered handlers have valid types/return-types.
        # (register_handler() requires the args to be named exactly
        # 'self' and 'msg'.)
        @receiver.handler
        def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
            # Deal with this message type here.

    # This will trigger the registered handler being called.
    obj = MyClass()
    obj.receiver.handle_raw_message(some_raw_data)

    Any unhandled Exception occurring during message handling will result in
    an Exception being raised on the sending end.
    """

    is_async = False

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        # Registered handler callables, keyed by the message type handled.
        self._handlers: dict[type[Message], Callable] = {}
        # Optional filter hooks; each may be set at most once.
        self._decode_filter_call: Callable[
            [Any, dict, Message], None
        ] | None = None
        self._encode_filter_call: Callable[
            [Any, Message | None, Response | SysResponse, dict], None
        ] | None = None

    # noinspection PyProtectedMember
    def register_handler(
        self, call: Callable[[Any, Message], Response | None]
    ) -> None:
        """Register a handler call.

        The message type handled by the call is determined by its
        type annotation.

        Raises ValueError if the callable's args are not exactly
        ('self', 'msg'), and TypeError if its annotations are not plain
        types, if the message type is not registered in the protocol or
        already has a handler, or if the annotated return types do not
        exactly match the message's declared response types.
        """
        # TODO: can use types.GenericAlias in 3.9.
        # (hmm though now that we're there, it seems a drop-in
        # replace gives us errors. Should re-test in 3.11 as it seems
        # that typing_extensions handles it differently in that case)
        from typing import _GenericAlias  # type: ignore
        from typing import get_type_hints, get_args

        sig = inspect.getfullargspec(call)

        # The provided callable should be a method taking one 'msg' arg.
        expectedsig = ['self', 'msg']
        if sig.args != expectedsig:
            raise ValueError(
                f'Expected callable signature of {expectedsig};'
                f' got {sig.args}'
            )

        # Note: we do not verify that the handler's sync/async flavor
        # matches is_async, since handlers may be regular functions
        # which return awaitables rather than being async themselves.

        # Check annotation types to determine what message types we handle.
        anns = get_type_hints(call)

        msgtype = anns.get('msg')
        if not isinstance(msgtype, type):
            raise TypeError(
                f'expected a type for "msg" annotation; got {type(msgtype)}.'
            )
        assert issubclass(msgtype, Message)

        ret = anns.get('return')
        responsetypes: tuple[type[Any] | None, ...]

        # Return types can be a single type or a union of types.
        if isinstance(ret, (_GenericAlias, types.UnionType)):
            targs = get_args(ret)
            if not all(isinstance(a, (type, type(None))) for a in targs):
                raise TypeError(
                    f'expected only types for "return" annotation;'
                    f' got {targs}.'
                )
            responsetypes = targs
        else:
            if not isinstance(ret, (type, type(None))):
                raise TypeError(
                    f'expected one or more types for'
                    f' "return" annotation; got a {type(ret)}.'
                )
            responsetypes = (ret,)

        # This will contain NoneType for empty return cases, but
        # we expect it to be None.
        # noinspection PyPep8
        responsetypes = tuple(
            None if r is type(None) else r for r in responsetypes
        )

        # Make sure our protocol has this message type registered and our
        # return types exactly match. (Technically we could return a subset
        # of the supported types; can allow this in the future if it makes
        # sense).
        registered_types = self.protocol.message_ids_by_type.keys()

        if msgtype not in registered_types:
            raise TypeError(
                f'Message type {msgtype} is not registered'
                f' in this Protocol.'
            )

        if msgtype in self._handlers:
            raise TypeError(
                f'Message type {msgtype} already has a registered' f' handler.'
            )

        # Make sure the responses exactly matches what the message expects.
        if set(responsetypes) != set(msgtype.get_response_types()):
            raise TypeError(
                f'Provided response types {responsetypes} do not'
                f' match the set expected by message type {msgtype}: '
                f'({msgtype.get_response_types()})'
            )

        # Ok; we're good!
        self._handlers[msgtype] = call

    def decode_filter_method(
        self, call: Callable[[Any, dict, Message], None]
    ) -> Callable[[Any, dict, Message], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts. This version will work for both handle_raw_message()
        and handle_raw_message_async().
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def encode_filter_method(
        self,
        call: Callable[
            [Any, Message | None, Response | SysResponse, dict], None
        ],
    ) -> Callable[[Any, Message | None, Response, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def validate(self, log_only: bool = False) -> None:
        """Check for handler completeness, valid types, etc.

        Each non-Response type registered in the protocol must have a
        handler. A missing one raises TypeError, or is only logged as an
        error if 'log_only' is True.
        """
        for msgtype in self.protocol.message_ids_by_type.keys():
            if issubclass(msgtype, Response):
                continue
            if msgtype not in self._handlers:
                msg = (
                    f'Protocol message type {msgtype} is not handled'
                    f' by receiver type {type(self)}.'
                )
                if log_only:
                    logging.error(msg)
                else:
                    raise TypeError(msg)

    def _decode_incoming_message_base(
        self, bound_obj: Any, msg: str
    ) -> tuple[Any, dict, Message]:
        """Decode a raw message, run the decode filter, return all parts."""
        msg_dict = self.protocol.decode_dict(msg)
        msg_decoded = self.protocol.message_from_dict(msg_dict)
        assert isinstance(msg_decoded, Message)
        if self._decode_filter_call is not None:
            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
        return bound_obj, msg_dict, msg_decoded

    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
        """Decode a raw message and return just the Message object."""
        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
            bound_obj=bound_obj, msg=msg
        )
        return msg_decoded

    def encode_user_response(
        self, bound_obj: Any, message: Message, response: Response | None
    ) -> str:
        """Encode a response provided by the user for sending."""

        assert isinstance(response, Response | None)
        # (user should never explicitly return error-responses)
        assert (
            response is None or type(response) in message.get_response_types()
        )

        # A return value of None equals EmptySysResponse.
        out_response: Response | SysResponse
        if response is None:
            out_response = EmptySysResponse()
        else:
            out_response = response

        response_dict = self.protocol.response_to_dict(out_response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, out_response, response_dict
            )
        return self.protocol.encode_dict(response_dict)

    def encode_error_response(
        self, bound_obj: Any, message: Message | None, exc: Exception
    ) -> tuple[str, bool]:
        """Given an error, return sysresponse str and whether to log."""
        response, dolog = self.protocol.error_to_response(exc)
        response_dict = self.protocol.response_to_dict(response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, response, response_dict
            )
        return self.protocol.encode_dict(response_dict), dolog

    def handle_raw_message(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> str:
        """Decode, handle, and return a response for a message.

        If 'raise_unregistered' is True, will raise an
        efro.message.UnregisteredMessageIDError for messages not handled by
        the protocol. In all other cases local errors will translate to
        error responses returned to the sender.
        """
        assert not self.is_async, "can't call sync handler on async receiver"
        msg_decoded: Message | None = None
        msgtype: type[Message] | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            response = handler(bound_obj, msg_decoded)
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            rstr, dolog = self.encode_error_response(
                bound_obj, msg_decoded, exc
            )
            if dolog:
                if msgtype is not None:
                    logging.exception(
                        'Error handling %s.%s message.',
                        msgtype.__module__,
                        msgtype.__qualname__,
                    )
                else:
                    logging.exception('Error in efro.message handling.')
            return rstr

    def handle_raw_message_async(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> Awaitable[str]:
        """Should be called when the receiver gets a message.

        The return value is an awaitable producing the raw response to
        the message.

        If 'raise_unregistered' is True, an UnregisteredMessageIDError
        raised during the synchronous decode/dispatch phase is re-raised
        instead of being converted to an error response.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages handlers would be called in the
        # order the messages were received.

        assert self.is_async, "can't call async handler on sync receiver"
        msg_decoded: Message | None = None
        msgtype: type[Message] | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            handler_awaitable = handler(bound_obj, msg_decoded)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            return self._handle_raw_message_async_error(
                bound_obj, msg_decoded, msgtype, exc
            )

        # Return an awaitable to handle the rest asynchronously.
        return self._handle_raw_message_async(
            bound_obj, msg_decoded, msgtype, handler_awaitable
        )

    async def _handle_raw_message_async_error(
        self,
        bound_obj: Any,
        msg_decoded: Message | None,
        msgtype: type[Message] | None,
        exc: Exception,
    ) -> str:
        """Encode an error response for 'exc', logging it if requested."""
        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
        if dolog:
            # This coroutine may run after the 'except' block that
            # caught 'exc' has exited (handle_raw_message_async() returns
            # it un-awaited), at which point there is no active exception
            # for logging.exception() to pick up. Pass the exception
            # explicitly so the traceback always makes it into the log.
            if msgtype is not None:
                logging.exception(
                    'Error handling %s.%s message.',
                    msgtype.__module__,
                    msgtype.__qualname__,
                    exc_info=exc,
                )
            else:
                logging.exception(
                    'Error in efro.message handling.', exc_info=exc
                )
        return rstr

    async def _handle_raw_message_async(
        self,
        bound_obj: Any,
        msg_decoded: Message,
        msgtype: type[Message] | None,
        handler_awaitable: Awaitable[Response | None],
    ) -> str:
        """Await a handler's result and encode it as a raw response.

        Any exception raised while awaiting or encoding is converted to
        an encoded error response.
        """
        try:
            response = await handler_awaitable
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            return await self._handle_raw_message_async_error(
                bound_obj, msg_decoded, msgtype, exc
            )
Facilitates receiving & responding to messages from a remote source.
This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.
Example:
class MyClass: receiver = MyMessageReceiver()
# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.
@receiver.handler
def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
# Deal with this message type here.
This will trigger the registered handler being called.
obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)
Any unhandled Exception occurring during message handling will result in an Exception being raised on the sending end.
def __init__(self, protocol: MessageProtocol) -> None:
    """Create a receiver for 'protocol' with no handlers or filters."""
    # Optional filter hooks; unset until a filter is registered via
    # decode_filter_method() / encode_filter_method().
    self._decode_filter_call: (
        Callable[[Any, dict, Message], None] | None
    ) = None
    self._encode_filter_call: (
        Callable[[Any, Message | None, Response | SysResponse, dict], None]
        | None
    ) = None
    # Handler callables keyed by the message type they handle.
    self._handlers: dict[type[Message], Callable] = {}
    self.protocol = protocol
def register_handler(
    self, call: Callable[[Any, Message], Response | None]
) -> None:
    """Register a handler call.

    The handled message type is taken from the type annotation of the
    call's 'msg' argument, and the response types from its return
    annotation.

    Raises ValueError if the call's args are not exactly
    ('self', 'msg'). Raises TypeError if the annotations are not plain
    types, if the message type is unknown to the protocol or already
    has a handler, or if the annotated response types do not exactly
    match those declared by the message type.
    """
    # TODO: can use types.GenericAlias in 3.9.
    # (a drop-in replacement seemed to give errors, however; should be
    # re-tested in 3.11 since typing_extensions appears to behave
    # differently there)
    from typing import _GenericAlias  # type: ignore
    from typing import get_type_hints, get_args

    # We expect a method taking a single 'msg' arg.
    wanted_args = ['self', 'msg']
    found_args = inspect.getfullargspec(call).args
    if found_args != wanted_args:
        raise ValueError(
            f'Expected callable signature of {wanted_args};'
            f' got {found_args}'
        )

    # Note: sync/async flavor is intentionally not checked against
    # is_async; a handler may be a regular function returning an
    # awaitable instead of being declared async.

    # Annotations tell us which message/response types are handled.
    hints = get_type_hints(call)

    msgtype = hints.get('msg')
    if not isinstance(msgtype, type):
        raise TypeError(
            f'expected a type for "msg" annotation; got {type(msgtype)}.'
        )
    assert issubclass(msgtype, Message)

    nonetype = type(None)
    ret = hints.get('return')
    declared: tuple[Any, ...]

    # The return annotation is either a union of types or a single type.
    if isinstance(ret, (_GenericAlias, types.UnionType)):
        declared = get_args(ret)
        if any(not isinstance(arg, (type, nonetype)) for arg in declared):
            raise TypeError(
                f'expected only types for "return" annotation;'
                f' got {declared}.'
            )
    elif isinstance(ret, (type, nonetype)):
        declared = (ret,)
    else:
        raise TypeError(
            f'expected one or more types for'
            f' "return" annotation; got a {type(ret)}.'
        )

    # Empty returns show up as NoneType here; we want plain None.
    normalized = [None if rtype is nonetype else rtype for rtype in declared]
    responsetypes: tuple[type[Any] | None, ...] = tuple(normalized)

    # The protocol must know this message type, it must not already be
    # handled, and the response types must match exactly. (Returning a
    # subset could technically be allowed in the future if it makes
    # sense.)
    if msgtype not in self.protocol.message_ids_by_type.keys():
        raise TypeError(
            f'Message type {msgtype} is not registered'
            f' in this Protocol.'
        )

    if msgtype in self._handlers:
        raise TypeError(
            f'Message type {msgtype} already has a registered' f' handler.'
        )

    if set(msgtype.get_response_types()) != set(responsetypes):
        raise TypeError(
            f'Provided response types {responsetypes} do not'
            f' match the set expected by message type {msgtype}: '
            f'({msgtype.get_response_types()})'
        )

    # Everything checks out; store it.
    self._handlers[msgtype] = call
Register a handler call.
The message type handled by the call is determined by its type annotation.
def decode_filter_method(
    self, call: Callable[[Any, dict, Message], None]
) -> Callable[[Any, dict, Message], None]:
    """Function decorator for defining a decode filter.

    A decode filter can pull extra data out of incoming message dicts.
    It applies to both handle_raw_message() and
    handle_raw_message_async(). Only one decode filter may be set; the
    passed call is returned unchanged.
    """
    existing = self._decode_filter_call
    assert existing is None
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async()
def encode_filter_method(
    self,
    call: Callable[
        [Any, Message | None, Response | SysResponse, dict], None
    ],
) -> Callable[[Any, Message | None, Response, dict], None]:
    """Function decorator for defining an encode filter.

    An encode filter can add extra data to the outgoing dict before it
    is encoded to a string and sent out. Only one encode filter may be
    set; the passed call is returned unchanged.
    """
    existing = self._encode_filter_call
    assert existing is None
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def validate(self, log_only: bool = False) -> None:
    """Check for handler completeness, valid types, etc.

    Every non-Response type registered in the protocol needs a handler.
    A missing handler raises TypeError, or is merely logged as an error
    when 'log_only' is True.
    """
    for msgtype in self.protocol.message_ids_by_type.keys():
        # Response types need no handler; handled types are fine as-is.
        if issubclass(msgtype, Response) or msgtype in self._handlers:
            continue
        problem = (
            f'Protocol message type {msgtype} is not handled'
            f' by receiver type {type(self)}.'
        )
        if not log_only:
            raise TypeError(problem)
        logging.error(problem)
Check for handler completeness, valid types, etc.
def encode_user_response(
    self, bound_obj: Any, message: Message, response: Response | None
) -> str:
    """Encode a response provided by the user for sending.

    A None response is sent as an EmptySysResponse. If an encode filter
    is set, it is given a chance to modify the response dict before
    the protocol encodes it.
    """
    assert isinstance(response, Response | None)
    # (users should never explicitly return error-responses)
    if response is not None:
        assert type(response) in message.get_response_types()

    # None from the user stands for EmptySysResponse.
    outgoing: Response | SysResponse = (
        EmptySysResponse() if response is None else response
    )

    payload = self.protocol.response_to_dict(outgoing)
    encode_filter = self._encode_filter_call
    if encode_filter is not None:
        encode_filter(bound_obj, message, outgoing, payload)
    return self.protocol.encode_dict(payload)
Encode a response provided by the user for sending.
def encode_error_response(
    self, bound_obj: Any, message: Message | None, exc: Exception
) -> tuple[str, bool]:
    """Given an error, return sysresponse str and whether to log.

    The protocol converts the exception to a response and decides
    whether it should be logged. If an encode filter is set, it gets a
    chance to modify the response dict before encoding.
    """
    protocol = self.protocol
    error_response, should_log = protocol.error_to_response(exc)
    payload = protocol.response_to_dict(error_response)
    encode_filter = self._encode_filter_call
    if encode_filter is not None:
        encode_filter(bound_obj, message, error_response, payload)
    encoded = protocol.encode_dict(payload)
    return encoded, should_log
Given an error, return sysresponse str and whether to log.
def handle_raw_message(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> str:
    """Decode, handle, and return a response for a message.

    If 'raise_unregistered' is True, will raise an
    efro.message.UnregisteredMessageIDError for messages not handled by
    the protocol. In all other cases local errors are translated to
    error responses returned to the sender.
    """
    assert not self.is_async, "can't call sync handler on async receiver"
    # These stay None if decoding fails, which the error path relies on.
    decoded: Message | None = None
    decoded_type: type[Message] | None = None
    try:
        decoded = self._decode_incoming_message(bound_obj, msg)
        decoded_type = type(decoded)
        handler = self._handlers.get(decoded_type)
        if handler is None:
            raise RuntimeError(f'Got unhandled message type: {decoded_type}.')
        result = handler(bound_obj, decoded)
        assert isinstance(result, Response | None)
        return self.encode_user_response(bound_obj, decoded, result)

    except Exception as exc:
        if raise_unregistered and isinstance(exc, UnregisteredMessageIDError):
            raise
        encoded_error, should_log = self.encode_error_response(
            bound_obj, decoded, exc
        )
        if should_log:
            if decoded_type is None:
                logging.exception('Error in efro.message handling.')
            else:
                logging.exception(
                    'Error handling %s.%s message.',
                    decoded_type.__module__,
                    decoded_type.__qualname__,
                )
        return encoded_error
Decode, handle, and return a response for a message.
If 'raise_unregistered' is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.
def handle_raw_message_async(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> Awaitable[str]:
    """Should be called when the receiver gets a message.

    Returns an awaitable producing the raw response to the message.

    If 'raise_unregistered' is True, an UnregisteredMessageIDError
    raised during the synchronous decode/dispatch phase is re-raised
    rather than being converted to an error response.
    """
    # This is deliberately a regular (non-async) function: decoding and
    # invoking the handler happen right here, synchronously. Were the
    # whole thing a coroutine, handlers could not be guaranteed to be
    # called in the order their messages were received.
    assert self.is_async, "can't call async handler on sync receiver"
    # These stay None if decoding fails, which the error path relies on.
    decoded: Message | None = None
    decoded_type: type[Message] | None = None
    try:
        decoded = self._decode_incoming_message(bound_obj, msg)
        decoded_type = type(decoded)
        handler = self._handlers.get(decoded_type)
        if handler is None:
            raise RuntimeError(f'Got unhandled message type: {decoded_type}.')
        pending = handler(bound_obj, decoded)

    except Exception as exc:
        if raise_unregistered and isinstance(exc, UnregisteredMessageIDError):
            raise
        return self._handle_raw_message_async_error(
            bound_obj, decoded, decoded_type, exc
        )

    # The rest of the work happens when the caller awaits this.
    return self._handle_raw_message_async(
        bound_obj, decoded, decoded_type, pending
    )
Should be called when the receiver gets a message.
The return value is the raw response to the message.
class BoundMessageReceiver:
    """Base bound receiver class.

    Pairs a MessageReceiver with the object it is being accessed
    through.
    """

    def __init__(
        self,
        obj: Any,
        receiver: MessageReceiver,
    ) -> None:
        assert obj is not None
        self._receiver = receiver
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this receiver."""
        receiver = self._receiver
        return receiver.protocol

    def encode_error_response(self, exc: Exception) -> str:
        """Given an error, return a response ready to send.

        Use this for errors that happen outside of standard
        handle_raw_message calls; errors within those calls are
        automatically returned as encoded strings.
        """
        # No Message is passed (None): one would only be available for
        # failures inside a handler, which is not what this is for.
        encoded, _dolog = self._receiver.encode_error_response(
            self._obj, None, exc
        )
        return encoded
Base bound receiver class.
def encode_error_response(self, exc: Exception) -> str:
    """Given an error, return a response ready to send.

    Use this for errors that happen outside of standard
    handle_raw_message calls; errors within those calls are
    automatically returned as encoded strings.
    """
    # No Message is passed (None): one would only be available for
    # failures inside a handler, which is not what this is for.
    encoded, _dolog = self._receiver.encode_error_response(
        self._obj, None, exc
    )
    return encoded
Given an error, return a response ready to send.
This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.
def create_sender_module(
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Create a Python module defining a MessageSender subclass.

    The generated class is primarily for type checking and will contain
    overrides for the varieties of send calls for message/response
    types defined in the protocol.

    Code passed for 'protocol_create_code' should import necessary
    modules and assign an instance of the Protocol to a 'protocol'
    variable. If 'build_time_protocol_create_code' is provided, it is
    used in its place to build the protocol instance here, while
    'protocol_create_code' is still what gets passed along for the
    generated module.

    Class names are based on basename; a basename 'FooSender' will
    result in classes FooSender and BoundFooSender.

    If 'private' is True, class-names will be prefixed with an '_'.

    Note: output code may have long lines and should generally be run
    through a formatter. We should perhaps move this functionality to
    efrotools so we can include that functionality inline.
    """
    if build_time_protocol_create_code is None:
        build_code = protocol_create_code
    else:
        build_code = build_time_protocol_create_code
    protocol = _protocol_from_code(build_code)
    return protocol.do_create_sender_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        enable_sync_sends=enable_sync_sends,
        enable_async_sends=enable_async_sends,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageSender subclass.
This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.
Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.
Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.
If 'private' is True, class-names will be prefixed with an '_'.
Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.
def create_receiver_module(
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Create a Python module defining a MessageReceiver subclass.

    This class is primarily for type checking and will contain overrides
    for the register method for message/response types defined in
    the protocol.

    Code passed for 'protocol_create_code' should import necessary
    modules and assign an instance of the Protocol to a 'protocol'
    variable. If 'build_time_protocol_create_code' is provided, it is
    used instead of 'protocol_create_code' to construct the protocol
    here; 'protocol_create_code' is still what gets passed along to
    the protocol's module generation call.

    Class names are based on basename; a basename 'FooReceiver' will
    result in FooReceiver and BoundFooReceiver.

    If 'is_async' is True, handle_raw_message() will be an async method
    and the @handler decorator will expect async methods.

    If 'private' is True, class-names will be prefixed with an '_'.

    Note that line lengths are not clipped, so output may need to be
    run through a formatter to prevent lint warnings about excessive
    line lengths.
    """
    # Build the protocol from the build-time code when supplied; otherwise
    # fall back to the regular creation code.
    protocol = _protocol_from_code(
        build_time_protocol_create_code
        if build_time_protocol_create_code is not None
        else protocol_create_code
    )
    return protocol.do_create_receiver_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        is_async=is_async,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageReceiver subclass.
This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.
Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.
If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.
If 'private' is True, class-names will be prefixed with an '_'.
Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.
class UnregisteredMessageIDError(Exception):
    """Error for a message or response id that our protocol does not cover."""
A message or response id is not covered by our protocol.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args