efro.message
Functionality for sending and responding to messages. Supports static typing for message types and possible return types.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Functionality for sending and responding to messages. 4Supports static typing for message types and possible return types. 5""" 6 7from __future__ import annotations 8 9from efro.util import set_canonical_module_names 10from efro.message._protocol import MessageProtocol 11from efro.message._sender import MessageSender, BoundMessageSender 12from efro.message._receiver import MessageReceiver, BoundMessageReceiver 13from efro.message._module import create_sender_module, create_receiver_module 14from efro.message._message import ( 15 Message, 16 Response, 17 SysResponse, 18 EmptySysResponse, 19 ErrorSysResponse, 20 StringResponse, 21 BoolResponse, 22 UnregisteredMessageIDError, 23) 24 25__all__ = [ 26 'Message', 27 'Response', 28 'SysResponse', 29 'EmptySysResponse', 30 'ErrorSysResponse', 31 'StringResponse', 32 'BoolResponse', 33 'MessageProtocol', 34 'MessageSender', 35 'BoundMessageSender', 36 'MessageReceiver', 37 'BoundMessageReceiver', 38 'create_sender_module', 39 'create_receiver_module', 40 'UnregisteredMessageIDError', 41] 42 43# Have these things present themselves cleanly as 'thismodule.SomeClass' 44# instead of 'thismodule._internalmodule.SomeClass' 45set_canonical_module_names(globals())
24class Message: 25 """Base class for messages.""" 26 27 @classmethod 28 def get_response_types(cls) -> list[type[Response] | None]: 29 """Return all Response types this Message can return when sent. 30 31 The default implementation specifies a None return type. 32 """ 33 return [None]
Base class for messages.
27 @classmethod 28 def get_response_types(cls) -> list[type[Response] | None]: 29 """Return all Response types this Message can return when sent. 30 31 The default implementation specifies a None return type. 32 """ 33 return [None]
Return all Response types this Message can return when sent.
The default implementation specifies a None return type.
Base class for responses to messages.
40class SysResponse: 41 """Base class for system-responses to messages. 42 43 These are only sent/handled by the messaging system itself; 44 users of the api never see them. 45 """ 46 47 def set_local_exception(self, exc: Exception) -> None: 48 """Attach a local exception to facilitate better logging/handling. 49 50 Be aware that this data does not get serialized and only 51 exists on the local object. 52 """ 53 setattr(self, '_sr_local_exception', exc) 54 55 def get_local_exception(self) -> Exception | None: 56 """Fetch a local attached exception.""" 57 value = getattr(self, '_sr_local_exception', None) 58 assert isinstance(value, Exception | None) 59 return value
Base class for system-responses to messages.
These are only sent/handled by the messaging system itself; users of the api never see them.
47 def set_local_exception(self, exc: Exception) -> None: 48 """Attach a local exception to facilitate better logging/handling. 49 50 Be aware that this data does not get serialized and only 51 exists on the local object. 52 """ 53 setattr(self, '_sr_local_exception', exc)
Attach a local exception to facilitate better logging/handling.
Be aware that this data does not get serialized and only exists on the local object.
86@ioprepped 87@dataclass 88class EmptySysResponse(SysResponse): 89 """The response equivalent of None."""
The response equivalent of None.
Inherited Members
65@ioprepped 66@dataclass 67class ErrorSysResponse(SysResponse): 68 """SysResponse saying some error has occurred for the send. 69 70 This generally results in an Exception being raised for the caller. 71 """ 72 73 class ErrorType(Enum): 74 """Type of error that occurred while sending a message.""" 75 76 REMOTE = 0 77 REMOTE_CLEAN = 1 78 LOCAL = 2 79 COMMUNICATION = 3 80 REMOTE_COMMUNICATION = 4 81 82 error_message: Annotated[str, IOAttrs('m')] 83 error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE
SysResponse saying some error has occurred for the send.
This generally results in an Exception being raised for the caller.
Inherited Members
73 class ErrorType(Enum): 74 """Type of error that occurred while sending a message.""" 75 76 REMOTE = 0 77 REMOTE_CLEAN = 1 78 LOCAL = 2 79 COMMUNICATION = 3 80 REMOTE_COMMUNICATION = 4
Type of error that occurred while sending a message.
Inherited Members
- enum.Enum
- name
- value
104@ioprepped 105@dataclass 106class StringResponse(Response): 107 """A simple string value response.""" 108 109 value: Annotated[str, IOAttrs('v')]
A simple string value response.
96@ioprepped 97@dataclass 98class BoolResponse(Response): 99 """A simple bool value response.""" 100 101 value: Annotated[bool, IOAttrs('v')]
A simple bool value response.
33class MessageProtocol: 34 """Wrangles a set of message types, formats, and response types. 35 Both endpoints must be using a compatible Protocol for communication 36 to succeed. To maintain Protocol compatibility between revisions, 37 all message types must retain the same id, message attr storage 38 names must not change, newly added attrs must have default values, 39 etc. 40 """ 41 42 def __init__( 43 self, 44 message_types: dict[int, type[Message]], 45 response_types: dict[int, type[Response]], 46 forward_communication_errors: bool = False, 47 forward_clean_errors: bool = False, 48 remote_errors_include_stack_traces: bool = False, 49 log_errors_on_receiver: bool = True, 50 ) -> None: 51 """Create a protocol with a given configuration. 52 53 If 'forward_communication_errors' is True, 54 efro.error.CommunicationErrors raised on the receiver end will 55 result in a matching error raised back on the sender. This can 56 be useful if the receiver will be in some way forwarding 57 messages along and the sender doesn't need to know where 58 communication breakdowns occurred; only that they did. 59 60 If 'forward_clean_errors' is True, efro.error.CleanError 61 exceptions raised on the receiver end will result in a matching 62 CleanError raised back on the sender. 63 64 When an exception is not covered by the optional forwarding 65 mechanisms above, it will come across as efro.error.RemoteError 66 and the exception will be logged on the receiver end - at least 67 by default (see details below). 68 69 If 'remote_errors_include_stack_traces' is True, stringified 70 stack traces will be returned with efro.error.RemoteError 71 exceptions. This is useful for debugging but should only be 72 enabled in cases where the sender is trusted to see internal 73 details of the receiver. 74 75 By default, when a message-handling exception will result in an 76 efro.error.RemoteError being returned to the sender, the 77 exception will be logged on the receiver. This is because the 78 goal is usually to avoid returning opaque RemoteErrors and to 79 instead return something meaningful as part of the expected 80 response type (even if that value itself represents a logical 81 error state). If 'log_errors_on_receiver' is False, however, such 82 exceptions will *not* be logged on the receiver. This can be 83 useful in combination with 'remote_errors_include_stack_traces' 84 and 'forward_clean_errors' in situations where all error 85 logging/management will be happening on the sender end. Be 86 aware, however, that in that case it may be possible for 87 communication errors to prevent such error messages from 88 ever being seen. 89 """ 90 # pylint: disable=too-many-locals 91 self.message_types_by_id: dict[int, type[Message]] = {} 92 self.message_ids_by_type: dict[type[Message], int] = {} 93 self.response_types_by_id: dict[ 94 int, type[Response] | type[SysResponse] 95 ] = {} 96 self.response_ids_by_type: dict[ 97 type[Response] | type[SysResponse], int 98 ] = {} 99 for m_id, m_type in message_types.items(): 100 # Make sure only valid message types were passed and each 101 # id was assigned only once. 102 assert isinstance(m_id, int) 103 assert m_id >= 0 104 assert is_ioprepped_dataclass(m_type) and issubclass( 105 m_type, Message 106 ) 107 assert self.message_types_by_id.get(m_id) is None 108 self.message_types_by_id[m_id] = m_type 109 self.message_ids_by_type[m_type] = m_id 110 111 for r_id, r_type in response_types.items(): 112 assert isinstance(r_id, int) 113 assert r_id >= 0 114 assert is_ioprepped_dataclass(r_type) and issubclass( 115 r_type, Response 116 ) 117 assert self.response_types_by_id.get(r_id) is None 118 self.response_types_by_id[r_id] = r_type 119 self.response_ids_by_type[r_type] = r_id 120 121 # Register our SysResponse types. These use negative 122 # IDs so as to never overlap with user Response types. 123 def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None: 124 assert self.response_types_by_id.get(reg_id) is None 125 self.response_types_by_id[reg_id] = reg_tp 126 self.response_ids_by_type[reg_tp] = reg_id 127 128 _reg_sys(ErrorSysResponse, -1) 129 _reg_sys(EmptySysResponse, -2) 130 131 # Some extra-thorough validation in debug mode. 132 if __debug__: 133 # Make sure all Message types' return types are valid 134 # and have been assigned an ID as well. 135 all_response_types: set[type[Response] | None] = set() 136 for m_id, m_type in message_types.items(): 137 m_rtypes = m_type.get_response_types() 138 139 assert isinstance(m_rtypes, list) 140 assert ( 141 m_rtypes 142 ), f'Message type {m_type} specifies no return types.' 143 assert len(set(m_rtypes)) == len(m_rtypes) # check dups 144 for m_rtype in m_rtypes: 145 all_response_types.add(m_rtype) 146 for cls in all_response_types: 147 if cls is None: 148 continue 149 assert is_ioprepped_dataclass(cls) 150 assert issubclass(cls, Response) 151 if cls not in self.response_ids_by_type: 152 raise ValueError( 153 f'Possible response type {cls} needs to be included' 154 f' in response_types for this protocol.' 155 ) 156 157 # Make sure all registered types have unique base names. 158 # We can take advantage of this to generate cleaner looking 159 # protocol modules. Can revisit if this is ever a problem. 160 mtypenames = set(tp.__name__ for tp in self.message_ids_by_type) 161 if len(mtypenames) != len(message_types): 162 raise ValueError( 163 'message_types contains duplicate __name__s;' 164 ' all types are required to have unique names.' 165 ) 166 167 self.forward_clean_errors = forward_clean_errors 168 self.forward_communication_errors = forward_communication_errors 169 self.remote_errors_include_stack_traces = ( 170 remote_errors_include_stack_traces 171 ) 172 self.log_errors_on_receiver = log_errors_on_receiver 173 174 @staticmethod 175 def encode_dict(obj: dict) -> str: 176 """Json-encode a provided dict.""" 177 return json.dumps(obj, separators=(',', ':')) 178 179 def message_to_dict(self, message: Message) -> dict: 180 """Encode a message to a json ready dict.""" 181 return self._to_dict(message, self.message_ids_by_type, 'message') 182 183 def response_to_dict(self, response: Response | SysResponse) -> dict: 184 """Encode a response to a json ready dict.""" 185 return self._to_dict(response, self.response_ids_by_type, 'response') 186 187 def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]: 188 """Translate an Exception to a SysResponse. 189 190 Also returns whether the error should be logged if this happened 191 within handle_raw_message(). 192 """ 193 194 # If anything goes wrong, return a ErrorSysResponse instead. 195 # (either CLEAN or generic REMOTE) 196 if self.forward_clean_errors and isinstance(exc, CleanError): 197 return ( 198 ErrorSysResponse( 199 error_message=str(exc), 200 error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN, 201 ), 202 False, 203 ) 204 if self.forward_communication_errors and isinstance( 205 exc, CommunicationError 206 ): 207 return ( 208 ErrorSysResponse( 209 error_message=str(exc), 210 error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION, 211 ), 212 False, 213 ) 214 return ( 215 ErrorSysResponse( 216 error_message=( 217 # Note: need to format exception ourself here; it 218 # might not be current so we can't use 219 # traceback.format_exc(). 220 ''.join( 221 traceback.format_exception( 222 type(exc), exc, exc.__traceback__ 223 ) 224 ) 225 if self.remote_errors_include_stack_traces 226 else 'An internal error has occurred.' 227 ), 228 error_type=ErrorSysResponse.ErrorType.REMOTE, 229 ), 230 self.log_errors_on_receiver, 231 ) 232 233 def _to_dict( 234 self, message: Any, ids_by_type: dict[type, int], opname: str 235 ) -> dict: 236 """Encode a message to a json string for transport.""" 237 238 m_id: int | None = ids_by_type.get(type(message)) 239 if m_id is None: 240 raise TypeError( 241 f'{opname} type is not registered in protocol:' 242 f' {type(message)}' 243 ) 244 out = {'t': m_id, 'm': dataclass_to_dict(message)} 245 return out 246 247 @staticmethod 248 def decode_dict(data: str) -> dict: 249 """Decode data to a dict.""" 250 out = json.loads(data) 251 assert isinstance(out, dict) 252 return out 253 254 def message_from_dict(self, data: dict) -> Message: 255 """Decode a message from a json string.""" 256 out = self._from_dict(data, self.message_types_by_id, 'message') 257 assert isinstance(out, Message) 258 return out 259 260 def response_from_dict(self, data: dict) -> Response | SysResponse: 261 """Decode a response from a json string.""" 262 out = self._from_dict(data, self.response_types_by_id, 'response') 263 assert isinstance(out, Response | SysResponse) 264 return out 265 266 # Weeeird; we get mypy errors returning dict[int, type] but 267 # dict[int, typing.Type] or dict[int, type[Any]] works.. 268 def _from_dict( 269 self, data: dict, types_by_id: dict[int, type[Any]], opname: str 270 ) -> Any: 271 """Decode a message from a json string.""" 272 msgdict: dict | None 273 274 m_id = data.get('t') 275 # Allow omitting 'm' dict if its empty. 276 msgdict = data.get('m', {}) 277 278 assert isinstance(m_id, int) 279 assert isinstance(msgdict, dict) 280 281 # Decode this particular type. 282 msgtype = types_by_id.get(m_id) 283 if msgtype is None: 284 raise UnregisteredMessageIDError( 285 f'Got unregistered {opname} id of {m_id}.' 286 ) 287 return dataclass_from_dict(msgtype, msgdict) 288 289 def _get_module_header( 290 self, 291 part: Literal['sender', 'receiver'], 292 extra_import_code: str | None, 293 enable_async_sends: bool, 294 ) -> str: 295 """Return common parts of generated modules.""" 296 # pylint: disable=too-many-locals 297 # pylint: disable=too-many-branches 298 # pylint: disable=too-many-statements 299 import textwrap 300 301 tpimports: dict[str, list[str]] = {} 302 imports: dict[str, list[str]] = {} 303 304 single_message_type = len(self.message_ids_by_type) == 1 305 306 msgtypes = list(self.message_ids_by_type) 307 if part == 'sender': 308 msgtypes.append(Message) 309 for msgtype in msgtypes: 310 tpimports.setdefault(msgtype.__module__, []).append( 311 msgtype.__name__ 312 ) 313 rsptypes = list(self.response_ids_by_type) 314 if part == 'sender': 315 rsptypes.append(Response) 316 for rsp_tp in rsptypes: 317 # Skip these as they don't actually show up in code. 318 if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse: 319 continue 320 if ( 321 single_message_type 322 and part == 'sender' 323 and rsp_tp is not Response 324 ): 325 # We need to cast to the single supported response type 326 # in this case so need response types at runtime. 327 imports.setdefault(rsp_tp.__module__, []).append( 328 rsp_tp.__name__ 329 ) 330 else: 331 tpimports.setdefault(rsp_tp.__module__, []).append( 332 rsp_tp.__name__ 333 ) 334 335 import_lines = '' 336 tpimport_lines = '' 337 338 for module, names in sorted(imports.items()): 339 jnames = ', '.join(names) 340 line = f'from {module} import {jnames}' 341 if len(line) > 80: 342 # Recreate in a wrapping-friendly form. 343 line = f'from {module} import ({jnames})' 344 import_lines += f'{line}\n' 345 for module, names in sorted(tpimports.items()): 346 jnames = ', '.join(names) 347 line = f'from {module} import {jnames}' 348 if len(line) > 75: # Account for indent 349 # Recreate in a wrapping-friendly form. 350 line = f'from {module} import ({jnames})' 351 tpimport_lines += f'{line}\n' 352 353 if part == 'sender': 354 import_lines += ( 355 'from efro.message import MessageSender, BoundMessageSender\n' 356 ) 357 tpimport_typing_extras = '' 358 else: 359 if single_message_type: 360 import_lines += ( 361 'from efro.message import (MessageReceiver,' 362 ' BoundMessageReceiver, Message, Response)\n' 363 ) 364 else: 365 import_lines += ( 366 'from efro.message import MessageReceiver,' 367 ' BoundMessageReceiver\n' 368 ) 369 tpimport_typing_extras = ', Awaitable' 370 371 if extra_import_code is not None: 372 import_lines += extra_import_code 373 374 ovld = ', overload' if not single_message_type else '' 375 ovld2 = ( 376 ', cast, Awaitable' 377 if (single_message_type and part == 'sender' and enable_async_sends) 378 else '' 379 ) 380 tpimport_lines = textwrap.indent(tpimport_lines, ' ') 381 382 baseimps = ['Any'] 383 if part == 'receiver': 384 baseimps.append('Callable') 385 if part == 'sender' and enable_async_sends: 386 baseimps.append('Awaitable') 387 baseimps_s = ', '.join(baseimps) 388 out = ( 389 '# Released under the MIT License. See LICENSE for details.\n' 390 f'#\n' 391 f'"""Auto-generated {part} module. Do not edit by hand."""\n' 392 f'\n' 393 f'from __future__ import annotations\n' 394 f'\n' 395 f'from typing import TYPE_CHECKING{ovld}{ovld2}\n' 396 f'\n' 397 f'{import_lines}' 398 f'\n' 399 f'if TYPE_CHECKING:\n' 400 f' from typing import {baseimps_s}' 401 f'{tpimport_typing_extras}\n' 402 f'{tpimport_lines}' 403 f'\n' 404 f'\n' 405 ) 406 return out 407 408 def do_create_sender_module( 409 self, 410 basename: str, 411 protocol_create_code: str, 412 enable_sync_sends: bool, 413 enable_async_sends: bool, 414 private: bool = False, 415 protocol_module_level_import_code: str | None = None, 416 ) -> str: 417 """Used by create_sender_module(); do not call directly.""" 418 # pylint: disable=too-many-locals 419 # pylint: disable=too-many-branches 420 import textwrap 421 422 msgtypes = list(self.message_ids_by_type.keys()) 423 424 ppre = '_' if private else '' 425 out = self._get_module_header( 426 'sender', 427 extra_import_code=protocol_module_level_import_code, 428 enable_async_sends=enable_async_sends, 429 ) 430 ccind = textwrap.indent(protocol_create_code, ' ') 431 out += ( 432 f'class {ppre}{basename}(MessageSender):\n' 433 f' """Protocol-specific sender."""\n' 434 f'\n' 435 f' def __init__(self) -> None:\n' 436 f'{ccind}\n' 437 f' super().__init__(protocol)\n' 438 f'\n' 439 f' def __get__(\n' 440 f' self, obj: Any, type_in: Any = None\n' 441 f' ) -> {ppre}Bound{basename}:\n' 442 f' return {ppre}Bound{basename}(obj, self)\n' 443 f'\n' 444 f'\n' 445 f'class {ppre}Bound{basename}(BoundMessageSender):\n' 446 f' """Protocol-specific bound sender."""\n' 447 ) 448 449 def _filt_tp_name(rtype: type[Response] | None) -> str: 450 return 'None' if rtype is None else rtype.__name__ 451 452 # Define handler() overloads for all registered message types. 453 if msgtypes: 454 for async_pass in False, True: 455 if async_pass and not enable_async_sends: 456 continue 457 if not async_pass and not enable_sync_sends: 458 continue 459 pfx = 'async ' if async_pass else '' 460 sfx = '_async' if async_pass else '' 461 # awt = 'await ' if async_pass else '' 462 awt = '' 463 how = 'asynchronously' if async_pass else 'synchronously' 464 465 if len(msgtypes) == 1: 466 # Special case: with a single message types we don't 467 # use overloads. 468 msgtype = msgtypes[0] 469 msgtypevar = msgtype.__name__ 470 rtypes = msgtype.get_response_types() 471 if len(rtypes) > 1: 472 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 473 else: 474 rtypevar = _filt_tp_name(rtypes[0]) 475 if async_pass: 476 rtypevar = f'Awaitable[{rtypevar}]' 477 out += ( 478 f'\n' 479 f' def send{sfx}(self,' 480 f' message: {msgtypevar})' 481 f' -> {rtypevar}:\n' 482 f' """Send a message {how}."""\n' 483 f' out = {awt}self._sender.' 484 f'send{sfx}(self._obj, message)\n' 485 ) 486 if not async_pass: 487 out += ( 488 f' assert isinstance(out, {rtypevar})\n' 489 ' return out\n' 490 ) 491 else: 492 out += f' return cast({rtypevar}, out)\n' 493 494 else: 495 for msgtype in msgtypes: 496 msgtypevar = msgtype.__name__ 497 rtypes = msgtype.get_response_types() 498 if len(rtypes) > 1: 499 rtypevar = ' | '.join( 500 _filt_tp_name(t) for t in rtypes 501 ) 502 else: 503 rtypevar = _filt_tp_name(rtypes[0]) 504 out += ( 505 f'\n' 506 f' @overload\n' 507 f' {pfx}def send{sfx}(self,' 508 f' message: {msgtypevar})' 509 f' -> {rtypevar}: ...\n' 510 ) 511 rtypevar = 'Response | None' 512 if async_pass: 513 rtypevar = f'Awaitable[{rtypevar}]' 514 out += ( 515 f'\n' 516 f' def send{sfx}(self, message: Message)' 517 f' -> {rtypevar}:\n' 518 f' """Send a message {how}."""\n' 519 f' return {awt}self._sender.' 520 f'send{sfx}(self._obj, message)\n' 521 ) 522 523 return out 524 525 def do_create_receiver_module( 526 self, 527 basename: str, 528 protocol_create_code: str, 529 is_async: bool, 530 private: bool = False, 531 protocol_module_level_import_code: str | None = None, 532 ) -> str: 533 """Used by create_receiver_module(); do not call directly.""" 534 # pylint: disable=too-many-locals 535 import textwrap 536 537 desc = 'asynchronous' if is_async else 'synchronous' 538 ppre = '_' if private else '' 539 msgtypes = list(self.message_ids_by_type.keys()) 540 out = self._get_module_header( 541 'receiver', 542 extra_import_code=protocol_module_level_import_code, 543 enable_async_sends=False, 544 ) 545 ccind = textwrap.indent(protocol_create_code, ' ') 546 out += ( 547 f'class {ppre}{basename}(MessageReceiver):\n' 548 f' """Protocol-specific {desc} receiver."""\n' 549 f'\n' 550 f' is_async = {is_async}\n' 551 f'\n' 552 f' def __init__(self) -> None:\n' 553 f'{ccind}\n' 554 f' super().__init__(protocol)\n' 555 f'\n' 556 f' def __get__(\n' 557 f' self,\n' 558 f' obj: Any,\n' 559 f' type_in: Any = None,\n' 560 f' ) -> {ppre}Bound{basename}:\n' 561 f' return {ppre}Bound{basename}(' 562 f'obj, self)\n' 563 ) 564 565 # Define handler() overloads for all registered message types. 566 567 def _filt_tp_name(rtype: type[Response] | None) -> str: 568 return 'None' if rtype is None else rtype.__name__ 569 570 if msgtypes: 571 cbgn = 'Awaitable[' if is_async else '' 572 cend = ']' if is_async else '' 573 if len(msgtypes) == 1: 574 # Special case: when we have a single message type we don't 575 # use overloads. 576 msgtype = msgtypes[0] 577 msgtypevar = msgtype.__name__ 578 rtypes = msgtype.get_response_types() 579 if len(rtypes) > 1: 580 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 581 else: 582 rtypevar = _filt_tp_name(rtypes[0]) 583 rtypevar = f'{cbgn}{rtypevar}{cend}' 584 out += ( 585 f'\n' 586 f' def handler(\n' 587 f' self,\n' 588 f' call: Callable[[Any, {msgtypevar}], ' 589 f'{rtypevar}],\n' 590 f' )' 591 f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n' 592 f' """Decorator to register message handlers."""\n' 593 f' from typing import cast, Callable, Any\n' 594 f'\n' 595 f' self.register_handler(cast(Callable' 596 f'[[Any, Message], Response], call))\n' 597 f' return call\n' 598 ) 599 else: 600 for msgtype in msgtypes: 601 msgtypevar = msgtype.__name__ 602 rtypes = msgtype.get_response_types() 603 if len(rtypes) > 1: 604 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 605 else: 606 rtypevar = _filt_tp_name(rtypes[0]) 607 rtypevar = f'{cbgn}{rtypevar}{cend}' 608 out += ( 609 f'\n' 610 f' @overload\n' 611 f' def handler(\n' 612 f' self,\n' 613 f' call: Callable[[Any, {msgtypevar}], ' 614 f'{rtypevar}],\n' 615 f' )' 616 f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n' 617 ) 618 out += ( 619 '\n' 620 ' def handler(self, call: Callable) -> Callable:\n' 621 ' """Decorator to register message handlers."""\n' 622 ' self.register_handler(call)\n' 623 ' return call\n' 624 ) 625 626 out += ( 627 f'\n' 628 f'\n' 629 f'class {ppre}Bound{basename}(BoundMessageReceiver):\n' 630 f' """Protocol-specific bound receiver."""\n' 631 ) 632 if is_async: 633 out += ( 634 '\n' 635 ' def handle_raw_message(\n' 636 ' self, message: str, raise_unregistered: bool = False\n' 637 ' ) -> Awaitable[str]:\n' 638 ' """Asynchronously handle a raw incoming message."""\n' 639 ' return self._receiver.' 640 'handle_raw_message_async(\n' 641 ' self._obj, message, raise_unregistered\n' 642 ' )\n' 643 ) 644 645 else: 646 out += ( 647 '\n' 648 ' def handle_raw_message(\n' 649 ' self, message: str, raise_unregistered: bool = False\n' 650 ' ) -> str:\n' 651 ' """Synchronously handle a raw incoming message."""\n' 652 ' return self._receiver.handle_raw_message(\n' 653 ' self._obj, message, raise_unregistered\n' 654 ' )\n' 655 ) 656 657 return out
Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.
42 def __init__( 43 self, 44 message_types: dict[int, type[Message]], 45 response_types: dict[int, type[Response]], 46 forward_communication_errors: bool = False, 47 forward_clean_errors: bool = False, 48 remote_errors_include_stack_traces: bool = False, 49 log_errors_on_receiver: bool = True, 50 ) -> None: 51 """Create a protocol with a given configuration. 52 53 If 'forward_communication_errors' is True, 54 efro.error.CommunicationErrors raised on the receiver end will 55 result in a matching error raised back on the sender. This can 56 be useful if the receiver will be in some way forwarding 57 messages along and the sender doesn't need to know where 58 communication breakdowns occurred; only that they did. 59 60 If 'forward_clean_errors' is True, efro.error.CleanError 61 exceptions raised on the receiver end will result in a matching 62 CleanError raised back on the sender. 63 64 When an exception is not covered by the optional forwarding 65 mechanisms above, it will come across as efro.error.RemoteError 66 and the exception will be logged on the receiver end - at least 67 by default (see details below). 68 69 If 'remote_errors_include_stack_traces' is True, stringified 70 stack traces will be returned with efro.error.RemoteError 71 exceptions. This is useful for debugging but should only be 72 enabled in cases where the sender is trusted to see internal 73 details of the receiver. 74 75 By default, when a message-handling exception will result in an 76 efro.error.RemoteError being returned to the sender, the 77 exception will be logged on the receiver. This is because the 78 goal is usually to avoid returning opaque RemoteErrors and to 79 instead return something meaningful as part of the expected 80 response type (even if that value itself represents a logical 81 error state). If 'log_errors_on_receiver' is False, however, such 82 exceptions will *not* be logged on the receiver. This can be 83 useful in combination with 'remote_errors_include_stack_traces' 84 and 'forward_clean_errors' in situations where all error 85 logging/management will be happening on the sender end. Be 86 aware, however, that in that case it may be possible for 87 communication errors to prevent such error messages from 88 ever being seen. 89 """ 90 # pylint: disable=too-many-locals 91 self.message_types_by_id: dict[int, type[Message]] = {} 92 self.message_ids_by_type: dict[type[Message], int] = {} 93 self.response_types_by_id: dict[ 94 int, type[Response] | type[SysResponse] 95 ] = {} 96 self.response_ids_by_type: dict[ 97 type[Response] | type[SysResponse], int 98 ] = {} 99 for m_id, m_type in message_types.items(): 100 # Make sure only valid message types were passed and each 101 # id was assigned only once. 102 assert isinstance(m_id, int) 103 assert m_id >= 0 104 assert is_ioprepped_dataclass(m_type) and issubclass( 105 m_type, Message 106 ) 107 assert self.message_types_by_id.get(m_id) is None 108 self.message_types_by_id[m_id] = m_type 109 self.message_ids_by_type[m_type] = m_id 110 111 for r_id, r_type in response_types.items(): 112 assert isinstance(r_id, int) 113 assert r_id >= 0 114 assert is_ioprepped_dataclass(r_type) and issubclass( 115 r_type, Response 116 ) 117 assert self.response_types_by_id.get(r_id) is None 118 self.response_types_by_id[r_id] = r_type 119 self.response_ids_by_type[r_type] = r_id 120 121 # Register our SysResponse types. These use negative 122 # IDs so as to never overlap with user Response types. 123 def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None: 124 assert self.response_types_by_id.get(reg_id) is None 125 self.response_types_by_id[reg_id] = reg_tp 126 self.response_ids_by_type[reg_tp] = reg_id 127 128 _reg_sys(ErrorSysResponse, -1) 129 _reg_sys(EmptySysResponse, -2) 130 131 # Some extra-thorough validation in debug mode. 132 if __debug__: 133 # Make sure all Message types' return types are valid 134 # and have been assigned an ID as well. 135 all_response_types: set[type[Response] | None] = set() 136 for m_id, m_type in message_types.items(): 137 m_rtypes = m_type.get_response_types() 138 139 assert isinstance(m_rtypes, list) 140 assert ( 141 m_rtypes 142 ), f'Message type {m_type} specifies no return types.' 143 assert len(set(m_rtypes)) == len(m_rtypes) # check dups 144 for m_rtype in m_rtypes: 145 all_response_types.add(m_rtype) 146 for cls in all_response_types: 147 if cls is None: 148 continue 149 assert is_ioprepped_dataclass(cls) 150 assert issubclass(cls, Response) 151 if cls not in self.response_ids_by_type: 152 raise ValueError( 153 f'Possible response type {cls} needs to be included' 154 f' in response_types for this protocol.' 155 ) 156 157 # Make sure all registered types have unique base names. 158 # We can take advantage of this to generate cleaner looking 159 # protocol modules. Can revisit if this is ever a problem. 160 mtypenames = set(tp.__name__ for tp in self.message_ids_by_type) 161 if len(mtypenames) != len(message_types): 162 raise ValueError( 163 'message_types contains duplicate __name__s;' 164 ' all types are required to have unique names.' 165 ) 166 167 self.forward_clean_errors = forward_clean_errors 168 self.forward_communication_errors = forward_communication_errors 169 self.remote_errors_include_stack_traces = ( 170 remote_errors_include_stack_traces 171 ) 172 self.log_errors_on_receiver = log_errors_on_receiver
Create a protocol with a given configuration.
If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.
If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.
When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).
If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.
By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_errors_on_receiver' is False, however, such exceptions will *not* be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent such error messages from ever being seen.
174 @staticmethod 175 def encode_dict(obj: dict) -> str: 176 """Json-encode a provided dict.""" 177 return json.dumps(obj, separators=(',', ':'))
Json-encode a provided dict.
179 def message_to_dict(self, message: Message) -> dict: 180 """Encode a message to a json ready dict.""" 181 return self._to_dict(message, self.message_ids_by_type, 'message')
Encode a message to a json ready dict.
183 def response_to_dict(self, response: Response | SysResponse) -> dict: 184 """Encode a response to a json ready dict.""" 185 return self._to_dict(response, self.response_ids_by_type, 'response')
Encode a response to a json ready dict.
187 def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]: 188 """Translate an Exception to a SysResponse. 189 190 Also returns whether the error should be logged if this happened 191 within handle_raw_message(). 192 """ 193 194 # If anything goes wrong, return a ErrorSysResponse instead. 195 # (either CLEAN or generic REMOTE) 196 if self.forward_clean_errors and isinstance(exc, CleanError): 197 return ( 198 ErrorSysResponse( 199 error_message=str(exc), 200 error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN, 201 ), 202 False, 203 ) 204 if self.forward_communication_errors and isinstance( 205 exc, CommunicationError 206 ): 207 return ( 208 ErrorSysResponse( 209 error_message=str(exc), 210 error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION, 211 ), 212 False, 213 ) 214 return ( 215 ErrorSysResponse( 216 error_message=( 217 # Note: need to format exception ourself here; it 218 # might not be current so we can't use 219 # traceback.format_exc(). 220 ''.join( 221 traceback.format_exception( 222 type(exc), exc, exc.__traceback__ 223 ) 224 ) 225 if self.remote_errors_include_stack_traces 226 else 'An internal error has occurred.' 227 ), 228 error_type=ErrorSysResponse.ErrorType.REMOTE, 229 ), 230 self.log_errors_on_receiver, 231 )
Translate an Exception to a SysResponse.
Also returns whether the error should be logged if this happened within handle_raw_message().
247 @staticmethod 248 def decode_dict(data: str) -> dict: 249 """Decode data to a dict.""" 250 out = json.loads(data) 251 assert isinstance(out, dict) 252 return out
Decode data to a dict.
254 def message_from_dict(self, data: dict) -> Message: 255 """Decode a message from a json string.""" 256 out = self._from_dict(data, self.message_types_by_id, 'message') 257 assert isinstance(out, Message) 258 return out
Decode a message from a json string.
260 def response_from_dict(self, data: dict) -> Response | SysResponse: 261 """Decode a response from a json string.""" 262 out = self._from_dict(data, self.response_types_by_id, 'response') 263 assert isinstance(out, Response | SysResponse) 264 return out
Decode a response from a json string.
408 def do_create_sender_module( 409 self, 410 basename: str, 411 protocol_create_code: str, 412 enable_sync_sends: bool, 413 enable_async_sends: bool, 414 private: bool = False, 415 protocol_module_level_import_code: str | None = None, 416 ) -> str: 417 """Used by create_sender_module(); do not call directly.""" 418 # pylint: disable=too-many-locals 419 # pylint: disable=too-many-branches 420 import textwrap 421 422 msgtypes = list(self.message_ids_by_type.keys()) 423 424 ppre = '_' if private else '' 425 out = self._get_module_header( 426 'sender', 427 extra_import_code=protocol_module_level_import_code, 428 enable_async_sends=enable_async_sends, 429 ) 430 ccind = textwrap.indent(protocol_create_code, ' ') 431 out += ( 432 f'class {ppre}{basename}(MessageSender):\n' 433 f' """Protocol-specific sender."""\n' 434 f'\n' 435 f' def __init__(self) -> None:\n' 436 f'{ccind}\n' 437 f' super().__init__(protocol)\n' 438 f'\n' 439 f' def __get__(\n' 440 f' self, obj: Any, type_in: Any = None\n' 441 f' ) -> {ppre}Bound{basename}:\n' 442 f' return {ppre}Bound{basename}(obj, self)\n' 443 f'\n' 444 f'\n' 445 f'class {ppre}Bound{basename}(BoundMessageSender):\n' 446 f' """Protocol-specific bound sender."""\n' 447 ) 448 449 def _filt_tp_name(rtype: type[Response] | None) -> str: 450 return 'None' if rtype is None else rtype.__name__ 451 452 # Define handler() overloads for all registered message types. 453 if msgtypes: 454 for async_pass in False, True: 455 if async_pass and not enable_async_sends: 456 continue 457 if not async_pass and not enable_sync_sends: 458 continue 459 pfx = 'async ' if async_pass else '' 460 sfx = '_async' if async_pass else '' 461 # awt = 'await ' if async_pass else '' 462 awt = '' 463 how = 'asynchronously' if async_pass else 'synchronously' 464 465 if len(msgtypes) == 1: 466 # Special case: with a single message types we don't 467 # use overloads. 468 msgtype = msgtypes[0] 469 msgtypevar = msgtype.__name__ 470 rtypes = msgtype.get_response_types() 471 if len(rtypes) > 1: 472 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 473 else: 474 rtypevar = _filt_tp_name(rtypes[0]) 475 if async_pass: 476 rtypevar = f'Awaitable[{rtypevar}]' 477 out += ( 478 f'\n' 479 f' def send{sfx}(self,' 480 f' message: {msgtypevar})' 481 f' -> {rtypevar}:\n' 482 f' """Send a message {how}."""\n' 483 f' out = {awt}self._sender.' 484 f'send{sfx}(self._obj, message)\n' 485 ) 486 if not async_pass: 487 out += ( 488 f' assert isinstance(out, {rtypevar})\n' 489 ' return out\n' 490 ) 491 else: 492 out += f' return cast({rtypevar}, out)\n' 493 494 else: 495 for msgtype in msgtypes: 496 msgtypevar = msgtype.__name__ 497 rtypes = msgtype.get_response_types() 498 if len(rtypes) > 1: 499 rtypevar = ' | '.join( 500 _filt_tp_name(t) for t in rtypes 501 ) 502 else: 503 rtypevar = _filt_tp_name(rtypes[0]) 504 out += ( 505 f'\n' 506 f' @overload\n' 507 f' {pfx}def send{sfx}(self,' 508 f' message: {msgtypevar})' 509 f' -> {rtypevar}: ...\n' 510 ) 511 rtypevar = 'Response | None' 512 if async_pass: 513 rtypevar = f'Awaitable[{rtypevar}]' 514 out += ( 515 f'\n' 516 f' def send{sfx}(self, message: Message)' 517 f' -> {rtypevar}:\n' 518 f' """Send a message {how}."""\n' 519 f' return {awt}self._sender.' 520 f'send{sfx}(self._obj, message)\n' 521 ) 522 523 return out
Used by create_sender_module(); do not call directly.
525 def do_create_receiver_module( 526 self, 527 basename: str, 528 protocol_create_code: str, 529 is_async: bool, 530 private: bool = False, 531 protocol_module_level_import_code: str | None = None, 532 ) -> str: 533 """Used by create_receiver_module(); do not call directly.""" 534 # pylint: disable=too-many-locals 535 import textwrap 536 537 desc = 'asynchronous' if is_async else 'synchronous' 538 ppre = '_' if private else '' 539 msgtypes = list(self.message_ids_by_type.keys()) 540 out = self._get_module_header( 541 'receiver', 542 extra_import_code=protocol_module_level_import_code, 543 enable_async_sends=False, 544 ) 545 ccind = textwrap.indent(protocol_create_code, ' ') 546 out += ( 547 f'class {ppre}{basename}(MessageReceiver):\n' 548 f' """Protocol-specific {desc} receiver."""\n' 549 f'\n' 550 f' is_async = {is_async}\n' 551 f'\n' 552 f' def __init__(self) -> None:\n' 553 f'{ccind}\n' 554 f' super().__init__(protocol)\n' 555 f'\n' 556 f' def __get__(\n' 557 f' self,\n' 558 f' obj: Any,\n' 559 f' type_in: Any = None,\n' 560 f' ) -> {ppre}Bound{basename}:\n' 561 f' return {ppre}Bound{basename}(' 562 f'obj, self)\n' 563 ) 564 565 # Define handler() overloads for all registered message types. 566 567 def _filt_tp_name(rtype: type[Response] | None) -> str: 568 return 'None' if rtype is None else rtype.__name__ 569 570 if msgtypes: 571 cbgn = 'Awaitable[' if is_async else '' 572 cend = ']' if is_async else '' 573 if len(msgtypes) == 1: 574 # Special case: when we have a single message type we don't 575 # use overloads. 576 msgtype = msgtypes[0] 577 msgtypevar = msgtype.__name__ 578 rtypes = msgtype.get_response_types() 579 if len(rtypes) > 1: 580 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 581 else: 582 rtypevar = _filt_tp_name(rtypes[0]) 583 rtypevar = f'{cbgn}{rtypevar}{cend}' 584 out += ( 585 f'\n' 586 f' def handler(\n' 587 f' self,\n' 588 f' call: Callable[[Any, {msgtypevar}], ' 589 f'{rtypevar}],\n' 590 f' )' 591 f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n' 592 f' """Decorator to register message handlers."""\n' 593 f' from typing import cast, Callable, Any\n' 594 f'\n' 595 f' self.register_handler(cast(Callable' 596 f'[[Any, Message], Response], call))\n' 597 f' return call\n' 598 ) 599 else: 600 for msgtype in msgtypes: 601 msgtypevar = msgtype.__name__ 602 rtypes = msgtype.get_response_types() 603 if len(rtypes) > 1: 604 rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes) 605 else: 606 rtypevar = _filt_tp_name(rtypes[0]) 607 rtypevar = f'{cbgn}{rtypevar}{cend}' 608 out += ( 609 f'\n' 610 f' @overload\n' 611 f' def handler(\n' 612 f' self,\n' 613 f' call: Callable[[Any, {msgtypevar}], ' 614 f'{rtypevar}],\n' 615 f' )' 616 f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n' 617 ) 618 out += ( 619 '\n' 620 ' def handler(self, call: Callable) -> Callable:\n' 621 ' """Decorator to register message handlers."""\n' 622 ' self.register_handler(call)\n' 623 ' return call\n' 624 ) 625 626 out += ( 627 f'\n' 628 f'\n' 629 f'class {ppre}Bound{basename}(BoundMessageReceiver):\n' 630 f' """Protocol-specific bound receiver."""\n' 631 ) 632 if is_async: 633 out += ( 634 '\n' 635 ' def handle_raw_message(\n' 636 ' self, message: str, raise_unregistered: bool = False\n' 637 ' ) -> Awaitable[str]:\n' 638 ' """Asynchronously handle a raw incoming message."""\n' 639 ' return self._receiver.' 640 'handle_raw_message_async(\n' 641 ' self._obj, message, raise_unregistered\n' 642 ' )\n' 643 ) 644 645 else: 646 out += ( 647 '\n' 648 ' def handle_raw_message(\n' 649 ' self, message: str, raise_unregistered: bool = False\n' 650 ' ) -> str:\n' 651 ' """Synchronously handle a raw incoming message."""\n' 652 ' return self._receiver.handle_raw_message(\n' 653 ' self._obj, message, raise_unregistered\n' 654 ' )\n' 655 ) 656 657 return out
Used by create_receiver_module(); do not call directly.
22class MessageSender: 23 """Facilitates sending messages to a target and receiving responses. 24 25 These are instantiated at the class level and used to register unbound 26 class methods to handle raw message sending. Generally this class is not 27 used directly, but instead autogenerated subclasses which provide type 28 safe overloads are used instead. 29 30 Example: 31 (In this example, MyMessageSender is an autogenerated class that 32 inherits from MessageSender). 33 34 class MyClass: 35 msg = MyMessageSender() 36 37 @msg.send_method 38 def send_raw_message(self, message: str) -> str: 39 # Actually send the message here. 40 41 obj = MyClass() 42 43 # The MyMessageSender generated class would provides overloads for 44 # send(), send_async(), etc. to provide type-safety for message types 45 # and their associated response types. 46 # Thus, given the statement below, a type-checker would know that 47 # 'response' is a SomeResponseType or whatever is associated with 48 # SomeMessageType. 49 response = obj.msg.send(SomeMessageType()) 50 51 """ 52 53 def __init__(self, protocol: MessageProtocol) -> None: 54 self.protocol = protocol 55 self._send_raw_message_call: Callable[[Any, str], str] | None = None 56 self._send_raw_message_ex_call: ( 57 Callable[[Any, str, Message], str] | None 58 ) = None 59 self._send_async_raw_message_call: ( 60 Callable[[Any, str], Awaitable[str]] | None 61 ) = None 62 self._send_async_raw_message_ex_call: ( 63 Callable[[Any, str, Message], Awaitable[str]] | None 64 ) = None 65 self._encode_filter_call: ( 66 Callable[[Any, Message, dict], None] | None 67 ) = None 68 self._decode_filter_call: ( 69 Callable[[Any, Message, dict, Response | SysResponse], None] | None 70 ) = None 71 self._peer_desc_call: Callable[[Any], str] | None = None 72 73 def send_method( 74 self, call: Callable[[Any, str], str] 75 ) -> Callable[[Any, str], str]: 76 """Function decorator for setting raw send method. 77 78 Send methods take strings and should return strings. 79 CommunicationErrors raised here will be returned to the sender 80 as such; all other exceptions will result in a RuntimeError for 81 the sender. 82 """ 83 assert self._send_raw_message_call is None 84 self._send_raw_message_call = call 85 return call 86 87 def send_ex_method( 88 self, call: Callable[[Any, str, Message], str] 89 ) -> Callable[[Any, str, Message], str]: 90 """Function decorator for extended send method. 91 92 Version of send_method which is also is passed the original 93 unencoded message; can be useful for cases where metadata is sent 94 along with messages referring to their payloads/etc. 95 """ 96 assert self._send_raw_message_ex_call is None 97 self._send_raw_message_ex_call = call 98 return call 99 100 def send_async_method( 101 self, call: Callable[[Any, str], Awaitable[str]] 102 ) -> Callable[[Any, str], Awaitable[str]]: 103 """Function decorator for setting raw send-async method. 104 105 Send methods take strings and should return strings. 106 CommunicationErrors raised here will be returned to the sender 107 as such; all other exceptions will result in a RuntimeError for 108 the sender. 109 110 IMPORTANT: Generally async send methods should not be implemented 111 as 'async' methods, but instead should be regular methods that 112 return awaitable objects. This way it can be guaranteed that 113 outgoing messages are synchronously enqueued in the correct 114 order, and then async calls can be returned which finish each 115 send. If the entire call is async, they may be enqueued out of 116 order in rare cases. 117 """ 118 assert self._send_async_raw_message_call is None 119 self._send_async_raw_message_call = call 120 return call 121 122 def send_async_ex_method( 123 self, call: Callable[[Any, str, Message], Awaitable[str]] 124 ) -> Callable[[Any, str, Message], Awaitable[str]]: 125 """Function decorator for extended send-async method. 126 127 Version of send_async_method which is also is passed the original 128 unencoded message; can be useful for cases where metadata is sent 129 along with messages referring to their payloads/etc. 130 """ 131 assert self._send_async_raw_message_ex_call is None 132 self._send_async_raw_message_ex_call = call 133 return call 134 135 def encode_filter_method( 136 self, call: Callable[[Any, Message, dict], None] 137 ) -> Callable[[Any, Message, dict], None]: 138 """Function decorator for defining an encode filter. 139 140 Encode filters can be used to add extra data to the message 141 dict before is is encoded to a string and sent out. 142 """ 143 assert self._encode_filter_call is None 144 self._encode_filter_call = call 145 return call 146 147 def decode_filter_method( 148 self, call: Callable[[Any, Message, dict, Response | SysResponse], None] 149 ) -> Callable[[Any, Message, dict, Response], None]: 150 """Function decorator for defining a decode filter. 151 152 Decode filters can be used to extract extra data from incoming 153 message dicts. 154 """ 155 assert self._decode_filter_call is None 156 self._decode_filter_call = call 157 return call 158 159 def peer_desc_method( 160 self, call: Callable[[Any], str] 161 ) -> Callable[[Any], str]: 162 """Function decorator for defining peer descriptions. 163 164 These are included in error messages or other diagnostics. 165 """ 166 assert self._peer_desc_call is None 167 self._peer_desc_call = call 168 return call 169 170 def send(self, bound_obj: Any, message: Message) -> Response | None: 171 """Send a message synchronously.""" 172 return self.unpack_raw_response( 173 bound_obj=bound_obj, 174 message=message, 175 raw_response=self.fetch_raw_response( 176 bound_obj=bound_obj, 177 message=message, 178 ), 179 ) 180 181 def send_async( 182 self, bound_obj: Any, message: Message 183 ) -> Awaitable[Response | None]: 184 """Send a message asynchronously.""" 185 186 # Note: This call is synchronous so that the first part of it can 187 # happen synchronously. If the whole call were async we wouldn't be 188 # able to guarantee that messages sent in order would actually go 189 # out in order. 190 raw_response_awaitable = self.fetch_raw_response_async( 191 bound_obj=bound_obj, 192 message=message, 193 ) 194 # Now return an awaitable that will finish the send. 195 return self._send_async_awaitable( 196 bound_obj, message, raw_response_awaitable 197 ) 198 199 async def _send_async_awaitable( 200 self, 201 bound_obj: Any, 202 message: Message, 203 raw_response_awaitable: Awaitable[Response | SysResponse], 204 ) -> Response | None: 205 return self.unpack_raw_response( 206 bound_obj=bound_obj, 207 message=message, 208 raw_response=await raw_response_awaitable, 209 ) 210 211 def fetch_raw_response( 212 self, bound_obj: Any, message: Message 213 ) -> Response | SysResponse: 214 """Send a message synchronously. 215 216 Generally you can just call send(); these split versions are 217 for when message sending and response handling need to happen 218 in different contexts/threads. 219 """ 220 if ( 221 self._send_raw_message_call is None 222 and self._send_raw_message_ex_call is None 223 ): 224 raise RuntimeError('send() is unimplemented for this type.') 225 226 msg_encoded = self._encode_message(bound_obj, message) 227 try: 228 if self._send_raw_message_ex_call is not None: 229 response_encoded = self._send_raw_message_ex_call( 230 bound_obj, msg_encoded, message 231 ) 232 else: 233 assert self._send_raw_message_call is not None 234 response_encoded = self._send_raw_message_call( 235 bound_obj, msg_encoded 236 ) 237 except Exception as exc: 238 response = ErrorSysResponse( 239 error_message='Error in MessageSender @send_method.', 240 error_type=( 241 ErrorSysResponse.ErrorType.COMMUNICATION 242 if isinstance(exc, CommunicationError) 243 else ErrorSysResponse.ErrorType.LOCAL 244 ), 245 ) 246 # Can include the actual exception since we'll be looking at 247 # this locally; might be helpful. 248 response.set_local_exception(exc) 249 return response 250 return self._decode_raw_response(bound_obj, message, response_encoded) 251 252 def fetch_raw_response_async( 253 self, bound_obj: Any, message: Message 254 ) -> Awaitable[Response | SysResponse]: 255 """Fetch a raw message response awaitable. 256 257 The result of this should be awaited and then passed to 258 unpack_raw_response() to produce the final message result. 259 260 Generally you can just call send(); calling fetch and unpack 261 manually is for when message sending and response handling need 262 to happen in different contexts/threads. 263 """ 264 265 # Note: This call is synchronous so that the first part of it can 266 # happen synchronously. If the whole call were async we wouldn't be 267 # able to guarantee that messages sent in order would actually go 268 # out in order. 269 if ( 270 self._send_async_raw_message_call is None 271 and self._send_async_raw_message_ex_call is None 272 ): 273 raise RuntimeError('send_async() is unimplemented for this type.') 274 275 msg_encoded = self._encode_message(bound_obj, message) 276 try: 277 if self._send_async_raw_message_ex_call is not None: 278 send_awaitable = self._send_async_raw_message_ex_call( 279 bound_obj, msg_encoded, message 280 ) 281 else: 282 assert self._send_async_raw_message_call is not None 283 send_awaitable = self._send_async_raw_message_call( 284 bound_obj, msg_encoded 285 ) 286 except Exception as exc: 287 return self._error_awaitable(exc) 288 289 # Now return an awaitable to finish the job. 290 return self._fetch_raw_response_awaitable( 291 bound_obj, message, send_awaitable 292 ) 293 294 async def _error_awaitable(self, exc: Exception) -> SysResponse: 295 response = ErrorSysResponse( 296 error_message='Error in MessageSender @send_async_method.', 297 error_type=( 298 ErrorSysResponse.ErrorType.COMMUNICATION 299 if isinstance(exc, CommunicationError) 300 else ErrorSysResponse.ErrorType.LOCAL 301 ), 302 ) 303 # Can include the actual exception since we'll be looking at 304 # this locally; might be helpful. 305 response.set_local_exception(exc) 306 return response 307 308 async def _fetch_raw_response_awaitable( 309 self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str] 310 ) -> Response | SysResponse: 311 try: 312 response_encoded = await send_awaitable 313 except Exception as exc: 314 response = ErrorSysResponse( 315 error_message='Error in MessageSender @send_async_method.', 316 error_type=( 317 ErrorSysResponse.ErrorType.COMMUNICATION 318 if isinstance(exc, CommunicationError) 319 else ErrorSysResponse.ErrorType.LOCAL 320 ), 321 ) 322 # Can include the actual exception since we'll be looking at 323 # this locally; might be helpful. 324 response.set_local_exception(exc) 325 return response 326 return self._decode_raw_response(bound_obj, message, response_encoded) 327 328 def unpack_raw_response( 329 self, 330 bound_obj: Any, 331 message: Message, 332 raw_response: Response | SysResponse, 333 ) -> Response | None: 334 """Convert a raw fetched response into a final response/error/etc. 335 336 Generally you can just call send(); calling fetch and unpack 337 manually is for when message sending and response handling need 338 to happen in different contexts/threads. 339 """ 340 response = self._unpack_raw_response(bound_obj, raw_response) 341 assert ( 342 response is None 343 or type(response) in type(message).get_response_types() 344 ) 345 return response 346 347 def _encode_message(self, bound_obj: Any, message: Message) -> str: 348 """Encode a message for sending.""" 349 msg_dict = self.protocol.message_to_dict(message) 350 if self._encode_filter_call is not None: 351 self._encode_filter_call(bound_obj, message, msg_dict) 352 return self.protocol.encode_dict(msg_dict) 353 354 def _decode_raw_response( 355 self, bound_obj: Any, message: Message, response_encoded: str 356 ) -> Response | SysResponse: 357 """Create a Response from returned data. 358 359 These Responses may encapsulate things like remote errors and 360 should not be handed directly to users. _unpack_raw_response() 361 should be used to translate to special values like None or raise 362 Exceptions. This function itself should never raise Exceptions. 363 """ 364 response: Response | SysResponse 365 try: 366 response_dict = self.protocol.decode_dict(response_encoded) 367 response = self.protocol.response_from_dict(response_dict) 368 if self._decode_filter_call is not None: 369 self._decode_filter_call( 370 bound_obj, message, response_dict, response 371 ) 372 except Exception as exc: 373 response = ErrorSysResponse( 374 error_message='Error decoding raw response.', 375 error_type=ErrorSysResponse.ErrorType.LOCAL, 376 ) 377 # Since we'll be looking at this locally, we can include 378 # extra info for logging/etc. 379 response.set_local_exception(exc) 380 return response 381 382 def _unpack_raw_response( 383 self, bound_obj: Any, raw_response: Response | SysResponse 384 ) -> Response | None: 385 """Given a raw Response, unpacks to special values or Exceptions. 386 387 The result of this call is what should be passed to users. 388 For complex messaging situations such as response callbacks 389 operating across different threads, this last stage should be 390 run such that any raised Exception is active when the callback 391 fires; not on the thread where the message was sent. 392 """ 393 # EmptySysResponse translates to None 394 if isinstance(raw_response, EmptySysResponse): 395 return None 396 397 # Some error occurred. Raise a local Exception for it. 398 if isinstance(raw_response, ErrorSysResponse): 399 # Errors that happened locally can attach their exceptions 400 # here for extra logging goodness. 401 local_exception = raw_response.get_local_exception() 402 403 if ( 404 raw_response.error_type 405 is ErrorSysResponse.ErrorType.COMMUNICATION 406 ): 407 raise CommunicationError( 408 raw_response.error_message 409 ) from local_exception 410 411 # If something went wrong on *our* end of the connection, 412 # don't say it was a remote error. 413 if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL: 414 raise RuntimeError( 415 raw_response.error_message 416 ) from local_exception 417 418 # If they want to support clean errors, do those. 419 if ( 420 self.protocol.forward_clean_errors 421 and raw_response.error_type 422 is ErrorSysResponse.ErrorType.REMOTE_CLEAN 423 ): 424 raise CleanError( 425 raw_response.error_message 426 ) from local_exception 427 428 if ( 429 self.protocol.forward_communication_errors 430 and raw_response.error_type 431 is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION 432 ): 433 raise CommunicationError( 434 raw_response.error_message 435 ) from local_exception 436 437 # Everything else gets lumped in as a remote error. 438 raise RemoteError( 439 raw_response.error_message, 440 peer_desc=( 441 'peer' 442 if self._peer_desc_call is None 443 else self._peer_desc_call(bound_obj) 444 ), 445 ) from local_exception 446 447 assert isinstance(raw_response, Response) 448 return raw_response
Facilitates sending messages to a target and receiving responses.
These are instantiated at the class level and used to register unbound class methods to handle raw message sending. Generally this class is not used directly, but instead autogenerated subclasses which provide type safe overloads are used instead.
Example: (In this example, MyMessageSender is an autogenerated class that inherits from MessageSender).
class MyClass: msg = MyMessageSender()
@msg.send_method
def send_raw_message(self, message: str) -> str:
# Actually send the message here.
obj = MyClass()
The MyMessageSender generated class would provides overloads for
send(), send_async(), etc. to provide type-safety for message types
and their associated response types.
Thus, given the statement below, a type-checker would know that
'response' is a SomeResponseType or whatever is associated with
SomeMessageType.
response = obj.msg.send(SomeMessageType())
53 def __init__(self, protocol: MessageProtocol) -> None: 54 self.protocol = protocol 55 self._send_raw_message_call: Callable[[Any, str], str] | None = None 56 self._send_raw_message_ex_call: ( 57 Callable[[Any, str, Message], str] | None 58 ) = None 59 self._send_async_raw_message_call: ( 60 Callable[[Any, str], Awaitable[str]] | None 61 ) = None 62 self._send_async_raw_message_ex_call: ( 63 Callable[[Any, str, Message], Awaitable[str]] | None 64 ) = None 65 self._encode_filter_call: ( 66 Callable[[Any, Message, dict], None] | None 67 ) = None 68 self._decode_filter_call: ( 69 Callable[[Any, Message, dict, Response | SysResponse], None] | None 70 ) = None 71 self._peer_desc_call: Callable[[Any], str] | None = None
73 def send_method( 74 self, call: Callable[[Any, str], str] 75 ) -> Callable[[Any, str], str]: 76 """Function decorator for setting raw send method. 77 78 Send methods take strings and should return strings. 79 CommunicationErrors raised here will be returned to the sender 80 as such; all other exceptions will result in a RuntimeError for 81 the sender. 82 """ 83 assert self._send_raw_message_call is None 84 self._send_raw_message_call = call 85 return call
Function decorator for setting raw send method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
87 def send_ex_method( 88 self, call: Callable[[Any, str, Message], str] 89 ) -> Callable[[Any, str, Message], str]: 90 """Function decorator for extended send method. 91 92 Version of send_method which is also is passed the original 93 unencoded message; can be useful for cases where metadata is sent 94 along with messages referring to their payloads/etc. 95 """ 96 assert self._send_raw_message_ex_call is None 97 self._send_raw_message_ex_call = call 98 return call
Function decorator for extended send method.
Version of send_method which is also is passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
100 def send_async_method( 101 self, call: Callable[[Any, str], Awaitable[str]] 102 ) -> Callable[[Any, str], Awaitable[str]]: 103 """Function decorator for setting raw send-async method. 104 105 Send methods take strings and should return strings. 106 CommunicationErrors raised here will be returned to the sender 107 as such; all other exceptions will result in a RuntimeError for 108 the sender. 109 110 IMPORTANT: Generally async send methods should not be implemented 111 as 'async' methods, but instead should be regular methods that 112 return awaitable objects. This way it can be guaranteed that 113 outgoing messages are synchronously enqueued in the correct 114 order, and then async calls can be returned which finish each 115 send. If the entire call is async, they may be enqueued out of 116 order in rare cases. 117 """ 118 assert self._send_async_raw_message_call is None 119 self._send_async_raw_message_call = call 120 return call
Function decorator for setting raw send-async method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.
122 def send_async_ex_method( 123 self, call: Callable[[Any, str, Message], Awaitable[str]] 124 ) -> Callable[[Any, str, Message], Awaitable[str]]: 125 """Function decorator for extended send-async method. 126 127 Version of send_async_method which is also is passed the original 128 unencoded message; can be useful for cases where metadata is sent 129 along with messages referring to their payloads/etc. 130 """ 131 assert self._send_async_raw_message_ex_call is None 132 self._send_async_raw_message_ex_call = call 133 return call
Function decorator for extended send-async method.
Version of send_async_method which is also is passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
135 def encode_filter_method( 136 self, call: Callable[[Any, Message, dict], None] 137 ) -> Callable[[Any, Message, dict], None]: 138 """Function decorator for defining an encode filter. 139 140 Encode filters can be used to add extra data to the message 141 dict before is is encoded to a string and sent out. 142 """ 143 assert self._encode_filter_call is None 144 self._encode_filter_call = call 145 return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before is is encoded to a string and sent out.
147 def decode_filter_method( 148 self, call: Callable[[Any, Message, dict, Response | SysResponse], None] 149 ) -> Callable[[Any, Message, dict, Response], None]: 150 """Function decorator for defining a decode filter. 151 152 Decode filters can be used to extract extra data from incoming 153 message dicts. 154 """ 155 assert self._decode_filter_call is None 156 self._decode_filter_call = call 157 return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts.
159 def peer_desc_method( 160 self, call: Callable[[Any], str] 161 ) -> Callable[[Any], str]: 162 """Function decorator for defining peer descriptions. 163 164 These are included in error messages or other diagnostics. 165 """ 166 assert self._peer_desc_call is None 167 self._peer_desc_call = call 168 return call
Function decorator for defining peer descriptions.
These are included in error messages or other diagnostics.
170 def send(self, bound_obj: Any, message: Message) -> Response | None: 171 """Send a message synchronously.""" 172 return self.unpack_raw_response( 173 bound_obj=bound_obj, 174 message=message, 175 raw_response=self.fetch_raw_response( 176 bound_obj=bound_obj, 177 message=message, 178 ), 179 )
Send a message synchronously.
181 def send_async( 182 self, bound_obj: Any, message: Message 183 ) -> Awaitable[Response | None]: 184 """Send a message asynchronously.""" 185 186 # Note: This call is synchronous so that the first part of it can 187 # happen synchronously. If the whole call were async we wouldn't be 188 # able to guarantee that messages sent in order would actually go 189 # out in order. 190 raw_response_awaitable = self.fetch_raw_response_async( 191 bound_obj=bound_obj, 192 message=message, 193 ) 194 # Now return an awaitable that will finish the send. 195 return self._send_async_awaitable( 196 bound_obj, message, raw_response_awaitable 197 )
Send a message asynchronously.
211 def fetch_raw_response( 212 self, bound_obj: Any, message: Message 213 ) -> Response | SysResponse: 214 """Send a message synchronously. 215 216 Generally you can just call send(); these split versions are 217 for when message sending and response handling need to happen 218 in different contexts/threads. 219 """ 220 if ( 221 self._send_raw_message_call is None 222 and self._send_raw_message_ex_call is None 223 ): 224 raise RuntimeError('send() is unimplemented for this type.') 225 226 msg_encoded = self._encode_message(bound_obj, message) 227 try: 228 if self._send_raw_message_ex_call is not None: 229 response_encoded = self._send_raw_message_ex_call( 230 bound_obj, msg_encoded, message 231 ) 232 else: 233 assert self._send_raw_message_call is not None 234 response_encoded = self._send_raw_message_call( 235 bound_obj, msg_encoded 236 ) 237 except Exception as exc: 238 response = ErrorSysResponse( 239 error_message='Error in MessageSender @send_method.', 240 error_type=( 241 ErrorSysResponse.ErrorType.COMMUNICATION 242 if isinstance(exc, CommunicationError) 243 else ErrorSysResponse.ErrorType.LOCAL 244 ), 245 ) 246 # Can include the actual exception since we'll be looking at 247 # this locally; might be helpful. 248 response.set_local_exception(exc) 249 return response 250 return self._decode_raw_response(bound_obj, message, response_encoded)
Send a message synchronously.
Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.
252 def fetch_raw_response_async( 253 self, bound_obj: Any, message: Message 254 ) -> Awaitable[Response | SysResponse]: 255 """Fetch a raw message response awaitable. 256 257 The result of this should be awaited and then passed to 258 unpack_raw_response() to produce the final message result. 259 260 Generally you can just call send(); calling fetch and unpack 261 manually is for when message sending and response handling need 262 to happen in different contexts/threads. 263 """ 264 265 # Note: This call is synchronous so that the first part of it can 266 # happen synchronously. If the whole call were async we wouldn't be 267 # able to guarantee that messages sent in order would actually go 268 # out in order. 269 if ( 270 self._send_async_raw_message_call is None 271 and self._send_async_raw_message_ex_call is None 272 ): 273 raise RuntimeError('send_async() is unimplemented for this type.') 274 275 msg_encoded = self._encode_message(bound_obj, message) 276 try: 277 if self._send_async_raw_message_ex_call is not None: 278 send_awaitable = self._send_async_raw_message_ex_call( 279 bound_obj, msg_encoded, message 280 ) 281 else: 282 assert self._send_async_raw_message_call is not None 283 send_awaitable = self._send_async_raw_message_call( 284 bound_obj, msg_encoded 285 ) 286 except Exception as exc: 287 return self._error_awaitable(exc) 288 289 # Now return an awaitable to finish the job. 290 return self._fetch_raw_response_awaitable( 291 bound_obj, message, send_awaitable 292 )
Fetch a raw message response awaitable.
The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
328 def unpack_raw_response( 329 self, 330 bound_obj: Any, 331 message: Message, 332 raw_response: Response | SysResponse, 333 ) -> Response | None: 334 """Convert a raw fetched response into a final response/error/etc. 335 336 Generally you can just call send(); calling fetch and unpack 337 manually is for when message sending and response handling need 338 to happen in different contexts/threads. 339 """ 340 response = self._unpack_raw_response(bound_obj, raw_response) 341 assert ( 342 response is None 343 or type(response) in type(message).get_response_types() 344 ) 345 return response
Convert a raw fetched response into a final response/error/etc.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
451class BoundMessageSender: 452 """Base class for bound senders.""" 453 454 def __init__(self, obj: Any, sender: MessageSender) -> None: 455 # Note: not checking obj here since we want to support 456 # at least our protocol property when accessed via type. 457 self._obj = obj 458 self._sender = sender 459 460 @property 461 def protocol(self) -> MessageProtocol: 462 """Protocol associated with this sender.""" 463 return self._sender.protocol 464 465 def send_untyped(self, message: Message) -> Response | None: 466 """Send a message synchronously. 467 468 Whenever possible, use the send() call provided by generated 469 subclasses instead of this; it will provide better type safety. 470 """ 471 assert self._obj is not None 472 return self._sender.send(bound_obj=self._obj, message=message) 473 474 def send_async_untyped( 475 self, message: Message 476 ) -> Awaitable[Response | None]: 477 """Send a message asynchronously. 478 479 Whenever possible, use the send_async() call provided by generated 480 subclasses instead of this; it will provide better type safety. 481 """ 482 assert self._obj is not None 483 return self._sender.send_async(bound_obj=self._obj, message=message) 484 485 def fetch_raw_response_async_untyped( 486 self, message: Message 487 ) -> Awaitable[Response | SysResponse]: 488 """Split send (part 1 of 2).""" 489 assert self._obj is not None 490 return self._sender.fetch_raw_response_async( 491 bound_obj=self._obj, message=message 492 ) 493 494 def unpack_raw_response_untyped( 495 self, message: Message, raw_response: Response | SysResponse 496 ) -> Response | None: 497 """Split send (part 2 of 2).""" 498 return self._sender.unpack_raw_response( 499 bound_obj=self._obj, message=message, raw_response=raw_response 500 )
Base class for bound senders.
460 @property 461 def protocol(self) -> MessageProtocol: 462 """Protocol associated with this sender.""" 463 return self._sender.protocol
Protocol associated with this sender.
465 def send_untyped(self, message: Message) -> Response | None: 466 """Send a message synchronously. 467 468 Whenever possible, use the send() call provided by generated 469 subclasses instead of this; it will provide better type safety. 470 """ 471 assert self._obj is not None 472 return self._sender.send(bound_obj=self._obj, message=message)
Send a message synchronously.
Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.
474 def send_async_untyped( 475 self, message: Message 476 ) -> Awaitable[Response | None]: 477 """Send a message asynchronously. 478 479 Whenever possible, use the send_async() call provided by generated 480 subclasses instead of this; it will provide better type safety. 481 """ 482 assert self._obj is not None 483 return self._sender.send_async(bound_obj=self._obj, message=message)
Send a message asynchronously.
Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.
485 def fetch_raw_response_async_untyped( 486 self, message: Message 487 ) -> Awaitable[Response | SysResponse]: 488 """Split send (part 1 of 2).""" 489 assert self._obj is not None 490 return self._sender.fetch_raw_response_async( 491 bound_obj=self._obj, message=message 492 )
Split send (part 1 of 2).
494 def unpack_raw_response_untyped( 495 self, message: Message, raw_response: Response | SysResponse 496 ) -> Response | None: 497 """Split send (part 2 of 2).""" 498 return self._sender.unpack_raw_response( 499 bound_obj=self._obj, message=message, raw_response=raw_response 500 )
Split send (part 2 of 2).
29class MessageReceiver: 30 """Facilitates receiving & responding to messages from a remote source. 31 32 This is instantiated at the class level with unbound methods registered 33 as handlers for different message types in the protocol. 34 35 Example: 36 37 class MyClass: 38 receiver = MyMessageReceiver() 39 40 # MyMessageReceiver fills out handler() overloads to ensure all 41 # registered handlers have valid types/return-types. 42 43 @receiver.handler 44 def handle_some_message_type(self, message: SomeMsg) -> SomeResponse: 45 # Deal with this message type here. 46 47 # This will trigger the registered handler being called. 48 obj = MyClass() 49 obj.receiver.handle_raw_message(some_raw_data) 50 51 Any unhandled Exception occurring during message handling will result in 52 an efro.error.RemoteError being raised on the sending end. 53 """ 54 55 is_async = False 56 57 def __init__(self, protocol: MessageProtocol) -> None: 58 self.protocol = protocol 59 self._handlers: dict[type[Message], Callable] = {} 60 self._decode_filter_call: ( 61 Callable[[Any, dict, Message], None] | None 62 ) = None 63 self._encode_filter_call: ( 64 Callable[[Any, Message | None, Response | SysResponse, dict], None] 65 | None 66 ) = None 67 68 # noinspection PyProtectedMember 69 def register_handler( 70 self, call: Callable[[Any, Message], Response | None] 71 ) -> None: 72 """Register a handler call. 73 74 The message type handled by the call is determined by its 75 type annotation. 76 """ 77 # TODO: can use types.GenericAlias in 3.9. 78 # (hmm though now that we're there, it seems a drop-in 79 # replace gives us errors. Should re-test in 3.11 as it seems 80 # that typing_extensions handles it differently in that case) 81 from typing import _GenericAlias # type: ignore 82 from typing import get_type_hints, get_args 83 84 sig = inspect.getfullargspec(call) 85 86 # The provided callable should be a method taking one 'msg' arg. 87 expectedsig = ['self', 'msg'] 88 if sig.args != expectedsig: 89 raise ValueError( 90 f'Expected callable signature of {expectedsig};' 91 f' got {sig.args}' 92 ) 93 94 # Check annotation types to determine what message types we handle. 95 # Return-type annotation can be a Union, but we probably don't 96 # have it available at runtime. Explicitly pull it in. 97 # UPDATE: we've updated our pylint filter to where we should 98 # have all annotations available. 99 # anns = get_type_hints(call, localns={'Union': Union}) 100 anns = get_type_hints(call) 101 102 msgtype = anns.get('msg') 103 if not isinstance(msgtype, type): 104 raise TypeError( 105 f'expected a type for "msg" annotation; got {type(msgtype)}.' 106 ) 107 assert issubclass(msgtype, Message) 108 109 ret = anns.get('return') 110 responsetypes: tuple[type[Any] | None, ...] 111 112 # Return types can be a single type or a union of types. 113 if isinstance(ret, (_GenericAlias, types.UnionType)): 114 targs = get_args(ret) 115 if not all(isinstance(a, (type, type(None))) for a in targs): 116 raise TypeError( 117 f'expected only types for "return" annotation;' 118 f' got {targs}.' 119 ) 120 responsetypes = targs 121 else: 122 if not isinstance(ret, (type, type(None))): 123 raise TypeError( 124 f'expected one or more types for' 125 f' "return" annotation; got a {type(ret)}.' 126 ) 127 # This seems like maybe a mypy bug. Appeared after adding 128 # types.UnionType above. 129 responsetypes = (ret,) 130 131 # This will contain NoneType for empty return cases, but 132 # we expect it to be None. 133 # noinspection PyPep8 134 responsetypes = tuple( 135 None if r is type(None) else r for r in responsetypes 136 ) 137 138 # Make sure our protocol has this message type registered and our 139 # return types exactly match. (Technically we could return a subset 140 # of the supported types; can allow this in the future if it makes 141 # sense). 142 registered_types = self.protocol.message_ids_by_type.keys() 143 144 if msgtype not in registered_types: 145 raise TypeError( 146 f'Message type {msgtype} is not registered' 147 f' in this Protocol.' 148 ) 149 150 if msgtype in self._handlers: 151 raise TypeError( 152 f'Message type {msgtype} already has a registered handler.' 153 ) 154 155 # Make sure the responses exactly matches what the message expects. 156 if set(responsetypes) != set(msgtype.get_response_types()): 157 raise TypeError( 158 f'Provided response types {responsetypes} do not' 159 f' match the set expected by message type {msgtype}: ' 160 f'({msgtype.get_response_types()})' 161 ) 162 163 # Ok; we're good! 164 self._handlers[msgtype] = call 165 166 def decode_filter_method( 167 self, call: Callable[[Any, dict, Message], None] 168 ) -> Callable[[Any, dict, Message], None]: 169 """Function decorator for defining a decode filter. 170 171 Decode filters can be used to extract extra data from incoming 172 message dicts. This version will work for both handle_raw_message() 173 and handle_raw_message_async() 174 """ 175 assert self._decode_filter_call is None 176 self._decode_filter_call = call 177 return call 178 179 def encode_filter_method( 180 self, 181 call: Callable[ 182 [Any, Message | None, Response | SysResponse, dict], None 183 ], 184 ) -> Callable[[Any, Message | None, Response, dict], None]: 185 """Function decorator for defining an encode filter. 186 187 Encode filters can be used to add extra data to the message 188 dict before is is encoded to a string and sent out. 189 """ 190 assert self._encode_filter_call is None 191 self._encode_filter_call = call 192 return call 193 194 def validate(self, log_only: bool = False) -> None: 195 """Check for handler completeness, valid types, etc.""" 196 for msgtype in self.protocol.message_ids_by_type.keys(): 197 if issubclass(msgtype, Response): 198 continue 199 if msgtype not in self._handlers: 200 msg = ( 201 f'Protocol message type {msgtype} is not handled' 202 f' by receiver type {type(self)}.' 203 ) 204 if log_only: 205 logging.error(msg) 206 else: 207 raise TypeError(msg) 208 209 def _decode_incoming_message_base( 210 self, bound_obj: Any, msg: str 211 ) -> tuple[Any, dict, Message]: 212 # Decode the incoming message. 213 msg_dict = self.protocol.decode_dict(msg) 214 msg_decoded = self.protocol.message_from_dict(msg_dict) 215 assert isinstance(msg_decoded, Message) 216 if self._decode_filter_call is not None: 217 self._decode_filter_call(bound_obj, msg_dict, msg_decoded) 218 return bound_obj, msg_dict, msg_decoded 219 220 def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message: 221 bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base( 222 bound_obj=bound_obj, msg=msg 223 ) 224 return msg_decoded 225 226 def encode_user_response( 227 self, bound_obj: Any, message: Message, response: Response | None 228 ) -> str: 229 """Encode a response provided by the user for sending.""" 230 231 assert isinstance(response, Response | None) 232 # (user should never explicitly return error-responses) 233 assert ( 234 response is None or type(response) in message.get_response_types() 235 ) 236 237 # A return value of None equals EmptySysResponse. 238 out_response: Response | SysResponse 239 if response is None: 240 out_response = EmptySysResponse() 241 else: 242 out_response = response 243 244 response_dict = self.protocol.response_to_dict(out_response) 245 if self._encode_filter_call is not None: 246 self._encode_filter_call( 247 bound_obj, message, out_response, response_dict 248 ) 249 return self.protocol.encode_dict(response_dict) 250 251 def encode_error_response( 252 self, bound_obj: Any, message: Message | None, exc: Exception 253 ) -> tuple[str, bool]: 254 """Given an error, return sysresponse str and whether to log.""" 255 response, dolog = self.protocol.error_to_response(exc) 256 response_dict = self.protocol.response_to_dict(response) 257 if self._encode_filter_call is not None: 258 self._encode_filter_call( 259 bound_obj, message, response, response_dict 260 ) 261 return self.protocol.encode_dict(response_dict), dolog 262 263 def handle_raw_message( 264 self, bound_obj: Any, msg: str, raise_unregistered: bool = False 265 ) -> str: 266 """Decode, handle, and return an response for a message. 267 268 if 'raise_unregistered' is True, will raise an 269 efro.message.UnregisteredMessageIDError for messages not handled by 270 the protocol. In all other cases local errors will translate to 271 error responses returned to the sender. 272 """ 273 assert not self.is_async, "can't call sync handler on async receiver" 274 msg_decoded: Message | None = None 275 try: 276 msg_decoded = self._decode_incoming_message(bound_obj, msg) 277 msgtype = type(msg_decoded) 278 handler = self._handlers.get(msgtype) 279 if handler is None: 280 raise RuntimeError(f'Got unhandled message type: {msgtype}.') 281 response = handler(bound_obj, msg_decoded) 282 assert isinstance(response, Response | None) 283 return self.encode_user_response(bound_obj, msg_decoded, response) 284 285 except Exception as exc: 286 if raise_unregistered and isinstance( 287 exc, UnregisteredMessageIDError 288 ): 289 raise 290 rstr, dolog = self.encode_error_response( 291 bound_obj, msg_decoded, exc 292 ) 293 if dolog: 294 if msg_decoded is not None: 295 msgtype = type(msg_decoded) 296 logging.exception( 297 'Error handling %s.%s message.', 298 msgtype.__module__, 299 msgtype.__qualname__, 300 ) 301 else: 302 logging.exception( 303 'Error handling raw efro.message' 304 ' (likely a message format incompatibility): %s.', 305 msg, 306 ) 307 return rstr 308 309 def handle_raw_message_async( 310 self, bound_obj: Any, msg: str, raise_unregistered: bool = False 311 ) -> Awaitable[str]: 312 """Should be called when the receiver gets a message. 313 314 The return value is the raw response to the message. 315 """ 316 317 # Note: This call is synchronous so that the first part of it can 318 # happen synchronously. If the whole call were async we wouldn't be 319 # able to guarantee that messages handlers would be called in the 320 # order the messages were received. 321 322 assert self.is_async, "Can't call async handler on sync receiver." 323 msg_decoded: Message | None = None 324 try: 325 msg_decoded = self._decode_incoming_message(bound_obj, msg) 326 msgtype = type(msg_decoded) 327 handler = self._handlers.get(msgtype) 328 if handler is None: 329 raise RuntimeError(f'Got unhandled message type: {msgtype}.') 330 handler_awaitable = handler(bound_obj, msg_decoded) 331 332 except Exception as exc: 333 if raise_unregistered and isinstance( 334 exc, UnregisteredMessageIDError 335 ): 336 raise 337 return self._handle_raw_message_async_error( 338 bound_obj, msg, msg_decoded, exc 339 ) 340 341 # Return an awaitable to handle the rest asynchronously. 342 return self._handle_raw_message_async( 343 bound_obj, msg, msg_decoded, handler_awaitable 344 ) 345 346 async def _handle_raw_message_async_error( 347 self, 348 bound_obj: Any, 349 msg_raw: str, 350 msg_decoded: Message | None, 351 exc: Exception, 352 ) -> str: 353 rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc) 354 if dolog: 355 if msg_decoded is not None: 356 msgtype = type(msg_decoded) 357 logging.exception( 358 'Error handling %s.%s message.', 359 msgtype.__module__, 360 msgtype.__qualname__, 361 # We need to explicitly provide the exception here, 362 # otherwise it shows up at None. I assume related to 363 # the fact that we're an async function. 364 exc_info=exc, 365 ) 366 else: 367 logging.exception( 368 'Error handling raw async efro.message' 369 ' (likely a message format incompatibility): %s.', 370 msg_raw, 371 # We need to explicitly provide the exception here, 372 # otherwise it shows up at None. I assume related to 373 # the fact that we're an async function. 374 exc_info=exc, 375 ) 376 return rstr 377 378 async def _handle_raw_message_async( 379 self, 380 bound_obj: Any, 381 msg_raw: str, 382 msg_decoded: Message, 383 handler_awaitable: Awaitable[Response | None], 384 ) -> str: 385 """Should be called when the receiver gets a message. 386 387 The return value is the raw response to the message. 388 """ 389 try: 390 response = await handler_awaitable 391 assert isinstance(response, Response | None) 392 return self.encode_user_response(bound_obj, msg_decoded, response) 393 394 except Exception as exc: 395 return await self._handle_raw_message_async_error( 396 bound_obj, msg_raw, msg_decoded, exc 397 )
Facilitates receiving & responding to messages from a remote source.
This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.
Example:
class MyClass: receiver = MyMessageReceiver()
# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.
@receiver.handler
def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
# Deal with this message type here.
This will trigger the registered handler being called.
obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)
Any unhandled Exception occurring during message handling will result in an efro.error.RemoteError being raised on the sending end.
57 def __init__(self, protocol: MessageProtocol) -> None: 58 self.protocol = protocol 59 self._handlers: dict[type[Message], Callable] = {} 60 self._decode_filter_call: ( 61 Callable[[Any, dict, Message], None] | None 62 ) = None 63 self._encode_filter_call: ( 64 Callable[[Any, Message | None, Response | SysResponse, dict], None] 65 | None 66 ) = None
69 def register_handler( 70 self, call: Callable[[Any, Message], Response | None] 71 ) -> None: 72 """Register a handler call. 73 74 The message type handled by the call is determined by its 75 type annotation. 76 """ 77 # TODO: can use types.GenericAlias in 3.9. 78 # (hmm though now that we're there, it seems a drop-in 79 # replace gives us errors. Should re-test in 3.11 as it seems 80 # that typing_extensions handles it differently in that case) 81 from typing import _GenericAlias # type: ignore 82 from typing import get_type_hints, get_args 83 84 sig = inspect.getfullargspec(call) 85 86 # The provided callable should be a method taking one 'msg' arg. 87 expectedsig = ['self', 'msg'] 88 if sig.args != expectedsig: 89 raise ValueError( 90 f'Expected callable signature of {expectedsig};' 91 f' got {sig.args}' 92 ) 93 94 # Check annotation types to determine what message types we handle. 95 # Return-type annotation can be a Union, but we probably don't 96 # have it available at runtime. Explicitly pull it in. 97 # UPDATE: we've updated our pylint filter to where we should 98 # have all annotations available. 99 # anns = get_type_hints(call, localns={'Union': Union}) 100 anns = get_type_hints(call) 101 102 msgtype = anns.get('msg') 103 if not isinstance(msgtype, type): 104 raise TypeError( 105 f'expected a type for "msg" annotation; got {type(msgtype)}.' 106 ) 107 assert issubclass(msgtype, Message) 108 109 ret = anns.get('return') 110 responsetypes: tuple[type[Any] | None, ...] 111 112 # Return types can be a single type or a union of types. 113 if isinstance(ret, (_GenericAlias, types.UnionType)): 114 targs = get_args(ret) 115 if not all(isinstance(a, (type, type(None))) for a in targs): 116 raise TypeError( 117 f'expected only types for "return" annotation;' 118 f' got {targs}.' 119 ) 120 responsetypes = targs 121 else: 122 if not isinstance(ret, (type, type(None))): 123 raise TypeError( 124 f'expected one or more types for' 125 f' "return" annotation; got a {type(ret)}.' 126 ) 127 # This seems like maybe a mypy bug. Appeared after adding 128 # types.UnionType above. 129 responsetypes = (ret,) 130 131 # This will contain NoneType for empty return cases, but 132 # we expect it to be None. 133 # noinspection PyPep8 134 responsetypes = tuple( 135 None if r is type(None) else r for r in responsetypes 136 ) 137 138 # Make sure our protocol has this message type registered and our 139 # return types exactly match. (Technically we could return a subset 140 # of the supported types; can allow this in the future if it makes 141 # sense). 142 registered_types = self.protocol.message_ids_by_type.keys() 143 144 if msgtype not in registered_types: 145 raise TypeError( 146 f'Message type {msgtype} is not registered' 147 f' in this Protocol.' 148 ) 149 150 if msgtype in self._handlers: 151 raise TypeError( 152 f'Message type {msgtype} already has a registered handler.' 153 ) 154 155 # Make sure the responses exactly matches what the message expects. 156 if set(responsetypes) != set(msgtype.get_response_types()): 157 raise TypeError( 158 f'Provided response types {responsetypes} do not' 159 f' match the set expected by message type {msgtype}: ' 160 f'({msgtype.get_response_types()})' 161 ) 162 163 # Ok; we're good! 164 self._handlers[msgtype] = call
Register a handler call.
The message type handled by the call is determined by its type annotation.
166 def decode_filter_method( 167 self, call: Callable[[Any, dict, Message], None] 168 ) -> Callable[[Any, dict, Message], None]: 169 """Function decorator for defining a decode filter. 170 171 Decode filters can be used to extract extra data from incoming 172 message dicts. This version will work for both handle_raw_message() 173 and handle_raw_message_async() 174 """ 175 assert self._decode_filter_call is None 176 self._decode_filter_call = call 177 return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async()
179 def encode_filter_method( 180 self, 181 call: Callable[ 182 [Any, Message | None, Response | SysResponse, dict], None 183 ], 184 ) -> Callable[[Any, Message | None, Response, dict], None]: 185 """Function decorator for defining an encode filter. 186 187 Encode filters can be used to add extra data to the message 188 dict before is is encoded to a string and sent out. 189 """ 190 assert self._encode_filter_call is None 191 self._encode_filter_call = call 192 return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before is is encoded to a string and sent out.
194 def validate(self, log_only: bool = False) -> None: 195 """Check for handler completeness, valid types, etc.""" 196 for msgtype in self.protocol.message_ids_by_type.keys(): 197 if issubclass(msgtype, Response): 198 continue 199 if msgtype not in self._handlers: 200 msg = ( 201 f'Protocol message type {msgtype} is not handled' 202 f' by receiver type {type(self)}.' 203 ) 204 if log_only: 205 logging.error(msg) 206 else: 207 raise TypeError(msg)
Check for handler completeness, valid types, etc.
226 def encode_user_response( 227 self, bound_obj: Any, message: Message, response: Response | None 228 ) -> str: 229 """Encode a response provided by the user for sending.""" 230 231 assert isinstance(response, Response | None) 232 # (user should never explicitly return error-responses) 233 assert ( 234 response is None or type(response) in message.get_response_types() 235 ) 236 237 # A return value of None equals EmptySysResponse. 238 out_response: Response | SysResponse 239 if response is None: 240 out_response = EmptySysResponse() 241 else: 242 out_response = response 243 244 response_dict = self.protocol.response_to_dict(out_response) 245 if self._encode_filter_call is not None: 246 self._encode_filter_call( 247 bound_obj, message, out_response, response_dict 248 ) 249 return self.protocol.encode_dict(response_dict)
Encode a response provided by the user for sending.
251 def encode_error_response( 252 self, bound_obj: Any, message: Message | None, exc: Exception 253 ) -> tuple[str, bool]: 254 """Given an error, return sysresponse str and whether to log.""" 255 response, dolog = self.protocol.error_to_response(exc) 256 response_dict = self.protocol.response_to_dict(response) 257 if self._encode_filter_call is not None: 258 self._encode_filter_call( 259 bound_obj, message, response, response_dict 260 ) 261 return self.protocol.encode_dict(response_dict), dolog
Given an error, return sysresponse str and whether to log.
263 def handle_raw_message( 264 self, bound_obj: Any, msg: str, raise_unregistered: bool = False 265 ) -> str: 266 """Decode, handle, and return an response for a message. 267 268 if 'raise_unregistered' is True, will raise an 269 efro.message.UnregisteredMessageIDError for messages not handled by 270 the protocol. In all other cases local errors will translate to 271 error responses returned to the sender. 272 """ 273 assert not self.is_async, "can't call sync handler on async receiver" 274 msg_decoded: Message | None = None 275 try: 276 msg_decoded = self._decode_incoming_message(bound_obj, msg) 277 msgtype = type(msg_decoded) 278 handler = self._handlers.get(msgtype) 279 if handler is None: 280 raise RuntimeError(f'Got unhandled message type: {msgtype}.') 281 response = handler(bound_obj, msg_decoded) 282 assert isinstance(response, Response | None) 283 return self.encode_user_response(bound_obj, msg_decoded, response) 284 285 except Exception as exc: 286 if raise_unregistered and isinstance( 287 exc, UnregisteredMessageIDError 288 ): 289 raise 290 rstr, dolog = self.encode_error_response( 291 bound_obj, msg_decoded, exc 292 ) 293 if dolog: 294 if msg_decoded is not None: 295 msgtype = type(msg_decoded) 296 logging.exception( 297 'Error handling %s.%s message.', 298 msgtype.__module__, 299 msgtype.__qualname__, 300 ) 301 else: 302 logging.exception( 303 'Error handling raw efro.message' 304 ' (likely a message format incompatibility): %s.', 305 msg, 306 ) 307 return rstr
Decode, handle, and return an response for a message.
if 'raise_unregistered' is True, will raise an UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.
309 def handle_raw_message_async( 310 self, bound_obj: Any, msg: str, raise_unregistered: bool = False 311 ) -> Awaitable[str]: 312 """Should be called when the receiver gets a message. 313 314 The return value is the raw response to the message. 315 """ 316 317 # Note: This call is synchronous so that the first part of it can 318 # happen synchronously. If the whole call were async we wouldn't be 319 # able to guarantee that messages handlers would be called in the 320 # order the messages were received. 321 322 assert self.is_async, "Can't call async handler on sync receiver." 323 msg_decoded: Message | None = None 324 try: 325 msg_decoded = self._decode_incoming_message(bound_obj, msg) 326 msgtype = type(msg_decoded) 327 handler = self._handlers.get(msgtype) 328 if handler is None: 329 raise RuntimeError(f'Got unhandled message type: {msgtype}.') 330 handler_awaitable = handler(bound_obj, msg_decoded) 331 332 except Exception as exc: 333 if raise_unregistered and isinstance( 334 exc, UnregisteredMessageIDError 335 ): 336 raise 337 return self._handle_raw_message_async_error( 338 bound_obj, msg, msg_decoded, exc 339 ) 340 341 # Return an awaitable to handle the rest asynchronously. 342 return self._handle_raw_message_async( 343 bound_obj, msg, msg_decoded, handler_awaitable 344 )
Should be called when the receiver gets a message.
The return value is the raw response to the message.
400class BoundMessageReceiver: 401 """Base bound receiver class.""" 402 403 def __init__( 404 self, 405 obj: Any, 406 receiver: MessageReceiver, 407 ) -> None: 408 assert obj is not None 409 self._obj = obj 410 self._receiver = receiver 411 412 @property 413 def protocol(self) -> MessageProtocol: 414 """Protocol associated with this receiver.""" 415 return self._receiver.protocol 416 417 def encode_error_response(self, exc: Exception) -> str: 418 """Given an error, return a response ready to send. 419 420 This should be used for any errors that happen outside of 421 standard handle_raw_message calls. Any errors within those 422 calls will be automatically returned as encoded strings. 423 """ 424 # Passing None for Message here; we would only have that available 425 # for things going wrong in the handler (which this is not for). 426 return self._receiver.encode_error_response(self._obj, None, exc)[0]
Base bound receiver class.
412 @property 413 def protocol(self) -> MessageProtocol: 414 """Protocol associated with this receiver.""" 415 return self._receiver.protocol
Protocol associated with this receiver.
417 def encode_error_response(self, exc: Exception) -> str: 418 """Given an error, return a response ready to send. 419 420 This should be used for any errors that happen outside of 421 standard handle_raw_message calls. Any errors within those 422 calls will be automatically returned as encoded strings. 423 """ 424 # Passing None for Message here; we would only have that available 425 # for things going wrong in the handler (which this is not for). 426 return self._receiver.encode_error_response(self._obj, None, exc)[0]
Given an error, return a response ready to send.
This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.
18def create_sender_module( 19 basename: str, 20 protocol_create_code: str, 21 enable_sync_sends: bool, 22 enable_async_sends: bool, 23 private: bool = False, 24 protocol_module_level_import_code: str | None = None, 25 build_time_protocol_create_code: str | None = None, 26) -> str: 27 """Create a Python module defining a MessageSender subclass. 28 29 This class is primarily for type checking and will contain overrides 30 for the varieties of send calls for message/response types defined 31 in the protocol. 32 33 Code passed for 'protocol_create_code' should import necessary 34 modules and assign an instance of the Protocol to a 'protocol' 35 variable. 36 37 Class names are based on basename; a basename 'FooSender' will 38 result in classes FooSender and BoundFooSender. 39 40 If 'private' is True, class-names will be prefixed with an '_'. 41 42 Note: output code may have long lines and should generally be run 43 through a formatter. We should perhaps move this functionality to 44 efrotools so we can include that functionality inline. 45 """ 46 protocol = _protocol_from_code( 47 build_time_protocol_create_code 48 if build_time_protocol_create_code is not None 49 else protocol_create_code 50 ) 51 return protocol.do_create_sender_module( 52 basename=basename, 53 protocol_create_code=protocol_create_code, 54 enable_sync_sends=enable_sync_sends, 55 enable_async_sends=enable_async_sends, 56 private=private, 57 protocol_module_level_import_code=protocol_module_level_import_code, 58 )
Create a Python module defining a MessageSender subclass.
This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.
Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.
Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.
If 'private' is True, class-names will be prefixed with an '_'.
Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.
61def create_receiver_module( 62 basename: str, 63 protocol_create_code: str, 64 is_async: bool, 65 private: bool = False, 66 protocol_module_level_import_code: str | None = None, 67 build_time_protocol_create_code: str | None = None, 68) -> str: 69 """ "Create a Python module defining a MessageReceiver subclass. 70 71 This class is primarily for type checking and will contain overrides 72 for the register method for message/response types defined in 73 the protocol. 74 75 Class names are based on basename; a basename 'FooReceiver' will 76 result in FooReceiver and BoundFooReceiver. 77 78 If 'is_async' is True, handle_raw_message() will be an async method 79 and the @handler decorator will expect async methods. 80 81 If 'private' is True, class-names will be prefixed with an '_'. 82 83 Note that line lengths are not clipped, so output may need to be 84 run through a formatter to prevent lint warnings about excessive 85 line lengths. 86 """ 87 protocol = _protocol_from_code( 88 build_time_protocol_create_code 89 if build_time_protocol_create_code is not None 90 else protocol_create_code 91 ) 92 return protocol.do_create_receiver_module( 93 basename=basename, 94 protocol_create_code=protocol_create_code, 95 is_async=is_async, 96 private=private, 97 protocol_module_level_import_code=protocol_module_level_import_code, 98 )
"Create a Python module defining a MessageReceiver subclass.
This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.
Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.
If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.
If 'private' is True, class-names will be prefixed with an '_'.
Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.
20class UnregisteredMessageIDError(Exception): 21 """A message or response id is not covered by our protocol."""
A message or response id is not covered by our protocol.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args