efro.message
Functionality for sending and responding to messages. Supports static typing for message types and possible return types.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Functionality for sending and responding to messages. 4Supports static typing for message types and possible return types. 5""" 6 7from __future__ import annotations 8 9from efro.util import set_canonical_module_names 10from efro.message._protocol import MessageProtocol 11from efro.message._sender import MessageSender, BoundMessageSender 12from efro.message._receiver import MessageReceiver, BoundMessageReceiver 13from efro.message._module import create_sender_module, create_receiver_module 14from efro.message._message import ( 15 Message, 16 Response, 17 SysResponse, 18 EmptySysResponse, 19 ErrorSysResponse, 20 StringResponse, 21 BoolResponse, 22 UnregisteredMessageIDError, 23) 24 25__all__ = [ 26 'Message', 27 'Response', 28 'SysResponse', 29 'EmptySysResponse', 30 'ErrorSysResponse', 31 'StringResponse', 32 'BoolResponse', 33 'MessageProtocol', 34 'MessageSender', 35 'BoundMessageSender', 36 'MessageReceiver', 37 'BoundMessageReceiver', 38 'create_sender_module', 39 'create_receiver_module', 40 'UnregisteredMessageIDError', 41] 42 43# Have these things present themselves cleanly as 'thismodule.SomeClass' 44# instead of 'thismodule._internalmodule.SomeClass' 45set_canonical_module_names(globals())
class Message:
    """Root type that all sendable messages derive from."""

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Report every Response type a send of this Message may return.

        Unless a subclass overrides this, the list holds a single None
        entry (i.e. a None return type).
        """
        none_only: list[type[Response] | None] = [None]
        return none_only
Base class for messages.
@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
    """Report every Response type a send of this Message may return.

    Unless overridden, the result is a list holding a single None
    entry (i.e. a None return type).
    """
    none_only: list[type[Response] | None] = [None]
    return none_only
Return all Response types this Message can return when sent.
The default implementation specifies a None return type.
Base class for responses to messages.
class SysResponse:
    """Root type for responses produced by the messaging system itself.

    Only the messaging machinery sends and handles these; code using
    the api is never handed one.
    """

    def set_local_exception(self, exc: Exception) -> None:
        """Stash an exception on this object to aid logging/handling.

        The exception exists only on this local object; it is not
        included when the response is serialized.
        """
        attr_name = '_sr_local_exception'
        setattr(self, attr_name, exc)

    def get_local_exception(self) -> Exception | None:
        """Return the locally stashed exception, or None if none is set."""
        attached = getattr(self, '_sr_local_exception', None)
        assert attached is None or isinstance(attached, Exception)
        return attached
Base class for system-responses to messages.
These are only sent/handled by the messaging system itself; users of the API never see them.
def set_local_exception(self, exc: Exception) -> None:
    """Stash an exception on this object to aid logging/handling.

    The exception exists only on this local object; it is not
    included when the response is serialized.
    """
    attr_name = '_sr_local_exception'
    setattr(self, attr_name, exc)
Attach a local exception to facilitate better logging/handling.
Be aware that this data does not get serialized and only exists on the local object.
@ioprepped
@dataclass
class EmptySysResponse(SysResponse):
    """System response that carries no data; the response form of None."""
The response equivalent of None.
Inherited Members
@ioprepped
@dataclass
class ErrorSysResponse(SysResponse):
    """System response reporting that a send failed with some error.

    Receiving one of these generally leads to an Exception being
    raised for the caller.
    """

    class ErrorType(Enum):
        """Kinds of failure that can occur while a message is being sent."""

        # Generic error on the remote end.
        REMOTE = 0
        # CleanError forwarded from the remote end.
        REMOTE_CLEAN = 1
        LOCAL = 2
        COMMUNICATION = 3
        # CommunicationError forwarded from the remote end.
        REMOTE_COMMUNICATION = 4

    # The strings handed to IOAttrs are presumably the serialized
    # storage names for these fields (confirm against IOAttrs docs).
    error_message: Annotated[str, IOAttrs('m')]
    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE
SysResponse saying some error has occurred for the send.
This generally results in an Exception being raised for the caller.
Inherited Members
class ErrorType(Enum):
    """Kinds of failure that can occur while a message is being sent."""

    # Generic error on the remote end.
    REMOTE = 0
    # CleanError forwarded from the remote end.
    REMOTE_CLEAN = 1
    LOCAL = 2
    COMMUNICATION = 3
    # CommunicationError forwarded from the remote end.
    REMOTE_COMMUNICATION = 4
Type of error that occurred while sending a message.
Inherited Members
- enum.Enum
- name
- value
@ioprepped
@dataclass
class StringResponse(Response):
    """Response consisting of a single string value."""

    # The string payload carried by this response.
    value: Annotated[str, IOAttrs('v')]
A simple string value response.
@ioprepped
@dataclass
class BoolResponse(Response):
    """Response consisting of a single bool value."""

    # The bool payload carried by this response.
    value: Annotated[bool, IOAttrs('v')]
A simple bool value response.
33class MessageProtocol: 34 """Wrangles a set of message types, formats, and response types. 35 Both endpoints must be using a compatible Protocol for communication 36 to succeed. To maintain Protocol compatibility between revisions, 37 all message types must retain the same id, message attr storage 38 names must not change, newly added attrs must have default values, 39 etc. 40 """ 41 42 def __init__( 43 self, 44 message_types: dict[int, type[Message]], 45 response_types: dict[int, type[Response]], 46 forward_communication_errors: bool = False, 47 forward_clean_errors: bool = False, 48 remote_errors_include_stack_traces: bool = False, 49 log_remote_errors: bool = True, 50 ) -> None: 51 """Create a protocol with a given configuration. 52 53 If 'forward_communication_errors' is True, 54 efro.error.CommunicationErrors raised on the receiver end will 55 result in a matching error raised back on the sender. This can 56 be useful if the receiver will be in some way forwarding 57 messages along and the sender doesn't need to know where 58 communication breakdowns occurred; only that they did. 59 60 If 'forward_clean_errors' is True, efro.error.CleanError 61 exceptions raised on the receiver end will result in a matching 62 CleanError raised back on the sender. 63 64 When an exception is not covered by the optional forwarding 65 mechanisms above, it will come across as efro.error.RemoteError 66 and the exception will be logged on the receiver 67 end - at least by default (see details below). 68 69 If 'remote_errors_include_stack_traces' is True, stringified 70 stack traces will be returned with efro.error.RemoteError 71 exceptions. This is useful for debugging but should only be 72 enabled in cases where the sender is trusted to see internal 73 details of the receiver. 74 75 By default, when a message-handling exception will result in an 76 efro.error.RemoteError being returned to the sender, the 77 exception will be logged on the receiver. 
This is because the 78 goal is usually to avoid returning opaque RemoteErrors and to 79 instead return something meaningful as part of the expected 80 response type (even if that value itself represents a logical 81 error state). If 'log_remote_errors' is False, however, such 82 exceptions will not be logged on the receiver. This can be 83 useful in combination with 'remote_errors_include_stack_traces' 84 and 'forward_clean_errors' in situations where all error 85 logging/management will be happening on the sender end. Be 86 aware, however, that in that case it may be possible for 87 communication errors to prevent such error messages from 88 ever being seen. 89 """ 90 # pylint: disable=too-many-locals 91 self.message_types_by_id: dict[int, type[Message]] = {} 92 self.message_ids_by_type: dict[type[Message], int] = {} 93 self.response_types_by_id: dict[ 94 int, type[Response] | type[SysResponse] 95 ] = {} 96 self.response_ids_by_type: dict[ 97 type[Response] | type[SysResponse], int 98 ] = {} 99 for m_id, m_type in message_types.items(): 100 # Make sure only valid message types were passed and each 101 # id was assigned only once. 102 assert isinstance(m_id, int) 103 assert m_id >= 0 104 assert is_ioprepped_dataclass(m_type) and issubclass( 105 m_type, Message 106 ) 107 assert self.message_types_by_id.get(m_id) is None 108 self.message_types_by_id[m_id] = m_type 109 self.message_ids_by_type[m_type] = m_id 110 111 for r_id, r_type in response_types.items(): 112 assert isinstance(r_id, int) 113 assert r_id >= 0 114 assert is_ioprepped_dataclass(r_type) and issubclass( 115 r_type, Response 116 ) 117 assert self.response_types_by_id.get(r_id) is None 118 self.response_types_by_id[r_id] = r_type 119 self.response_ids_by_type[r_type] = r_id 120 121 # Register our SysResponse types. These use negative 122 # IDs so as to never overlap with user Response types. 
123 def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None: 124 assert self.response_types_by_id.get(reg_id) is None 125 self.response_types_by_id[reg_id] = reg_tp 126 self.response_ids_by_type[reg_tp] = reg_id 127 128 _reg_sys(ErrorSysResponse, -1) 129 _reg_sys(EmptySysResponse, -2) 130 131 # Some extra-thorough validation in debug mode. 132 if __debug__: 133 # Make sure all Message types' return types are valid 134 # and have been assigned an ID as well. 135 all_response_types: set[type[Response] | None] = set() 136 for m_id, m_type in message_types.items(): 137 m_rtypes = m_type.get_response_types() 138 139 assert isinstance(m_rtypes, list) 140 assert ( 141 m_rtypes 142 ), f'Message type {m_type} specifies no return types.' 143 assert len(set(m_rtypes)) == len(m_rtypes) # check dups 144 for m_rtype in m_rtypes: 145 all_response_types.add(m_rtype) 146 for cls in all_response_types: 147 if cls is None: 148 continue 149 assert is_ioprepped_dataclass(cls) 150 assert issubclass(cls, Response) 151 if cls not in self.response_ids_by_type: 152 raise ValueError( 153 f'Possible response type {cls} needs to be included' 154 f' in response_types for this protocol.' 155 ) 156 157 # Make sure all registered types have unique base names. 158 # We can take advantage of this to generate cleaner looking 159 # protocol modules. Can revisit if this is ever a problem. 160 mtypenames = set(tp.__name__ for tp in self.message_ids_by_type) 161 if len(mtypenames) != len(message_types): 162 raise ValueError( 163 'message_types contains duplicate __name__s;' 164 ' all types are required to have unique names.' 
165 ) 166 167 self.forward_clean_errors = forward_clean_errors 168 self.forward_communication_errors = forward_communication_errors 169 self.remote_errors_include_stack_traces = ( 170 remote_errors_include_stack_traces 171 ) 172 self.log_remote_errors = log_remote_errors 173 174 @staticmethod 175 def encode_dict(obj: dict) -> str: 176 """Json-encode a provided dict.""" 177 return json.dumps(obj, separators=(',', ':')) 178 179 def message_to_dict(self, message: Message) -> dict: 180 """Encode a message to a json ready dict.""" 181 return self._to_dict(message, self.message_ids_by_type, 'message') 182 183 def response_to_dict(self, response: Response | SysResponse) -> dict: 184 """Encode a response to a json ready dict.""" 185 return self._to_dict(response, self.response_ids_by_type, 'response') 186 187 def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]: 188 """Translate an Exception to a SysResponse. 189 190 Also returns whether the error should be logged if this happened 191 within handle_raw_message(). 192 """ 193 194 # If anything goes wrong, return a ErrorSysResponse instead. 195 # (either CLEAN or generic REMOTE) 196 if self.forward_clean_errors and isinstance(exc, CleanError): 197 return ( 198 ErrorSysResponse( 199 error_message=str(exc), 200 error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN, 201 ), 202 False, 203 ) 204 if self.forward_communication_errors and isinstance( 205 exc, CommunicationError 206 ): 207 return ( 208 ErrorSysResponse( 209 error_message=str(exc), 210 error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION, 211 ), 212 False, 213 ) 214 return ( 215 ErrorSysResponse( 216 error_message=( 217 traceback.format_exc() 218 if self.remote_errors_include_stack_traces 219 else 'An internal error has occurred.' 
220 ), 221 error_type=ErrorSysResponse.ErrorType.REMOTE, 222 ), 223 self.log_remote_errors, 224 ) 225 226 def _to_dict( 227 self, message: Any, ids_by_type: dict[type, int], opname: str 228 ) -> dict: 229 """Encode a message to a json string for transport.""" 230 231 m_id: int | None = ids_by_type.get(type(message)) 232 if m_id is None: 233 raise TypeError( 234 f'{opname} type is not registered in protocol:' 235 f' {type(message)}' 236 ) 237 out = {'t': m_id, 'm': dataclass_to_dict(message)} 238 return out 239 240 @staticmethod 241 def decode_dict(data: str) -> dict: 242 """Decode data to a dict.""" 243 out = json.loads(data) 244 assert isinstance(out, dict) 245 return out 246 247 def message_from_dict(self, data: dict) -> Message: 248 """Decode a message from a json string.""" 249 out = self._from_dict(data, self.message_types_by_id, 'message') 250 assert isinstance(out, Message) 251 return out 252 253 def response_from_dict(self, data: dict) -> Response | SysResponse: 254 """Decode a response from a json string.""" 255 out = self._from_dict(data, self.response_types_by_id, 'response') 256 assert isinstance(out, Response | SysResponse) 257 return out 258 259 # Weeeird; we get mypy errors returning dict[int, type] but 260 # dict[int, typing.Type] or dict[int, type[Any]] works.. 261 def _from_dict( 262 self, data: dict, types_by_id: dict[int, type[Any]], opname: str 263 ) -> Any: 264 """Decode a message from a json string.""" 265 msgdict: dict | None 266 267 m_id = data.get('t') 268 # Allow omitting 'm' dict if its empty. 269 msgdict = data.get('m', {}) 270 271 assert isinstance(m_id, int) 272 assert isinstance(msgdict, dict) 273 274 # Decode this particular type. 275 msgtype = types_by_id.get(m_id) 276 if msgtype is None: 277 raise UnregisteredMessageIDError( 278 f'Got unregistered {opname} id of {m_id}.' 
279 ) 280 return dataclass_from_dict(msgtype, msgdict) 281 282 def _get_module_header( 283 self, 284 part: Literal['sender', 'receiver'], 285 extra_import_code: str | None, 286 enable_async_sends: bool, 287 ) -> str: 288 """Return common parts of generated modules.""" 289 # pylint: disable=too-many-locals 290 # pylint: disable=too-many-branches 291 # pylint: disable=too-many-statements 292 import textwrap 293 294 tpimports: dict[str, list[str]] = {} 295 imports: dict[str, list[str]] = {} 296 297 single_message_type = len(self.message_ids_by_type) == 1 298 299 msgtypes = list(self.message_ids_by_type) 300 if part == 'sender': 301 msgtypes.append(Message) 302 for msgtype in msgtypes: 303 tpimports.setdefault(msgtype.__module__, []).append( 304 msgtype.__name__ 305 ) 306 rsptypes = list(self.response_ids_by_type) 307 if part == 'sender': 308 rsptypes.append(Response) 309 for rsp_tp in rsptypes: 310 # Skip these as they don't actually show up in code. 311 if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse: 312 continue 313 if ( 314 single_message_type 315 and part == 'sender' 316 and rsp_tp is not Response 317 ): 318 # We need to cast to the single supported response type 319 # in this case so need response types at runtime. 320 imports.setdefault(rsp_tp.__module__, []).append( 321 rsp_tp.__name__ 322 ) 323 else: 324 tpimports.setdefault(rsp_tp.__module__, []).append( 325 rsp_tp.__name__ 326 ) 327 328 import_lines = '' 329 tpimport_lines = '' 330 331 for module, names in sorted(imports.items()): 332 jnames = ', '.join(names) 333 line = f'from {module} import {jnames}' 334 if len(line) > 80: 335 # Recreate in a wrapping-friendly form. 336 line = f'from {module} import ({jnames})' 337 import_lines += f'{line}\n' 338 for module, names in sorted(tpimports.items()): 339 jnames = ', '.join(names) 340 line = f'from {module} import {jnames}' 341 if len(line) > 75: # Account for indent 342 # Recreate in a wrapping-friendly form. 
343 line = f'from {module} import ({jnames})' 344 tpimport_lines += f'{line}\n' 345 346 if part == 'sender': 347 import_lines += ( 348 'from efro.message import MessageSender, BoundMessageSender\n' 349 ) 350 tpimport_typing_extras = '' 351 else: 352 if single_message_type: 353 import_lines += ( 354 'from efro.message import (MessageReceiver,' 355 ' BoundMessageReceiver, Message, Response)\n' 356 ) 357 else: 358 import_lines += ( 359 'from efro.message import MessageReceiver,' 360 ' BoundMessageReceiver\n' 361 ) 362 tpimport_typing_extras = ', Awaitable' 363 364 if extra_import_code is not None: 365 import_lines += extra_import_code 366 367 ovld = ', overload' if not single_message_type else '' 368 ovld2 = ( 369 ', cast, Awaitable' 370 if (single_message_type and part == 'sender' and enable_async_sends) 371 else '' 372 ) 373 tpimport_lines = textwrap.indent(tpimport_lines, ' ') 374 375 baseimps = ['Any'] 376 if part == 'receiver': 377 baseimps.append('Callable') 378 if part == 'sender' and enable_async_sends: 379 baseimps.append('Awaitable') 380 baseimps_s = ', '.join(baseimps) 381 out = ( 382 '# Released under the MIT License. See LICENSE for details.\n' 383 f'#\n' 384 f'"""Auto-generated {part} module. 
Do not edit by hand."""\n' 385 f'\n' 386 f'from __future__ import annotations\n' 387 f'\n' 388 f'from typing import TYPE_CHECKING{ovld}{ovld2}\n' 389 f'\n' 390 f'{import_lines}' 391 f'\n' 392 f'if TYPE_CHECKING:\n' 393 f' from typing import {baseimps_s}' 394 f'{tpimport_typing_extras}\n' 395 f'{tpimport_lines}' 396 f'\n' 397 f'\n' 398 ) 399 return out 400 401 def do_create_sender_module( 402 self, 403 basename: str, 404 protocol_create_code: str, 405 enable_sync_sends: bool, 406 enable_async_sends: bool, 407 private: bool = False, 408 protocol_module_level_import_code: str | None = None, 409 ) -> str: 410 """Used by create_sender_module(); do not call directly.""" 411 # pylint: disable=too-many-locals 412 # pylint: disable=too-many-branches 413 import textwrap 414 415 msgtypes = list(self.message_ids_by_type.keys()) 416 417 ppre = '_' if private else '' 418 out = self._get_module_header( 419 'sender', 420 extra_import_code=protocol_module_level_import_code, 421 enable_async_sends=enable_async_sends, 422 ) 423 ccind = textwrap.indent(protocol_create_code, ' ') 424 out += ( 425 f'class {ppre}{basename}(MessageSender):\n' 426 f' """Protocol-specific sender."""\n' 427 f'\n' 428 f' def __init__(self) -> None:\n' 429 f'{ccind}\n' 430 f' super().__init__(protocol)\n' 431 f'\n' 432 f' def __get__(\n' 433 f' self, obj: Any, type_in: Any = None\n' 434 f' ) -> {ppre}Bound{basename}:\n' 435 f' return {ppre}Bound{basename}(obj, self)\n' 436 f'\n' 437 f'\n' 438 f'class {ppre}Bound{basename}(BoundMessageSender):\n' 439 f' """Protocol-specific bound sender."""\n' 440 ) 441 442 def _filt_tp_name(rtype: type[Response] | None) -> str: 443 return 'None' if rtype is None else rtype.__name__ 444 445 # Define handler() overloads for all registered message types. 
446 if msgtypes: 447 for async_pass in False, True: 448 if async_pass and not enable_async_sends: 449 continue 450 if not async_pass and not enable_sync_sends: 451 continue 452 pfx = 'async ' if async_pass else '' 453 sfx = '_async' if async_pass else '' 454 # awt = 'await ' if async_pass else '' 455 awt = '' 456 how = 'asynchronously' if async_pass else 'synchronously' 457 458 if len(msgtypes) == 1: 459 # Special case: with a single message types we don't 460 # use overloads. 461 msgtype = msgtypes[0] 462 msgtypevar = msgtype.__name__ 463 rtypes = msgtype.get_response_types() 464 if len(rtypes) > 1: 465 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 466 else: 467 rtypevar = _filt_tp_name(rtypes[0]) 468 if async_pass: 469 rtypevar = f'Awaitable[{rtypevar}]' 470 out += ( 471 f'\n' 472 f' def send{sfx}(self,' 473 f' message: {msgtypevar})' 474 f' -> {rtypevar}:\n' 475 f' """Send a message {how}."""\n' 476 f' out = {awt}self._sender.' 477 f'send{sfx}(self._obj, message)\n' 478 ) 479 if not async_pass: 480 out += ( 481 f' assert isinstance(out, {rtypevar})\n' 482 ' return out\n' 483 ) 484 else: 485 out += f' return cast({rtypevar}, out)\n' 486 487 else: 488 for msgtype in msgtypes: 489 msgtypevar = msgtype.__name__ 490 rtypes = msgtype.get_response_types() 491 if len(rtypes) > 1: 492 rtypevar = ' | '.join( 493 _filt_tp_name(t) for t in rtypes 494 ) 495 else: 496 rtypevar = _filt_tp_name(rtypes[0]) 497 out += ( 498 f'\n' 499 f' @overload\n' 500 f' {pfx}def send{sfx}(self,' 501 f' message: {msgtypevar})' 502 f' -> {rtypevar}: ...\n' 503 ) 504 rtypevar = 'Response | None' 505 if async_pass: 506 rtypevar = f'Awaitable[{rtypevar}]' 507 out += ( 508 f'\n' 509 f' def send{sfx}(self, message: Message)' 510 f' -> {rtypevar}:\n' 511 f' """Send a message {how}."""\n' 512 f' return {awt}self._sender.' 
513 f'send{sfx}(self._obj, message)\n' 514 ) 515 516 return out 517 518 def do_create_receiver_module( 519 self, 520 basename: str, 521 protocol_create_code: str, 522 is_async: bool, 523 private: bool = False, 524 protocol_module_level_import_code: str | None = None, 525 ) -> str: 526 """Used by create_receiver_module(); do not call directly.""" 527 # pylint: disable=too-many-locals 528 import textwrap 529 530 desc = 'asynchronous' if is_async else 'synchronous' 531 ppre = '_' if private else '' 532 msgtypes = list(self.message_ids_by_type.keys()) 533 out = self._get_module_header( 534 'receiver', 535 extra_import_code=protocol_module_level_import_code, 536 enable_async_sends=False, 537 ) 538 ccind = textwrap.indent(protocol_create_code, ' ') 539 out += ( 540 f'class {ppre}{basename}(MessageReceiver):\n' 541 f' """Protocol-specific {desc} receiver."""\n' 542 f'\n' 543 f' is_async = {is_async}\n' 544 f'\n' 545 f' def __init__(self) -> None:\n' 546 f'{ccind}\n' 547 f' super().__init__(protocol)\n' 548 f'\n' 549 f' def __get__(\n' 550 f' self,\n' 551 f' obj: Any,\n' 552 f' type_in: Any = None,\n' 553 f' ) -> {ppre}Bound{basename}:\n' 554 f' return {ppre}Bound{basename}(' 555 f'obj, self)\n' 556 ) 557 558 # Define handler() overloads for all registered message types. 559 560 def _filt_tp_name(rtype: type[Response] | None) -> str: 561 return 'None' if rtype is None else rtype.__name__ 562 563 if msgtypes: 564 cbgn = 'Awaitable[' if is_async else '' 565 cend = ']' if is_async else '' 566 if len(msgtypes) == 1: 567 # Special case: when we have a single message type we don't 568 # use overloads. 
569 msgtype = msgtypes[0] 570 msgtypevar = msgtype.__name__ 571 rtypes = msgtype.get_response_types() 572 if len(rtypes) > 1: 573 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 574 else: 575 rtypevar = _filt_tp_name(rtypes[0]) 576 rtypevar = f'{cbgn}{rtypevar}{cend}' 577 out += ( 578 f'\n' 579 f' def handler(\n' 580 f' self,\n' 581 f' call: Callable[[Any, {msgtypevar}], ' 582 f'{rtypevar}],\n' 583 f' )' 584 f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n' 585 f' """Decorator to register message handlers."""\n' 586 f' from typing import cast, Callable, Any\n' 587 f'\n' 588 f' self.register_handler(cast(Callable' 589 f'[[Any, Message], Response], call))\n' 590 f' return call\n' 591 ) 592 else: 593 for msgtype in msgtypes: 594 msgtypevar = msgtype.__name__ 595 rtypes = msgtype.get_response_types() 596 if len(rtypes) > 1: 597 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 598 else: 599 rtypevar = _filt_tp_name(rtypes[0]) 600 rtypevar = f'{cbgn}{rtypevar}{cend}' 601 out += ( 602 f'\n' 603 f' @overload\n' 604 f' def handler(\n' 605 f' self,\n' 606 f' call: Callable[[Any, {msgtypevar}], ' 607 f'{rtypevar}],\n' 608 f' )' 609 f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n' 610 ) 611 out += ( 612 '\n' 613 ' def handler(self, call: Callable) -> Callable:\n' 614 ' """Decorator to register message handlers."""\n' 615 ' self.register_handler(call)\n' 616 ' return call\n' 617 ) 618 619 out += ( 620 f'\n' 621 f'\n' 622 f'class {ppre}Bound{basename}(BoundMessageReceiver):\n' 623 f' """Protocol-specific bound receiver."""\n' 624 ) 625 if is_async: 626 out += ( 627 '\n' 628 ' def handle_raw_message(\n' 629 ' self, message: str, raise_unregistered: bool = False\n' 630 ' ) -> Awaitable[str]:\n' 631 ' """Asynchronously handle a raw incoming message."""\n' 632 ' return self._receiver.' 
633 'handle_raw_message_async(\n' 634 ' self._obj, message, raise_unregistered\n' 635 ' )\n' 636 ) 637 638 else: 639 out += ( 640 '\n' 641 ' def handle_raw_message(\n' 642 ' self, message: str, raise_unregistered: bool = False\n' 643 ' ) -> str:\n' 644 ' """Synchronously handle a raw incoming message."""\n' 645 ' return self._receiver.handle_raw_message(\n' 646 ' self._obj, message, raise_unregistered\n' 647 ' )\n' 648 ) 649 650 return out
Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.
def __init__(
    self,
    message_types: dict[int, type[Message]],
    response_types: dict[int, type[Response]],
    forward_communication_errors: bool = False,
    forward_clean_errors: bool = False,
    remote_errors_include_stack_traces: bool = False,
    log_remote_errors: bool = True,
) -> None:
    """Build a protocol from id->type tables plus error-handling flags.

    'message_types' and 'response_types' map non-negative integer ids
    to ioprepped dataclass types deriving from Message and Response
    respectively. Every response type that any message declares via
    get_response_types() must appear in 'response_types', and all
    message types must have distinct __name__ values; otherwise
    ValueError is raised (these checks run only in debug mode).

    'forward_communication_errors': when True,
    efro.error.CommunicationErrors raised on the receiver produce a
    matching error on the sender. Handy when the receiver forwards
    messages onward and the sender only cares that communication
    broke down, not where.

    'forward_clean_errors': when True, efro.error.CleanError
    exceptions raised on the receiver produce a matching CleanError
    on the sender.

    Exceptions not covered by the forwarding options above arrive at
    the sender as efro.error.RemoteError and, by default, are logged
    on the receiver.

    'remote_errors_include_stack_traces': when True, stringified
    stack traces accompany efro.error.RemoteError exceptions. Useful
    for debugging, but enable it only when the sender is trusted to
    see the receiver's internals.

    'log_remote_errors': when True (the default), exceptions that
    result in a RemoteError are logged on the receiver, since the
    usual goal is to return something meaningful in the expected
    response type rather than an opaque RemoteError. Set it to False
    to skip that logging; this pairs well with
    'remote_errors_include_stack_traces' and 'forward_clean_errors'
    when all error logging/management happens on the sender. Be
    aware that in that setup communication errors may prevent such
    error messages from ever being seen.
    """
    # pylint: disable=too-many-locals
    msg_by_id: dict[int, type[Message]] = {}
    msg_ids: dict[type[Message], int] = {}
    rsp_by_id: dict[int, type[Response] | type[SysResponse]] = {}
    rsp_ids: dict[type[Response] | type[SysResponse], int] = {}
    self.message_types_by_id = msg_by_id
    self.message_ids_by_type = msg_ids
    self.response_types_by_id = rsp_by_id
    self.response_ids_by_type = rsp_ids

    # Only valid message types may be passed, each id used once.
    for msg_id, msg_cls in message_types.items():
        assert isinstance(msg_id, int)
        assert msg_id >= 0
        assert is_ioprepped_dataclass(msg_cls) and issubclass(
            msg_cls, Message
        )
        assert msg_by_id.get(msg_id) is None
        msg_by_id[msg_id] = msg_cls
        msg_ids[msg_cls] = msg_id

    # Same rules for user response types.
    for rsp_id, rsp_cls in response_types.items():
        assert isinstance(rsp_id, int)
        assert rsp_id >= 0
        assert is_ioprepped_dataclass(rsp_cls) and issubclass(
            rsp_cls, Response
        )
        assert rsp_by_id.get(rsp_id) is None
        rsp_by_id[rsp_id] = rsp_cls
        rsp_ids[rsp_cls] = rsp_id

    # SysResponse types get negative ids so they can never collide
    # with user Response ids.
    for sys_cls, sys_id in (
        (ErrorSysResponse, -1),
        (EmptySysResponse, -2),
    ):
        assert rsp_by_id.get(sys_id) is None
        rsp_by_id[sys_id] = sys_cls
        rsp_ids[sys_cls] = sys_id

    # Extra-thorough validation, debug mode only.
    if __debug__:
        # Gather every response type any message says it can return
        # and confirm each is valid and has an id.
        possible: set[type[Response] | None] = set()
        for msg_cls in message_types.values():
            declared = msg_cls.get_response_types()

            assert isinstance(declared, list)
            assert (
                declared
            ), f'Message type {msg_cls} specifies no return types.'
            assert len(declared) == len(set(declared))  # check dups
            possible.update(declared)
        for candidate in possible:
            if candidate is not None:
                assert is_ioprepped_dataclass(candidate)
                assert issubclass(candidate, Response)
                if candidate not in rsp_ids:
                    raise ValueError(
                        f'Possible response type {candidate} needs to be'
                        f' included in response_types for this protocol.'
                    )

        # Unique base names let generated protocol modules look
        # cleaner. Can be revisited if it ever becomes a problem.
        unique_names = {tp.__name__ for tp in msg_ids}
        if len(unique_names) != len(message_types):
            raise ValueError(
                'message_types contains duplicate __name__s;'
                ' all types are required to have unique names.'
            )

    self.forward_clean_errors = forward_clean_errors
    self.forward_communication_errors = forward_communication_errors
    self.remote_errors_include_stack_traces = (
        remote_errors_include_stack_traces
    )
    self.log_remote_errors = log_remote_errors
Create a protocol with a given configuration.
If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.
If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.
When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).
If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.
By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_remote_errors' is False, however, such exceptions will not be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent such error messages from ever being seen.
174 @staticmethod 175 def encode_dict(obj: dict) -> str: 176 """Json-encode a provided dict.""" 177 return json.dumps(obj, separators=(',', ':'))
Json-encode a provided dict.
def message_to_dict(self, message: Message) -> dict:
    """Convert a message into a dict that is ready for json encoding.

    Delegates to self._to_dict(), passing the table of registered
    message type ids and the 'message' operation name.
    """
    id_table = self.message_ids_by_type
    return self._to_dict(message, id_table, 'message')
Encode a message to a json ready dict.
def response_to_dict(self, response: Response | SysResponse) -> dict:
    """Convert a response into a dict that is ready for json encoding.

    Delegates to self._to_dict(), passing the table of registered
    response type ids and the 'response' operation name.
    """
    id_table = self.response_ids_by_type
    return self._to_dict(response, id_table, 'response')
Encode a response to a json ready dict.
def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
    """Translate an Exception to a SysResponse.

    Returns a (response, should_log) tuple; the second value says
    whether the error should be logged if this happened within
    handle_raw_message().

    - CleanError (when forward_clean_errors is set) and
      CommunicationError (when forward_communication_errors is set)
      become REMOTE_CLEAN / REMOTE_COMMUNICATION error responses
      carrying str(exc); these are never flagged for logging.
    - Anything else becomes a generic REMOTE error response which is
      flagged for logging according to log_remote_errors.
    """

    # If anything goes wrong, return an ErrorSysResponse instead.
    # (either CLEAN or generic REMOTE)
    if self.forward_clean_errors and isinstance(exc, CleanError):
        return (
            ErrorSysResponse(
                error_message=str(exc),
                error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
            ),
            False,
        )
    if self.forward_communication_errors and isinstance(
        exc, CommunicationError
    ):
        return (
            ErrorSysResponse(
                error_message=str(exc),
                error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
            ),
            False,
        )

    if self.remote_errors_include_stack_traces:
        # Format the exception we were actually handed instead of
        # whatever exception happens to be 'current'; this stays
        # correct even if we are called outside of the 'except' block
        # that caught exc.
        error_message = ''.join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    else:
        error_message = 'An internal error has occurred.'

    return (
        ErrorSysResponse(
            error_message=error_message,
            error_type=ErrorSysResponse.ErrorType.REMOTE,
        ),
        self.log_remote_errors,
    )
Translate an Exception to a SysResponse.
Also returns whether the error should be logged if this happened within handle_raw_message().
240 @staticmethod 241 def decode_dict(data: str) -> dict: 242 """Decode data to a dict.""" 243 out = json.loads(data) 244 assert isinstance(out, dict) 245 return out
Decode data to a dict.
def message_from_dict(self, data: dict) -> Message:
    """Decode a message from a json-ready dict.

    Delegates to self._from_dict() using the registered message types
    and sanity-checks that the result is a Message.
    """
    type_table = self.message_types_by_id
    decoded = self._from_dict(data, type_table, 'message')
    assert isinstance(decoded, Message)
    return decoded
Decode a message from a json-ready dict.
def response_from_dict(self, data: dict) -> Response | SysResponse:
    """Decode a response from a json-ready dict.

    Delegates to self._from_dict() using the registered response types
    and sanity-checks that the result is a Response or SysResponse.
    """
    type_table = self.response_types_by_id
    decoded = self._from_dict(data, type_table, 'response')
    assert isinstance(decoded, (Response, SysResponse))
    return decoded
Decode a response from a json-ready dict.
def do_create_sender_module(
    self,
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_sender_module(); do not call directly.

    Returns Python source for a module defining a MessageSender subclass
    named basename and a matching 'Bound' + basename class with typed
    send()/send_async() methods for this protocol's message types.

    Args:
        basename: Name of the generated sender class.
        protocol_create_code: Code inserted into the generated
            __init__; it must assign a variable named 'protocol'.
        enable_sync_sends: Whether to generate send().
        enable_async_sends: Whether to generate send_async().
        private: If True, generated class names get a '_' prefix.
        protocol_module_level_import_code: Extra import code handed to
            the module header.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    import textwrap

    msgtypes = list(self.message_ids_by_type.keys())

    ppre = '_' if private else ''
    out = self._get_module_header(
        'sender',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=enable_async_sends,
    )
    ccind = textwrap.indent(protocol_create_code, '        ')
    out += (
        f'class {ppre}{basename}(MessageSender):\n'
        f'    """Protocol-specific sender."""\n'
        f'\n'
        f'    def __init__(self) -> None:\n'
        f'{ccind}\n'
        f'        super().__init__(protocol)\n'
        f'\n'
        f'    def __get__(\n'
        f'        self, obj: Any, type_in: Any = None\n'
        f'    ) -> {ppre}Bound{basename}:\n'
        f'        return {ppre}Bound{basename}(obj, self)\n'
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageSender):\n'
        f'    """Protocol-specific bound sender."""\n'
    )

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        return 'None' if rtype is None else rtype.__name__

    def _rtypes_str(msgtype: type[Message]) -> str:
        # Annotation text covering all response types of one message.
        rtypes = msgtype.get_response_types()
        if len(rtypes) > 1:
            return ' | '.join(_filt_tp_name(t) for t in rtypes)
        return _filt_tp_name(rtypes[0])

    # Define send() overloads for all registered message types.
    if msgtypes:
        for async_pass in False, True:
            if async_pass and not enable_async_sends:
                continue
            if not async_pass and not enable_sync_sends:
                continue
            pfx = 'async ' if async_pass else ''
            sfx = '_async' if async_pass else ''
            how = 'asynchronously' if async_pass else 'synchronously'

            if len(msgtypes) == 1:
                # Special case: with a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypevar = _rtypes_str(msgtype)
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f'    def send{sfx}(self,'
                    f' message: {msgtypevar})'
                    f' -> {rtypevar}:\n'
                    f'        """Send a message {how}."""\n'
                    f'        out = self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )
                if async_pass:
                    out += f'        return cast({rtypevar}, out)\n'
                elif rtypevar == 'None':
                    # isinstance(out, None) is a TypeError at runtime,
                    # so a None-only response needs an identity check.
                    out += (
                        '        assert out is None\n'
                        '        return out\n'
                    )
                else:
                    out += (
                        f'        assert isinstance(out, {rtypevar})\n'
                        '        return out\n'
                    )

            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypevar = _rtypes_str(msgtype)
                    out += (
                        f'\n'
                        f'    @overload\n'
                        f'    {pfx}def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}: ...\n'
                    )
                # The actual (non-overload) implementation.
                rtypevar = 'Response | None'
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f'    def send{sfx}(self, message: Message)'
                    f' -> {rtypevar}:\n'
                    f'        """Send a message {how}."""\n'
                    f'        return self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )

    return out
Used by create_sender_module(); do not call directly.
def do_create_receiver_module(
    self,
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_receiver_module(); do not call directly.

    Returns Python source for a module defining a MessageReceiver
    subclass named basename (with typed handler() decorators for this
    protocol's message types) and a matching 'Bound' + basename class
    exposing handle_raw_message().
    """
    # pylint: disable=too-many-locals
    import textwrap

    kind_desc = 'asynchronous' if is_async else 'synchronous'
    name_prefix = '_' if private else ''
    message_types = list(self.message_ids_by_type.keys())

    # Handler return types get wrapped in Awaitable[...] when async.
    wrap_open = 'Awaitable[' if is_async else ''
    wrap_close = ']' if is_async else ''

    def _type_label(rtype: type[Response] | None) -> str:
        if rtype is None:
            return 'None'
        return rtype.__name__

    def _handler_return_label(mtype: type[Message]) -> str:
        rtypes = mtype.get_response_types()
        if len(rtypes) > 1:
            label = ' | '.join(_type_label(t) for t in rtypes)
        else:
            label = _type_label(rtypes[0])
        return f'{wrap_open}{label}{wrap_close}'

    pieces = [
        self._get_module_header(
            'receiver',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=False,
        )
    ]
    create_code = textwrap.indent(protocol_create_code, '        ')
    pieces.append(
        f'class {name_prefix}{basename}(MessageReceiver):\n'
        f'    """Protocol-specific {kind_desc} receiver."""\n'
        f'\n'
        f'    is_async = {is_async}\n'
        f'\n'
        f'    def __init__(self) -> None:\n'
        f'{create_code}\n'
        f'        super().__init__(protocol)\n'
        f'\n'
        f'    def __get__(\n'
        f'        self,\n'
        f'        obj: Any,\n'
        f'        type_in: Any = None,\n'
        f'    ) -> {name_prefix}Bound{basename}:\n'
        f'        return {name_prefix}Bound{basename}('
        f'obj, self)\n'
    )

    # Define handler() overloads for all registered message types.
    if len(message_types) == 1:
        # Special case: when we have a single message type we don't
        # use overloads.
        only_type = message_types[0]
        msg_label = only_type.__name__
        ret_label = _handler_return_label(only_type)
        pieces.append(
            f'\n'
            f'    def handler(\n'
            f'        self,\n'
            f'        call: Callable[[Any, {msg_label}], '
            f'{ret_label}],\n'
            f'    )'
            f' -> Callable[[Any, {msg_label}], {ret_label}]:\n'
            f'        """Decorator to register message handlers."""\n'
            f'        from typing import cast, Callable, Any\n'
            f'\n'
            f'        self.register_handler(cast(Callable'
            f'[[Any, Message], Response], call))\n'
            f'        return call\n'
        )
    elif message_types:
        for mtype in message_types:
            msg_label = mtype.__name__
            ret_label = _handler_return_label(mtype)
            pieces.append(
                f'\n'
                f'    @overload\n'
                f'    def handler(\n'
                f'        self,\n'
                f'        call: Callable[[Any, {msg_label}], '
                f'{ret_label}],\n'
                f'    )'
                f' -> Callable[[Any, {msg_label}], {ret_label}]: ...\n'
            )
        pieces.append(
            '\n'
            '    def handler(self, call: Callable) -> Callable:\n'
            '        """Decorator to register message handlers."""\n'
            '        self.register_handler(call)\n'
            '        return call\n'
        )

    pieces.append(
        f'\n'
        f'\n'
        f'class {name_prefix}Bound{basename}(BoundMessageReceiver):\n'
        f'    """Protocol-specific bound receiver."""\n'
    )
    if is_async:
        pieces.append(
            '\n'
            '    def handle_raw_message(\n'
            '        self, message: str, raise_unregistered: bool = False\n'
            '    ) -> Awaitable[str]:\n'
            '        """Asynchronously handle a raw incoming message."""\n'
            '        return self._receiver.'
            'handle_raw_message_async(\n'
            '            self._obj, message, raise_unregistered\n'
            '        )\n'
        )
    else:
        pieces.append(
            '\n'
            '    def handle_raw_message(\n'
            '        self, message: str, raise_unregistered: bool = False\n'
            '    ) -> str:\n'
            '        """Synchronously handle a raw incoming message."""\n'
            '        return self._receiver.handle_raw_message(\n'
            '            self._obj, message, raise_unregistered\n'
            '        )\n'
        )

    return ''.join(pieces)
Used by create_receiver_module(); do not call directly.
class MessageSender:
    """Facilitates sending messages to a target and receiving responses.

    This is instantiated at the class level and used to register unbound
    class methods to handle raw message sending.

    Example:

    class MyClass:
        msg = MyMessageSender(some_protocol)

        @msg.send_method
        def send_raw_message(self, message: str) -> str:
            # Actually send the message here.

    # MyMessageSender class should provide overloads for send(), send_async(),
    # etc. to ensure all sending happens with valid types.
    obj = MyClass()
    obj.msg.send(SomeMessageType())
    """

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        self._send_raw_message_call: Callable[[Any, str], str] | None = None
        self._send_async_raw_message_call: (
            Callable[[Any, str], Awaitable[str]] | None
        ) = None
        self._send_async_raw_message_ex_call: (
            Callable[[Any, str, Message], Awaitable[str]] | None
        ) = None
        self._encode_filter_call: (
            Callable[[Any, Message, dict], None] | None
        ) = None
        self._decode_filter_call: (
            Callable[[Any, Message, dict, Response | SysResponse], None] | None
        ) = None
        self._peer_desc_call: Callable[[Any], str] | None = None

    def send_method(
        self, call: Callable[[Any, str], str]
    ) -> Callable[[Any, str], str]:
        """Function decorator for setting raw send method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.
        """
        assert self._send_raw_message_call is None
        self._send_raw_message_call = call
        return call

    def send_async_method(
        self, call: Callable[[Any, str], Awaitable[str]]
    ) -> Callable[[Any, str], Awaitable[str]]:
        """Function decorator for setting raw send-async method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.

        IMPORTANT: Generally async send methods should not be implemented
        as 'async' methods, but instead should be regular methods that
        return awaitable objects. This way it can be guaranteed that
        outgoing messages are synchronously enqueued in the correct
        order, and then async calls can be returned which finish each
        send. If the entire call is async, they may be enqueued out of
        order in rare cases.
        """
        assert self._send_async_raw_message_call is None
        self._send_async_raw_message_call = call
        return call

    def send_async_ex_method(
        self, call: Callable[[Any, str, Message], Awaitable[str]]
    ) -> Callable[[Any, str, Message], Awaitable[str]]:
        """Function decorator for extended send-async method.

        Version of send_async_method which is also passed the original
        unencoded message; can be useful for cases where metadata is sent
        along with messages referring to their payloads/etc.
        """
        assert self._send_async_raw_message_ex_call is None
        self._send_async_raw_message_ex_call = call
        return call

    def encode_filter_method(
        self, call: Callable[[Any, Message, dict], None]
    ) -> Callable[[Any, Message, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def decode_filter_method(
        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
    ) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts.
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def peer_desc_method(
        self, call: Callable[[Any], str]
    ) -> Callable[[Any], str]:
        """Function decorator for defining peer descriptions.

        These are included in error messages or other diagnostics.
        """
        assert self._peer_desc_call is None
        self._peer_desc_call = call
        return call

    def send(self, bound_obj: Any, message: Message) -> Response | None:
        """Send a message synchronously."""
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=self.fetch_raw_response(
                bound_obj=bound_obj,
                message=message,
            ),
        )

    def send_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously."""

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        raw_response_awaitable = self.fetch_raw_response_async(
            bound_obj=bound_obj,
            message=message,
        )
        # Now return an awaitable that will finish the send.
        return self._send_async_awaitable(
            bound_obj, message, raw_response_awaitable
        )

    async def _send_async_awaitable(
        self,
        bound_obj: Any,
        message: Message,
        raw_response_awaitable: Awaitable[Response | SysResponse],
    ) -> Response | None:
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=await raw_response_awaitable,
        )

    @staticmethod
    def _send_error_response(
        exc: Exception, error_message: str
    ) -> ErrorSysResponse:
        """Wrap an exception raised by a raw send call in an error response.

        CommunicationErrors are tagged COMMUNICATION; everything else is
        tagged LOCAL.
        """
        response = ErrorSysResponse(
            error_message=error_message,
            error_type=(
                ErrorSysResponse.ErrorType.COMMUNICATION
                if isinstance(exc, CommunicationError)
                else ErrorSysResponse.ErrorType.LOCAL
            ),
        )
        # Can include the actual exception since we'll be looking at
        # this locally; might be helpful.
        response.set_local_exception(exc)
        return response

    def fetch_raw_response(
        self, bound_obj: Any, message: Message
    ) -> Response | SysResponse:
        """Send a message synchronously, returning the raw response.

        The result should be passed to unpack_raw_response() to produce
        the final message result.

        Generally you can just call send(); these split versions are
        for when message sending and response handling need to happen
        in different contexts/threads.
        """
        if self._send_raw_message_call is None:
            raise RuntimeError('send() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            response_encoded = self._send_raw_message_call(
                bound_obj, msg_encoded
            )
        except Exception as exc:
            return self._send_error_response(
                exc, 'Error in MessageSender @send_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def fetch_raw_response_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Fetch a raw message response awaitable.

        The result of this should be awaited and then passed to
        unpack_raw_response() to produce the final message result.

        Generally you can just call send(); calling fetch and unpack
        manually is for when message sending and response handling need
        to happen in different contexts/threads.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        if (
            self._send_async_raw_message_call is None
            and self._send_async_raw_message_ex_call is None
        ):
            raise RuntimeError('send_async() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            # The extended call takes priority if both are registered.
            if self._send_async_raw_message_ex_call is not None:
                send_awaitable = self._send_async_raw_message_ex_call(
                    bound_obj, msg_encoded, message
                )
            else:
                assert self._send_async_raw_message_call is not None
                send_awaitable = self._send_async_raw_message_call(
                    bound_obj, msg_encoded
                )
        except Exception as exc:
            return self._error_awaitable(exc)

        # Now return an awaitable to finish the job.
        return self._fetch_raw_response_awaitable(
            bound_obj, message, send_awaitable
        )

    async def _error_awaitable(self, exc: Exception) -> SysResponse:
        return self._send_error_response(
            exc, 'Error in MessageSender @send_async_method.'
        )

    async def _fetch_raw_response_awaitable(
        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
    ) -> Response | SysResponse:
        try:
            response_encoded = await send_awaitable
        except Exception as exc:
            return self._send_error_response(
                exc, 'Error in MessageSender @send_async_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def unpack_raw_response(
        self,
        bound_obj: Any,
        message: Message,
        raw_response: Response | SysResponse,
    ) -> Response | None:
        """Convert a raw fetched response into a final response/error/etc.

        Generally you can just call send(); calling fetch and unpack
        manually is for when message sending and response handling need
        to happen in different contexts/threads.
        """
        response = self._unpack_raw_response(bound_obj, raw_response)
        assert (
            response is None
            or type(response) in type(message).get_response_types()
        )
        return response

    def _encode_message(self, bound_obj: Any, message: Message) -> str:
        """Encode a message for sending."""
        msg_dict = self.protocol.message_to_dict(message)
        if self._encode_filter_call is not None:
            self._encode_filter_call(bound_obj, message, msg_dict)
        return self.protocol.encode_dict(msg_dict)

    def _decode_raw_response(
        self, bound_obj: Any, message: Message, response_encoded: str
    ) -> Response | SysResponse:
        """Create a Response from returned data.

        These Responses may encapsulate things like remote errors and
        should not be handed directly to users. _unpack_raw_response()
        should be used to translate to special values like None or raise
        Exceptions. This function itself should never raise Exceptions.
        """
        response: Response | SysResponse
        try:
            response_dict = self.protocol.decode_dict(response_encoded)
            response = self.protocol.response_from_dict(response_dict)
            if self._decode_filter_call is not None:
                self._decode_filter_call(
                    bound_obj, message, response_dict, response
                )
        except Exception as exc:
            response = ErrorSysResponse(
                error_message='Error decoding raw response.',
                error_type=ErrorSysResponse.ErrorType.LOCAL,
            )
            # Since we'll be looking at this locally, we can include
            # extra info for logging/etc.
            response.set_local_exception(exc)
        return response

    def _unpack_raw_response(
        self, bound_obj: Any, raw_response: Response | SysResponse
    ) -> Response | None:
        """Given a raw Response, unpacks to special values or Exceptions.

        The result of this call is what should be passed to users.
        For complex messaging situations such as response callbacks
        operating across different threads, this last stage should be
        run such that any raised Exception is active when the callback
        fires; not on the thread where the message was sent.
        """
        # EmptySysResponse translates to None
        if isinstance(raw_response, EmptySysResponse):
            return None

        # Some error occurred. Raise a local Exception for it.
        if isinstance(raw_response, ErrorSysResponse):
            # Errors that happened locally can attach their exceptions
            # here for extra logging goodness.
            local_exception = raw_response.get_local_exception()

            if (
                raw_response.error_type
                is ErrorSysResponse.ErrorType.COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # If something went wrong on *our* end of the connection,
            # don't say it was a remote error.
            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
                raise RuntimeError(
                    raw_response.error_message
                ) from local_exception

            # If they want to support clean errors, do those.
            if (
                self.protocol.forward_clean_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
            ):
                raise CleanError(
                    raw_response.error_message
                ) from local_exception

            if (
                self.protocol.forward_communication_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # Everything else gets lumped in as a remote error.
            raise RemoteError(
                raw_response.error_message,
                peer_desc=(
                    'peer'
                    if self._peer_desc_call is None
                    else self._peer_desc_call(bound_obj)
                ),
            ) from local_exception

        assert isinstance(raw_response, Response)
        return raw_response
Facilitates sending messages to a target and receiving responses. This is instantiated at the class level and used to register unbound class methods to handle raw message sending.
Example:
class MyClass: msg = MyMessageSender(some_protocol)
@msg.send_method
def send_raw_message(self, message: str) -> str:
# Actually send the message here.
MyMessageSender class should provide overloads for send(), send_async(),
etc. to ensure all sending happens with valid types.
obj = MyClass() obj.msg.send(SomeMessageType())
def __init__(self, protocol: MessageProtocol) -> None:
    # Protocol used to encode messages and decode responses.
    self.protocol = protocol

    # Optional hooks; each stays None until set through the matching
    # decorator method.
    self._peer_desc_call: Callable[[Any], str] | None = None
    self._decode_filter_call: (
        Callable[[Any, Message, dict, Response | SysResponse], None] | None
    ) = None
    self._encode_filter_call: (
        Callable[[Any, Message, dict], None] | None
    ) = None

    # Raw send calls (sync, async, and extended async).
    self._send_async_raw_message_ex_call: (
        Callable[[Any, str, Message], Awaitable[str]] | None
    ) = None
    self._send_async_raw_message_call: (
        Callable[[Any, str], Awaitable[str]] | None
    ) = None
    self._send_raw_message_call: Callable[[Any, str], str] | None = None
def send_method(
    self, call: Callable[[Any, str], str]
) -> Callable[[Any, str], str]:
    """Function decorator that registers the raw synchronous send call.

    Send methods take strings and should return strings.
    CommunicationErrors raised by the registered call are passed back
    to the sender as such; any other exception reaches the sender as
    a RuntimeError.

    Only one send method may be registered; the call is returned
    unchanged.
    """
    already_registered = self._send_raw_message_call is not None
    assert not already_registered
    self._send_raw_message_call = call
    return call
Function decorator for setting raw send method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
def send_async_method(
    self, call: Callable[[Any, str], Awaitable[str]]
) -> Callable[[Any, str], Awaitable[str]]:
    """Function decorator that registers the raw asynchronous send call.

    Send methods take strings and should return (awaitable) strings.
    CommunicationErrors raised by the registered call are passed back
    to the sender as such; any other exception reaches the sender as
    a RuntimeError.

    IMPORTANT: Async send methods should generally be regular methods
    that return awaitable objects rather than 'async' methods. That
    way outgoing messages are enqueued synchronously, in the correct
    order, and the returned awaitables then finish each send. If the
    entire call is async, messages may be enqueued out of order in
    rare cases.

    Only one send-async method may be registered; the call is returned
    unchanged.
    """
    already_registered = self._send_async_raw_message_call is not None
    assert not already_registered
    self._send_async_raw_message_call = call
    return call
Function decorator for setting raw send-async method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.
def send_async_ex_method(
    self, call: Callable[[Any, str, Message], Awaitable[str]]
) -> Callable[[Any, str, Message], Awaitable[str]]:
    """Function decorator that registers the extended send-async call.

    Like send_async_method, except the registered call is also passed
    the original unencoded message; this can be useful when metadata
    referring to message payloads/etc. is sent along with messages.

    Only one extended send-async method may be registered; the call is
    returned unchanged.
    """
    already_registered = self._send_async_raw_message_ex_call is not None
    assert not already_registered
    self._send_async_raw_message_ex_call = call
    return call
Function decorator for extended send-async method.
Version of send_async_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
def encode_filter_method(
    self, call: Callable[[Any, Message, dict], None]
) -> Callable[[Any, Message, dict], None]:
    """Function decorator that registers an encode filter.

    An encode filter can add extra data to the message dict before it
    is encoded to a string and sent out.

    Only one encode filter may be registered; the call is returned
    unchanged.
    """
    already_registered = self._encode_filter_call is not None
    assert not already_registered
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def decode_filter_method(
    self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
    """Function decorator for defining a decode filter.

    Decode filters can be used to extract extra data from incoming
    response dicts. The filter is handed the bound object, the
    original message, the decoded response dict, and the decoded
    response object.

    Only one decode filter may be registered; the call is returned
    unchanged (so the return annotation matches the parameter's).
    """
    assert self._decode_filter_call is None
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts.
def peer_desc_method(
    self, call: Callable[[Any], str]
) -> Callable[[Any], str]:
    """Function decorator that registers a peer-description call.

    Peer descriptions are included in error messages or other
    diagnostics.

    Only one peer-description method may be registered; the call is
    returned unchanged.
    """
    already_registered = self._peer_desc_call is not None
    assert not already_registered
    self._peer_desc_call = call
    return call
Function decorator for defining peer descriptions.
These are included in error messages or other diagnostics.
def send(self, bound_obj: Any, message: Message) -> Response | None:
    """Send a message synchronously.

    Fetches the raw response and then unpacks it into the final
    response value (or raised exception).
    """
    raw = self.fetch_raw_response(bound_obj=bound_obj, message=message)
    return self.unpack_raw_response(
        bound_obj=bound_obj, message=message, raw_response=raw
    )
Send a message synchronously.
def send_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | None]:
    """Send a message asynchronously.

    Returns an awaitable producing the final response value.
    """

    # This is deliberately a regular (non-async) method: the fetch call
    # below runs right away, so messages sent in order actually go out
    # in order. That could not be guaranteed if the whole call were
    # async.
    pending_raw = self.fetch_raw_response_async(
        bound_obj=bound_obj, message=message
    )

    # Hand back an awaitable that will finish the send.
    return self._send_async_awaitable(bound_obj, message, pending_raw)
Send a message asynchronously.
def fetch_raw_response(
    self, bound_obj: Any, message: Message
) -> Response | SysResponse:
    """Send a message synchronously, returning the raw response.

    Exceptions raised by the registered send method are not propagated;
    they are wrapped in an ErrorSysResponse (COMMUNICATION for
    CommunicationErrors, LOCAL otherwise).

    Generally you can just call send(); these split versions are
    for when message sending and response handling need to happen
    in different contexts/threads.

    Raises RuntimeError if no send method has been registered.
    """
    if self._send_raw_message_call is None:
        raise RuntimeError('send() is unimplemented for this type.')

    encoded_message = self._encode_message(bound_obj, message)
    try:
        encoded_response = self._send_raw_message_call(
            bound_obj, encoded_message
        )
    except Exception as exc:
        if isinstance(exc, CommunicationError):
            failure_type = ErrorSysResponse.ErrorType.COMMUNICATION
        else:
            failure_type = ErrorSysResponse.ErrorType.LOCAL
        failure = ErrorSysResponse(
            error_message='Error in MessageSender @send_method.',
            error_type=failure_type,
        )
        # This is only examined locally, so attaching the actual
        # exception is fine and might be helpful.
        failure.set_local_exception(exc)
        return failure
    return self._decode_raw_response(bound_obj, message, encoded_response)
Send a message synchronously.
Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.
def fetch_raw_response_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | SysResponse]:
    """Fetch a raw message response awaitable.

    The result of this should be awaited and then passed to
    unpack_raw_response() to produce the final message result.

    Generally you can just call send(); calling fetch and unpack
    manually is for when message sending and response handling need
    to happen in different contexts/threads.

    Raises RuntimeError if neither a send-async nor an extended
    send-async method has been registered.
    """

    # This is deliberately a regular (non-async) method: encoding and
    # the raw send call run right away, so messages sent in order
    # actually go out in order. That could not be guaranteed if the
    # whole call were async.
    have_plain = self._send_async_raw_message_call is not None
    have_extended = self._send_async_raw_message_ex_call is not None
    if not (have_plain or have_extended):
        raise RuntimeError('send_async() is unimplemented for this type.')

    encoded_message = self._encode_message(bound_obj, message)
    try:
        # The extended call wins when both are registered.
        if self._send_async_raw_message_ex_call is None:
            assert self._send_async_raw_message_call is not None
            pending_send = self._send_async_raw_message_call(
                bound_obj, encoded_message
            )
        else:
            pending_send = self._send_async_raw_message_ex_call(
                bound_obj, encoded_message, message
            )
    except Exception as exc:
        return self._error_awaitable(exc)

    # Hand back an awaitable that finishes the job.
    return self._fetch_raw_response_awaitable(
        bound_obj, message, pending_send
    )
Fetch a raw message response awaitable.
The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
def unpack_raw_response(
    self,
    bound_obj: Any,
    message: Message,
    raw_response: Response | SysResponse,
) -> Response | None:
    """Turn a raw fetched response into the final response value.

    This is the second half of a split send. Most callers can simply
    use send(); fetching and unpacking manually is for cases where
    sending and response handling need to happen in different
    contexts/threads.
    """
    unpacked = self._unpack_raw_response(bound_obj, raw_response)
    if unpacked is not None:
        # Sanity check: anything non-None must be one of the response
        # types the message type declares.
        assert type(unpacked) in type(message).get_response_types()
    return unpacked
Convert a raw fetched response into a final response/error/etc.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
class BoundMessageSender:
    """Base class for senders bound to a particular object.

    Pairs a MessageSender with the object it is accessed through and
    forwards each call to the sender with that object as 'bound_obj'.
    """

    def __init__(self, obj: Any, sender: MessageSender) -> None:
        # obj is intentionally not validated here; at minimum the
        # 'protocol' property should keep working when accessed via
        # the type (where there is no instance).
        self._sender = sender
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this sender."""
        sender = self._sender
        return sender.protocol

    def send_untyped(self, message: Message) -> Response | None:
        """Send a message synchronously, without per-message typing.

        Prefer the send() call provided by generated subclasses when
        possible; it offers better type safety.
        """
        assert self._obj is not None
        sender = self._sender
        return sender.send(bound_obj=self._obj, message=message)

    def send_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously, without per-message typing.

        Prefer the send_async() call provided by generated subclasses
        when possible; it offers better type safety.
        """
        assert self._obj is not None
        sender = self._sender
        return sender.send_async(bound_obj=self._obj, message=message)

    def fetch_raw_response_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Split send (part 1 of 2): fetch the raw response awaitable."""
        assert self._obj is not None
        sender = self._sender
        return sender.fetch_raw_response_async(
            bound_obj=self._obj, message=message
        )

    def unpack_raw_response_untyped(
        self, message: Message, raw_response: Response | SysResponse
    ) -> Response | None:
        """Split send (part 2 of 2): unpack a fetched raw response."""
        sender = self._sender
        return sender.unpack_raw_response(
            bound_obj=self._obj, message=message, raw_response=raw_response
        )
Base class for bound senders.
@property
def protocol(self) -> MessageProtocol:
    """Return the protocol of the underlying sender."""
    sender = self._sender
    return sender.protocol
Protocol associated with this sender.
def send_untyped(self, message: Message) -> Response | None:
    """Send a message synchronously, without per-message typing.

    Prefer the send() call provided by generated subclasses when
    possible; it offers better type safety.
    """
    assert self._obj is not None
    sender = self._sender
    return sender.send(bound_obj=self._obj, message=message)
Send a message synchronously.
Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.
def send_async_untyped(
    self, message: Message
) -> Awaitable[Response | None]:
    """Send a message asynchronously, without per-message typing.

    Prefer the send_async() call provided by generated subclasses
    when possible; it offers better type safety.
    """
    assert self._obj is not None
    sender = self._sender
    return sender.send_async(bound_obj=self._obj, message=message)
Send a message asynchronously.
Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.
def fetch_raw_response_async_untyped(
    self, message: Message
) -> Awaitable[Response | SysResponse]:
    """Split send (part 1 of 2): fetch the raw response awaitable."""
    assert self._obj is not None
    sender = self._sender
    return sender.fetch_raw_response_async(
        bound_obj=self._obj, message=message
    )
Split send (part 1 of 2).
def unpack_raw_response_untyped(
    self, message: Message, raw_response: Response | SysResponse
) -> Response | None:
    """Split send (part 2 of 2): unpack a fetched raw response."""
    sender = self._sender
    return sender.unpack_raw_response(
        bound_obj=self._obj, message=message, raw_response=raw_response
    )
Split send (part 2 of 2).
class MessageReceiver:
    """Facilitates receiving & responding to messages from a remote source.

    This is instantiated at the class level with unbound methods registered
    as handlers for different message types in the protocol.

    Example:

    class MyClass:
        receiver = MyMessageReceiver()

        # MyMessageReceiver fills out handler() overloads to ensure all
        # registered handlers have valid types/return-types.
        # Note that the message argument must be named 'msg'.
        @receiver.handler
        def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
            # Deal with this message type here.

    # This will trigger the registered handler being called.
    obj = MyClass()
    obj.receiver.handle_raw_message(some_raw_data)

    Any unhandled Exception occurring during message handling will result in
    an Exception being raised on the sending end.
    """

    # Whether handlers are driven through handle_raw_message_async()
    # (True) or handle_raw_message() (False).
    is_async = False

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        # Registered handler call per message type.
        self._handlers: dict[type[Message], Callable] = {}
        # Optional hooks set via the *_filter_method decorators.
        self._decode_filter_call: (
            Callable[[Any, dict, Message], None] | None
        ) = None
        self._encode_filter_call: (
            Callable[[Any, Message | None, Response | SysResponse, dict], None]
            | None
        ) = None

    # noinspection PyProtectedMember
    def register_handler(
        self, call: Callable[[Any, Message], Response | None]
    ) -> None:
        """Register a handler call.

        The message type handled by the call is determined by its
        type annotation.

        Raises ValueError if the call's arguments are not exactly
        ('self', 'msg') and TypeError for unusable annotations,
        message types not registered with the protocol, duplicate
        handlers, or response types that do not exactly match what the
        message type declares.
        """
        # TODO: can use types.GenericAlias in 3.9.
        # (hmm though now that we're there, it seems a drop-in
        # replace gives us errors. Should re-test in 3.11 as it seems
        # that typing_extensions handles it differently in that case)
        from typing import _GenericAlias  # type: ignore
        from typing import get_type_hints, get_args

        sig = inspect.getfullargspec(call)

        # The provided callable should be a method taking one 'msg' arg.
        expectedsig = ['self', 'msg']
        if sig.args != expectedsig:
            raise ValueError(
                f'Expected callable signature of {expectedsig};'
                f' got {sig.args}'
            )

        # Note: we do not check sync-vs-async here since handlers may
        # be regular functions which return awaitables instead of
        # having the entire function be async.

        # Check annotation types to determine what message types we handle.
        anns = get_type_hints(call)

        msgtype = anns.get('msg')
        if not isinstance(msgtype, type):
            raise TypeError(
                f'expected a type for "msg" annotation; got {type(msgtype)}.'
            )
        assert issubclass(msgtype, Message)

        ret = anns.get('return')
        responsetypes: tuple[type[Any] | None, ...]

        # Return types can be a single type or a union of types.
        if isinstance(ret, (_GenericAlias, types.UnionType)):
            targs = get_args(ret)
            if not all(isinstance(a, (type, type(None))) for a in targs):
                raise TypeError(
                    f'expected only types for "return" annotation;'
                    f' got {targs}.'
                )
            responsetypes = targs
        else:
            if not isinstance(ret, (type, type(None))):
                raise TypeError(
                    f'expected one or more types for'
                    f' "return" annotation; got a {type(ret)}.'
                )
            responsetypes = (ret,)

        # This will contain NoneType for empty return cases, but
        # we expect it to be None.
        # noinspection PyPep8
        responsetypes = tuple(
            None if r is type(None) else r for r in responsetypes
        )

        # Make sure our protocol has this message type registered and our
        # return types exactly match. (Technically we could return a subset
        # of the supported types; can allow this in the future if it makes
        # sense).
        registered_types = self.protocol.message_ids_by_type.keys()

        if msgtype not in registered_types:
            raise TypeError(
                f'Message type {msgtype} is not registered'
                f' in this Protocol.'
            )

        if msgtype in self._handlers:
            raise TypeError(
                f'Message type {msgtype} already has a registered' f' handler.'
            )

        # Make sure the responses exactly matches what the message expects.
        if set(responsetypes) != set(msgtype.get_response_types()):
            raise TypeError(
                f'Provided response types {responsetypes} do not'
                f' match the set expected by message type {msgtype}: '
                f'({msgtype.get_response_types()})'
            )

        # Ok; we're good!
        self._handlers[msgtype] = call

    def decode_filter_method(
        self, call: Callable[[Any, dict, Message], None]
    ) -> Callable[[Any, dict, Message], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts. This version will work for both handle_raw_message()
        and handle_raw_message_async()
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def encode_filter_method(
        self,
        call: Callable[
            [Any, Message | None, Response | SysResponse, dict], None
        ],
    ) -> Callable[[Any, Message | None, Response | SysResponse, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def validate(self, log_only: bool = False) -> None:
        """Check for handler completeness, valid types, etc.

        Raises TypeError for the first unhandled message type, or only
        logs an error per unhandled type if 'log_only' is True.
        """
        for msgtype in self.protocol.message_ids_by_type.keys():
            if issubclass(msgtype, Response):
                continue
            if msgtype not in self._handlers:
                msg = (
                    f'Protocol message type {msgtype} is not handled'
                    f' by receiver type {type(self)}.'
                )
                if log_only:
                    logging.error(msg)
                else:
                    raise TypeError(msg)

    def _decode_incoming_message_base(
        self, bound_obj: Any, msg: str
    ) -> tuple[Any, dict, Message]:
        # Decode the incoming message.
        msg_dict = self.protocol.decode_dict(msg)
        msg_decoded = self.protocol.message_from_dict(msg_dict)
        assert isinstance(msg_decoded, Message)
        if self._decode_filter_call is not None:
            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
        return bound_obj, msg_dict, msg_decoded

    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
            bound_obj=bound_obj, msg=msg
        )
        return msg_decoded

    def encode_user_response(
        self, bound_obj: Any, message: Message, response: Response | None
    ) -> str:
        """Encode a response provided by the user for sending."""

        assert isinstance(response, Response | None)
        # (user should never explicitly return error-responses)
        assert (
            response is None or type(response) in message.get_response_types()
        )

        # A return value of None equals EmptySysResponse.
        out_response: Response | SysResponse
        if response is None:
            out_response = EmptySysResponse()
        else:
            out_response = response

        response_dict = self.protocol.response_to_dict(out_response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, out_response, response_dict
            )
        return self.protocol.encode_dict(response_dict)

    def encode_error_response(
        self, bound_obj: Any, message: Message | None, exc: Exception
    ) -> tuple[str, bool]:
        """Given an error, return sysresponse str and whether to log."""
        response, dolog = self.protocol.error_to_response(exc)
        response_dict = self.protocol.response_to_dict(response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, response, response_dict
            )
        return self.protocol.encode_dict(response_dict), dolog

    def handle_raw_message(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> str:
        """Decode, handle, and return a response for a message.

        If 'raise_unregistered' is True, will raise an
        efro.message.UnregisteredMessageIDError for messages not handled by
        the protocol. In all other cases local errors will translate to
        error responses returned to the sender.
        """
        assert not self.is_async, "can't call sync handler on async receiver"
        msg_decoded: Message | None = None
        msgtype: type[Message] | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            response = handler(bound_obj, msg_decoded)
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            rstr, dolog = self.encode_error_response(
                bound_obj, msg_decoded, exc
            )
            if dolog:
                # We are inside the 'except' block here, so
                # logging.exception() picks up the active exception.
                if msgtype is not None:
                    logging.exception(
                        'Error handling %s.%s message.',
                        msgtype.__module__,
                        msgtype.__qualname__,
                    )
                else:
                    logging.exception(
                        'Error handling raw efro.message. msg=%s', msg
                    )
            return rstr

    def handle_raw_message_async(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> Awaitable[str]:
        """Should be called when the receiver gets a message.

        The return value is an awaitable producing the raw response to
        the message. If 'raise_unregistered' is True, an
        UnregisteredMessageIDError raised while decoding/dispatching
        is re-raised directly instead of becoming an error response.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages handlers would be called in the
        # order the messages were received.

        assert self.is_async, "can't call async handler on sync receiver"
        msg_decoded: Message | None = None
        msgtype: type[Message] | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            handler_awaitable = handler(bound_obj, msg_decoded)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            return self._handle_raw_message_async_error(
                bound_obj, msg_decoded, msgtype, exc
            )

        # Return an awaitable to handle the rest asynchronously.
        return self._handle_raw_message_async(
            bound_obj, msg_decoded, msgtype, handler_awaitable
        )

    async def _handle_raw_message_async_error(
        self,
        bound_obj: Any,
        msg_decoded: Message | None,
        msgtype: type[Message] | None,
        exc: Exception,
    ) -> str:
        """Encode an error response for 'exc' and log it if requested."""
        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
        if dolog:
            # Pass the exception explicitly instead of relying on
            # logging.exception(): when this coroutine is returned from
            # handle_raw_message_async() its body only runs once
            # awaited, after the originating 'except' block has exited,
            # so there is no active exception to pick up by then.
            if msgtype is not None:
                logging.error(
                    'Error handling %s.%s message.',
                    msgtype.__module__,
                    msgtype.__qualname__,
                    exc_info=exc,
                )
            else:
                logging.error(
                    'Error handling raw async efro.message.'
                    ' msgtype=%s msg_decoded=%s.',
                    msgtype,
                    msg_decoded,
                    exc_info=exc,
                )
        return rstr

    async def _handle_raw_message_async(
        self,
        bound_obj: Any,
        msg_decoded: Message,
        msgtype: type[Message] | None,
        handler_awaitable: Awaitable[Response | None],
    ) -> str:
        """Await a handler's result and encode it (or its error)."""
        try:
            response = await handler_awaitable
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            return await self._handle_raw_message_async_error(
                bound_obj, msg_decoded, msgtype, exc
            )
Facilitates receiving & responding to messages from a remote source.
This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.
Example:
class MyClass: receiver = MyMessageReceiver()
# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.
@receiver.handler
def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
# Deal with this message type here.
This will trigger the registered handler being called.
obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)
Any unhandled Exception occurring during message handling will result in an Exception being raised on the sending end.
def __init__(self, protocol: MessageProtocol) -> None:
    """Create a receiver for 'protocol' with no handlers or filters."""
    self.protocol = protocol

    # Registered handler call per message type.
    self._handlers: dict[type[Message], Callable] = {}

    # Optional hook invoked on each decoded incoming message; set via
    # decode_filter_method().
    self._decode_filter_call: (
        Callable[[Any, dict, Message], None] | None
    ) = None

    # Optional hook invoked on each outgoing response dict; set via
    # encode_filter_method().
    self._encode_filter_call: (
        Callable[[Any, Message | None, Response | SysResponse, dict], None]
        | None
    ) = None
def register_handler(
    self, call: Callable[[Any, Message], Response | None]
) -> None:
    """Register 'call' as the handler for one message type.

    The handled message type is taken from the call's 'msg' type
    annotation and the response types from its return annotation.

    Raises ValueError if the call's arguments are not exactly
    ('self', 'msg'). Raises TypeError for unusable annotations, a
    message type not registered with the protocol, a message type that
    already has a handler, or response types that do not exactly match
    those declared by the message type.
    """
    # Note: types.GenericAlias was considered as a replacement for the
    # private typing alias, but a drop-in swap reportedly gives errors.
    from typing import _GenericAlias  # type: ignore
    from typing import get_type_hints, get_args

    # Handlers must be methods taking exactly one 'msg' argument.
    wanted_args = ['self', 'msg']
    found_args = inspect.getfullargspec(call).args
    if found_args != wanted_args:
        raise ValueError(
            f'Expected callable signature of {wanted_args};'
            f' got {found_args}'
        )

    # No sync-vs-async check is done here; handlers are sometimes
    # regular functions that return awaitables.

    hints = get_type_hints(call)

    msgtype = hints.get('msg')
    if not isinstance(msgtype, type):
        raise TypeError(
            f'expected a type for "msg" annotation; got {type(msgtype)}.'
        )
    assert issubclass(msgtype, Message)

    # The return annotation is either a single type or a union.
    ret = hints.get('return')
    nonetype = type(None)
    declared: tuple[type[Any] | None, ...]
    if isinstance(ret, (_GenericAlias, types.UnionType)):
        members = get_args(ret)
        if any(not isinstance(m, (type, nonetype)) for m in members):
            raise TypeError(
                f'expected only types for "return" annotation;'
                f' got {members}.'
            )
        declared = members
    elif isinstance(ret, (type, nonetype)):
        declared = (ret,)
    else:
        raise TypeError(
            f'expected one or more types for'
            f' "return" annotation; got a {type(ret)}.'
        )

    # Annotations give us NoneType for empty returns; messages declare
    # that case as plain None, so normalize.
    responsetypes = tuple(None if r is nonetype else r for r in declared)

    # The message type must be known to the protocol...
    if msgtype not in self.protocol.message_ids_by_type.keys():
        raise TypeError(
            f'Message type {msgtype} is not registered' f' in this Protocol.'
        )

    # ...must not be handled already...
    if msgtype in self._handlers:
        raise TypeError(
            f'Message type {msgtype} already has a registered' f' handler.'
        )

    # ...and the handler must return exactly the declared response set.
    # (Returning a subset could be allowed in the future.)
    if set(msgtype.get_response_types()) != set(responsetypes):
        raise TypeError(
            f'Provided response types {responsetypes} do not'
            f' match the set expected by message type {msgtype}: '
            f'({msgtype.get_response_types()})'
        )

    self._handlers[msgtype] = call
Register a handler call.
The message type handled by the call is determined by its type annotation.
def decode_filter_method(
    self, call: Callable[[Any, dict, Message], None]
) -> Callable[[Any, dict, Message], None]:
    """Decorator registering 'call' as this receiver's decode filter.

    A decode filter can pull extra data out of incoming message dicts.
    It applies to both handle_raw_message() and
    handle_raw_message_async(). Only one filter may be set; 'call' is
    returned unchanged so this works as a decorator.
    """
    existing = self._decode_filter_call
    assert existing is None
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async()
def encode_filter_method(
    self,
    call: Callable[
        [Any, Message | None, Response | SysResponse, dict], None
    ],
) -> Callable[[Any, Message | None, Response | SysResponse, dict], None]:
    """Function decorator for defining an encode filter.

    Encode filters can be used to add extra data to the message
    dict before it is encoded to a string and sent out.

    Only one encode filter may be set per receiver. 'call' is returned
    unchanged (so the declared return type matches the parameter type,
    including SysResponse) which lets this be used as a decorator.
    """
    assert self._encode_filter_call is None
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def validate(self, log_only: bool = False) -> None:
    """Verify that every protocol message type has a handler.

    Raises TypeError on the first unhandled message type. If
    'log_only' is True, an error is logged for each unhandled type
    instead and nothing is raised.
    """
    for msgtype in self.protocol.message_ids_by_type.keys():
        # Response subclasses are skipped, as are types that already
        # have a handler.
        if issubclass(msgtype, Response) or msgtype in self._handlers:
            continue
        problem = (
            f'Protocol message type {msgtype} is not handled'
            f' by receiver type {type(self)}.'
        )
        if not log_only:
            raise TypeError(problem)
        logging.error(problem)
Check for handler completeness, valid types, etc.
def encode_user_response(
    self, bound_obj: Any, message: Message, response: Response | None
) -> str:
    """Encode a handler-provided response into a string for sending."""

    assert isinstance(response, Response | None)
    # Handlers should never explicitly return error responses; anything
    # non-None must be one of the message's declared response types.
    if response is not None:
        assert type(response) in message.get_response_types()

    # A None result is sent as an EmptySysResponse.
    outgoing: Response | SysResponse = (
        EmptySysResponse() if response is None else response
    )

    as_dict = self.protocol.response_to_dict(outgoing)
    # Give the encode filter (if any) a chance to add to the dict.
    if self._encode_filter_call is not None:
        self._encode_filter_call(bound_obj, message, outgoing, as_dict)
    return self.protocol.encode_dict(as_dict)
Encode a response provided by the user for sending.
def encode_error_response(
    self, bound_obj: Any, message: Message | None, exc: Exception
) -> tuple[str, bool]:
    """Encode an error as a response string.

    Returns a tuple of the encoded response and a bool saying whether
    the error should be logged, as decided by the protocol.
    """
    proto = self.protocol
    response, should_log = proto.error_to_response(exc)
    as_dict = proto.response_to_dict(response)
    # Give the encode filter (if any) a chance to add to the dict.
    if self._encode_filter_call is not None:
        self._encode_filter_call(bound_obj, message, response, as_dict)
    encoded = proto.encode_dict(as_dict)
    return encoded, should_log
Given an error, return sysresponse str and whether to log.
def handle_raw_message(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> str:
    """Decode a raw message, run its handler, and return the raw response.

    If 'raise_unregistered' is True, an
    efro.message.UnregisteredMessageIDError raised while handling is
    re-raised to the caller. Any other exception is converted into an
    encoded error response which is returned for the sender.
    """
    assert not self.is_async, "can't call sync handler on async receiver"
    decoded: Message | None = None
    decoded_type: type[Message] | None = None
    try:
        decoded = self._decode_incoming_message(bound_obj, msg)
        decoded_type = type(decoded)
        handler = self._handlers.get(decoded_type)
        if handler is None:
            raise RuntimeError(f'Got unhandled message type: {decoded_type}.')
        reply = handler(bound_obj, decoded)
        assert isinstance(reply, Response | None)
        return self.encode_user_response(bound_obj, decoded, reply)

    except Exception as exc:
        if raise_unregistered and isinstance(exc, UnregisteredMessageIDError):
            raise
        encoded, should_log = self.encode_error_response(
            bound_obj, decoded, exc
        )
        if not should_log:
            return encoded
        # If decoding failed we have no message type to report, so
        # include the raw message instead.
        if decoded_type is None:
            logging.exception('Error handling raw efro.message. msg=%s', msg)
        else:
            logging.exception(
                'Error handling %s.%s message.',
                decoded_type.__module__,
                decoded_type.__qualname__,
            )
        return encoded
Decode, handle, and return a response for a message.
If 'raise_unregistered' is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.
def handle_raw_message_async(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> Awaitable[str]:
    """Begin handling a received raw message.

    Returns an awaitable producing the raw response to the message.
    If 'raise_unregistered' is True, an UnregisteredMessageIDError
    raised while decoding/dispatching is re-raised to the caller
    instead of being turned into an error response.
    """

    # This is deliberately a plain (non-async) function: decoding and
    # invoking the handler happen immediately, which is what lets
    # handlers be called in the order their messages were received.

    assert self.is_async, "can't call async handler on sync receiver"
    decoded: Message | None = None
    decoded_type: type[Message] | None = None
    try:
        decoded = self._decode_incoming_message(bound_obj, msg)
        decoded_type = type(decoded)
        handler = self._handlers.get(decoded_type)
        if handler is None:
            raise RuntimeError(f'Got unhandled message type: {decoded_type}.')
        pending = handler(bound_obj, decoded)

    except Exception as exc:
        if raise_unregistered and isinstance(exc, UnregisteredMessageIDError):
            raise
        return self._handle_raw_message_async_error(
            bound_obj, decoded, decoded_type, exc
        )

    # The rest (awaiting the handler, encoding) happens asynchronously.
    return self._handle_raw_message_async(
        bound_obj, decoded, decoded_type, pending
    )
Should be called when the receiver gets a message.
The return value is the raw response to the message.
class BoundMessageReceiver:
    """Base class for receivers bound to a particular object.

    Pairs a MessageReceiver with the (non-None) object it is accessed
    through.
    """

    def __init__(
        self,
        obj: Any,
        receiver: MessageReceiver,
    ) -> None:
        assert obj is not None
        self._receiver = receiver
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this receiver."""
        receiver = self._receiver
        return receiver.protocol

    def encode_error_response(self, exc: Exception) -> str:
        """Turn an error into an encoded response ready to send.

        Use this for errors that happen outside of the standard
        handle_raw_message calls; errors inside those calls are already
        returned as encoded strings automatically.
        """
        # No Message is passed; one would only be available for
        # failures inside a handler, which this call is not meant for.
        result = self._receiver.encode_error_response(self._obj, None, exc)
        # Only the encoded string is returned; the 'should log' flag
        # is dropped.
        return result[0]
Base bound receiver class.
@property
def protocol(self) -> MessageProtocol:
    """Return the protocol of the underlying receiver."""
    receiver = self._receiver
    return receiver.protocol
Protocol associated with this receiver.
def encode_error_response(self, exc: Exception) -> str:
    """Turn an error into an encoded response ready to send.

    Use this for errors that happen outside of the standard
    handle_raw_message calls; errors inside those calls are already
    returned as encoded strings automatically.
    """
    # No Message is passed; one would only be available for failures
    # inside a handler, which this call is not meant for.
    result = self._receiver.encode_error_response(self._obj, None, exc)
    # Only the encoded string is returned; the 'should log' flag is
    # dropped.
    return result[0]
Given an error, return a response ready to send.
This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.
def create_sender_module(
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Generate source for a module defining a MessageSender subclass.

    The generated class is primarily for type checking; it contains
    overrides for the varieties of send calls for the message/response
    types defined in the protocol.

    'protocol_create_code' should import whatever modules it needs and
    assign a Protocol instance to a variable named 'protocol'. If
    'build_time_protocol_create_code' is provided, it is used in place
    of 'protocol_create_code' to build the protocol object here, while
    'protocol_create_code' is still what gets passed on for the
    generated module.

    Class names derive from 'basename'; a basename of 'FooSender' gives
    classes FooSender and BoundFooSender. If 'private' is True, class
    names are prefixed with an '_'.

    Note: output may contain long lines and should generally be run
    through a formatter.
    """
    if build_time_protocol_create_code is None:
        build_code = protocol_create_code
    else:
        build_code = build_time_protocol_create_code
    protocol = _protocol_from_code(build_code)

    return protocol.do_create_sender_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        enable_sync_sends=enable_sync_sends,
        enable_async_sends=enable_async_sends,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageSender subclass.
This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.
Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.
Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.
If 'private' is True, class-names will be prefixed with an '_'.
Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.
def create_receiver_module(
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Create a Python module defining a MessageReceiver subclass.

    This class is primarily for type checking and will contain overrides
    for the register method for message/response types defined in
    the protocol.

    Class names are based on basename; a basename 'FooReceiver' will
    result in FooReceiver and BoundFooReceiver.

    If 'is_async' is True, handle_raw_message() will be an async method
    and the @handler decorator will expect async methods.

    If 'private' is True, class-names will be prefixed with an '_'.

    If 'build_time_protocol_create_code' is provided, it is passed to
    _protocol_from_code in place of 'protocol_create_code';
    'protocol_create_code' itself is still handed to the generator.

    Note that line lengths are not clipped, so output may need to be
    run through a formatter to prevent lint warnings about excessive
    line lengths.
    """
    # Prefer the build-time override when one was supplied.
    protocol = _protocol_from_code(
        build_time_protocol_create_code
        if build_time_protocol_create_code is not None
        else protocol_create_code
    )
    return protocol.do_create_receiver_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        is_async=is_async,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageReceiver subclass.
This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.
Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.
If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.
If 'private' is True, class-names will be prefixed with an '_'.
Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.
class UnregisteredMessageIDError(Exception):
    """Error for a message or response id that our protocol does not cover."""
A message or response id is not covered by our protocol.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args