efro.message
Functionality for sending and responding to messages. Supports static typing for message types and possible return types.
1# Released under the MIT License. See LICENSE for details. 2# 3"""Functionality for sending and responding to messages. 4Supports static typing for message types and possible return types. 5""" 6 7from __future__ import annotations 8 9from efro.util import set_canonical_module_names 10from efro.message._protocol import MessageProtocol 11from efro.message._sender import MessageSender, BoundMessageSender 12from efro.message._receiver import MessageReceiver, BoundMessageReceiver 13from efro.message._module import create_sender_module, create_receiver_module 14from efro.message._message import ( 15 Message, 16 Response, 17 SysResponse, 18 EmptySysResponse, 19 ErrorSysResponse, 20 StringResponse, 21 BoolResponse, 22 UnregisteredMessageIDError, 23) 24 25__all__ = [ 26 'Message', 27 'Response', 28 'SysResponse', 29 'EmptySysResponse', 30 'ErrorSysResponse', 31 'StringResponse', 32 'BoolResponse', 33 'MessageProtocol', 34 'MessageSender', 35 'BoundMessageSender', 36 'MessageReceiver', 37 'BoundMessageReceiver', 38 'create_sender_module', 39 'create_receiver_module', 40 'UnregisteredMessageIDError', 41] 42 43# Have these things present themselves cleanly as 'thismodule.SomeClass' 44# instead of 'thismodule._internalmodule.SomeClass' 45set_canonical_module_names(globals())
class Message:
    """Base class for messages."""

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """List every Response type that sending this Message may yield.

        A None entry means the message may produce no response value.
        Unless overridden, None is the only entry.
        """
        no_response_only: list[type[Response] | None] = [None]
        return no_response_only
Base class for messages.
27 @classmethod 28 def get_response_types(cls) -> list[type[Response] | None]: 29 """Return all Response types this Message can return when sent. 30 31 The default implementation specifies a None return type. 32 """ 33 return [None]
Return all Response types this Message can return when sent.
The default implementation specifies a None return type.
Base class for responses to messages.
class SysResponse:
    """Base class for system-responses to messages.

    The messaging system sends and handles these internally; users of
    the api never see them.
    """

    def set_local_exception(self, exc: Exception) -> None:
        """Remember a local exception on this object for logging/handling.

        The exception is stored as a plain attribute on this instance
        only; it is not part of the serialized data.
        """
        attr_name = '_sr_local_exception'
        setattr(self, attr_name, exc)

    def get_local_exception(self) -> Exception | None:
        """Return the locally attached exception, or None if none is set."""
        attached = getattr(self, '_sr_local_exception', None)
        assert attached is None or isinstance(attached, Exception)
        return attached
Base class for system-responses to messages.
These are only sent/handled by the messaging system itself; users of the api never see them.
def set_local_exception(self, exc: Exception) -> None:
    """Remember a local exception on this object for logging/handling.

    The exception is stored as a plain attribute on this instance
    only; it is not part of the serialized data.
    """
    attr_name = '_sr_local_exception'
    setattr(self, attr_name, exc)
Attach a local exception to facilitate better logging/handling.
Be aware that this data does not get serialized and only exists on the local object.
@ioprepped
@dataclass
class EmptySysResponse(SysResponse):
    """The response equivalent of None.

    Declares no fields of its own.
    """
The response equivalent of None.
Inherited Members
@ioprepped
@dataclass
class ErrorSysResponse(SysResponse):
    """SysResponse saying some error has occurred for the send.

    This generally results in an Exception being raised for the caller.
    """

    class ErrorType(Enum):
        """Type of error that occurred while sending a message."""

        # NOTE(review): the integer values are presumably what gets
        # serialized, so they should stay stable - confirm against the
        # dataclassio enum handling.
        REMOTE = 0
        REMOTE_CLEAN = 1
        LOCAL = 2
        COMMUNICATION = 3
        REMOTE_COMMUNICATION = 4

    # Human-readable description of the error.
    # NOTE(review): IOAttrs('m') / IOAttrs('e') look like short storage
    # names for serialization - confirm against efro.dataclassio.
    error_message: Annotated[str, IOAttrs('m')]
    # Category of the error; REMOTE unless specified otherwise.
    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE
SysResponse saying some error has occurred for the send.
This generally results in an Exception being raised for the caller.
Inherited Members
class ErrorType(Enum):
    """Type of error that occurred while sending a message."""

    # Generic error on the remote end; MessageProtocol.error_to_response()
    # uses this for any exception it does not forward.
    REMOTE = 0
    # A CleanError forwarded from the remote end (used when the protocol
    # has 'forward_clean_errors' enabled).
    REMOTE_CLEAN = 1
    # NOTE(review): presumably an error raised on the sending side -
    # not used in the code visible here; confirm against the sender.
    LOCAL = 2
    # NOTE(review): presumably a failure of the transport itself -
    # not used in the code visible here; confirm against the sender.
    COMMUNICATION = 3
    # A CommunicationError forwarded from the remote end (used when the
    # protocol has 'forward_communication_errors' enabled).
    REMOTE_COMMUNICATION = 4
Type of error that occurred while sending a message.
Inherited Members
- enum.Enum
- name
- value
@ioprepped
@dataclass
class StringResponse(Response):
    """A simple string value response."""

    # The string payload.
    # NOTE(review): IOAttrs('v') looks like a short storage name used
    # when serializing - confirm against efro.dataclassio.
    value: Annotated[str, IOAttrs('v')]
A simple string value response.
@ioprepped
@dataclass
class BoolResponse(Response):
    """A simple bool value response."""

    # The bool payload.
    # NOTE(review): IOAttrs('v') looks like a short storage name used
    # when serializing - confirm against efro.dataclassio.
    value: Annotated[bool, IOAttrs('v')]
A simple bool value response.
class MessageProtocol:
    """Wrangles a set of message types, formats, and response types.

    Both endpoints must be using a compatible Protocol for communication
    to succeed. To maintain Protocol compatibility between revisions,
    all message types must retain the same id, message attr storage
    names must not change, newly added attrs must have default values,
    etc.
    """

    def __init__(
        self,
        message_types: dict[int, type[Message]],
        response_types: dict[int, type[Response]],
        *,
        forward_communication_errors: bool = False,
        forward_clean_errors: bool = False,
        remote_errors_include_stack_traces: bool = False,
        log_errors_on_receiver: bool = True,
    ) -> None:
        """Create a protocol with a given configuration.

        If 'forward_communication_errors' is True,
        efro.error.CommunicationErrors raised on the receiver end will
        result in a matching error raised back on the sender. This can
        be useful if the receiver will be in some way forwarding
        messages along and the sender doesn't need to know where
        communication breakdowns occurred; only that they did.

        If 'forward_clean_errors' is True, efro.error.CleanError
        exceptions raised on the receiver end will result in a matching
        CleanError raised back on the sender.

        When an exception is not covered by the optional forwarding
        mechanisms above, it will come across as efro.error.RemoteError
        and the exception will be logged on the receiver end - at least
        by default (see details below).

        If 'remote_errors_include_stack_traces' is True, stringified
        stack traces will be returned with efro.error.RemoteError
        exceptions. This is useful for debugging but should only be
        enabled in cases where the sender is trusted to see internal
        details of the receiver.

        By default, when a message-handling exception will result in an
        efro.error.RemoteError being returned to the sender, the
        exception will be logged on the receiver. This is because the
        goal is usually to avoid returning opaque RemoteErrors and to
        instead return something meaningful as part of the expected
        response type (even if that value itself represents a logical
        error state). If 'log_errors_on_receiver' is False, however, such
        exceptions will *not* be logged on the receiver. This can be
        useful in combination with 'remote_errors_include_stack_traces'
        and 'forward_clean_errors' in situations where all error
        logging/management will be happening on the sender end. Be
        aware, however, that in that case it may be possible for
        communication errors to prevent such error messages from
        ever being seen.

        Raises ValueError if a message's possible response type is
        missing from 'response_types' or if two message types share a
        __name__ (both checks run only when __debug__ is set).
        """
        # pylint: disable=too-many-locals
        self.message_types_by_id: dict[int, type[Message]] = {}
        self.message_ids_by_type: dict[type[Message], int] = {}
        self.response_types_by_id: dict[
            int, type[Response] | type[SysResponse]
        ] = {}
        self.response_ids_by_type: dict[
            type[Response] | type[SysResponse], int
        ] = {}
        for m_id, m_type in message_types.items():
            # Make sure only valid message types were passed and each
            # id was assigned only once.
            assert isinstance(m_id, int)
            assert m_id >= 0
            assert is_ioprepped_dataclass(m_type) and issubclass(
                m_type, Message
            )
            assert self.message_types_by_id.get(m_id) is None
            self.message_types_by_id[m_id] = m_type
            self.message_ids_by_type[m_type] = m_id

        for r_id, r_type in response_types.items():
            assert isinstance(r_id, int)
            assert r_id >= 0
            assert is_ioprepped_dataclass(r_type) and issubclass(
                r_type, Response
            )
            assert self.response_types_by_id.get(r_id) is None
            self.response_types_by_id[r_id] = r_type
            self.response_ids_by_type[r_type] = r_id

        # Register our SysResponse types. These use negative
        # IDs so as to never overlap with user Response types.
        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
            assert self.response_types_by_id.get(reg_id) is None
            self.response_types_by_id[reg_id] = reg_tp
            self.response_ids_by_type[reg_tp] = reg_id

        _reg_sys(ErrorSysResponse, -1)
        _reg_sys(EmptySysResponse, -2)

        # Some extra-thorough validation in debug mode.
        if __debug__:
            # Make sure all Message types' return types are valid
            # and have been assigned an ID as well.
            all_response_types: set[type[Response] | None] = set()
            for m_id, m_type in message_types.items():
                m_rtypes = m_type.get_response_types()

                assert isinstance(m_rtypes, list)
                assert (
                    m_rtypes
                ), f'Message type {m_type} specifies no return types.'
                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
                for m_rtype in m_rtypes:
                    all_response_types.add(m_rtype)
            for cls in all_response_types:
                if cls is None:
                    continue
                assert is_ioprepped_dataclass(cls)
                assert issubclass(cls, Response)
                if cls not in self.response_ids_by_type:
                    raise ValueError(
                        f'Possible response type {cls} needs to be included'
                        f' in response_types for this protocol.'
                    )

            # Make sure all registered types have unique base names.
            # We can take advantage of this to generate cleaner looking
            # protocol modules. Can revisit if this is ever a problem.
            mtypenames = set(tp.__name__ for tp in self.message_ids_by_type)
            if len(mtypenames) != len(message_types):
                raise ValueError(
                    'message_types contains duplicate __name__s;'
                    ' all types are required to have unique names.'
                )

        self.forward_clean_errors = forward_clean_errors
        self.forward_communication_errors = forward_communication_errors
        self.remote_errors_include_stack_traces = (
            remote_errors_include_stack_traces
        )
        self.log_errors_on_receiver = log_errors_on_receiver

    @staticmethod
    def encode_dict(obj: dict) -> str:
        """JSON-encode a provided dict (compact separators, no spaces)."""
        return json.dumps(obj, separators=(',', ':'))

    def message_to_dict(self, message: Message) -> dict:
        """Encode a message to a JSON-ready dict.

        Raises TypeError if the message's type is not registered.
        """
        return self._to_dict(message, self.message_ids_by_type, 'message')

    def response_to_dict(self, response: Response | SysResponse) -> dict:
        """Encode a response to a JSON-ready dict.

        Raises TypeError if the response's type is not registered.
        """
        return self._to_dict(response, self.response_ids_by_type, 'response')

    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
        """Translate an Exception to a SysResponse.

        Also returns whether the error should be logged if this happened
        within handle_raw_message().
        """

        # If anything goes wrong, return an ErrorSysResponse instead.
        # (either CLEAN or generic REMOTE)
        if self.forward_clean_errors and isinstance(exc, CleanError):
            return (
                ErrorSysResponse(
                    error_message=str(exc),
                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
                ),
                False,
            )
        if self.forward_communication_errors and isinstance(
            exc, CommunicationError
        ):
            return (
                ErrorSysResponse(
                    error_message=str(exc),
                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
                ),
                False,
            )
        return (
            ErrorSysResponse(
                error_message=(
                    # Note: need to format exception ourself here; it
                    # might not be current so we can't use
                    # traceback.format_exc().
                    ''.join(
                        traceback.format_exception(
                            type(exc), exc, exc.__traceback__
                        )
                    )
                    if self.remote_errors_include_stack_traces
                    else 'An internal error has occurred.'
                ),
                error_type=ErrorSysResponse.ErrorType.REMOTE,
            ),
            self.log_errors_on_receiver,
        )

    def _to_dict(
        self, message: Any, ids_by_type: dict[type, int], opname: str
    ) -> dict:
        """Encode a message or response to a JSON-ready dict.

        The result stores the registered type id under 't' and the
        dataclass contents under 'm'. 'opname' is only used in the
        TypeError raised for unregistered types.
        """

        m_id: int | None = ids_by_type.get(type(message))
        if m_id is None:
            raise TypeError(
                f'{opname} type is not registered in protocol:'
                f' {type(message)}'
            )
        out = {'t': m_id, 'm': dataclass_to_dict(message)}
        return out

    @staticmethod
    def decode_dict(data: str) -> dict:
        """Decode a JSON string to a dict."""
        out = json.loads(data)
        assert isinstance(out, dict)
        return out

    def message_from_dict(self, data: dict) -> Message:
        """Decode a message from a dict as produced by message_to_dict().

        Raises UnregisteredMessageIDError for unknown type ids.
        """
        out = self._from_dict(data, self.message_types_by_id, 'message')
        assert isinstance(out, Message)
        return out

    def response_from_dict(self, data: dict) -> Response | SysResponse:
        """Decode a response from a dict as produced by response_to_dict().

        Raises UnregisteredMessageIDError for unknown type ids.
        """
        out = self._from_dict(data, self.response_types_by_id, 'response')
        assert isinstance(out, Response | SysResponse)
        return out

    # Weeeird; we get mypy errors returning dict[int, type] but
    # dict[int, typing.Type] or dict[int, type[Any]] works..
    def _from_dict(
        self, data: dict, types_by_id: dict[int, type[Any]], opname: str
    ) -> Any:
        """Decode a message or response from a dict.

        Looks up the type registered for data['t'] and builds it from
        data['m']. 'opname' is only used in the error raised for
        unregistered ids.
        """
        msgdict: dict | None

        m_id = data.get('t')
        # Allow omitting 'm' dict if its empty.
        msgdict = data.get('m', {})

        assert isinstance(m_id, int)
        assert isinstance(msgdict, dict)

        # Decode this particular type.
        msgtype = types_by_id.get(m_id)
        if msgtype is None:
            raise UnregisteredMessageIDError(
                f'Got unregistered {opname} id of {m_id}.'
            )
        return dataclass_from_dict(msgtype, msgdict)

    def _get_module_header(
        self,
        part: Literal['sender', 'receiver'],
        extra_import_code: str | None,
        enable_async_sends: bool,
    ) -> str:
        """Return common parts of generated modules.

        This is the license/docstring preamble plus the runtime and
        TYPE_CHECKING-only import lines needed by the generated sender
        or receiver code.
        """
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        import textwrap

        # Module name -> names imported only under TYPE_CHECKING.
        tpimports: dict[str, list[str]] = {}
        # Module name -> names imported at runtime.
        imports: dict[str, list[str]] = {}

        single_message_type = len(self.message_ids_by_type) == 1

        msgtypes = list(self.message_ids_by_type)
        if part == 'sender':
            msgtypes.append(Message)
        for msgtype in msgtypes:
            tpimports.setdefault(msgtype.__module__, []).append(
                msgtype.__name__
            )
        rsptypes = list(self.response_ids_by_type)
        if part == 'sender':
            rsptypes.append(Response)
        for rsp_tp in rsptypes:
            # Skip these as they don't actually show up in code.
            if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse:
                continue
            if (
                single_message_type
                and part == 'sender'
                and rsp_tp is not Response
            ):
                # We need to cast to the single supported response type
                # in this case so need response types at runtime.
                imports.setdefault(rsp_tp.__module__, []).append(
                    rsp_tp.__name__
                )
            else:
                tpimports.setdefault(rsp_tp.__module__, []).append(
                    rsp_tp.__name__
                )

        import_lines = ''
        tpimport_lines = ''

        for module, names in sorted(imports.items()):
            jnames = ', '.join(names)
            line = f'from {module} import {jnames}'
            if len(line) > 80:
                # Recreate in a wrapping-friendly form.
                line = f'from {module} import ({jnames})'
            import_lines += f'{line}\n'
        for module, names in sorted(tpimports.items()):
            jnames = ', '.join(names)
            line = f'from {module} import {jnames}'
            if len(line) > 75:  # Account for indent
                # Recreate in a wrapping-friendly form.
                line = f'from {module} import ({jnames})'
            tpimport_lines += f'{line}\n'

        if part == 'sender':
            import_lines += (
                'from efro.message import MessageSender, BoundMessageSender\n'
            )
            tpimport_typing_extras = ''
        else:
            if single_message_type:
                import_lines += (
                    'from efro.message import (MessageReceiver,'
                    ' BoundMessageReceiver, Message, Response)\n'
                )
            else:
                import_lines += (
                    'from efro.message import MessageReceiver,'
                    ' BoundMessageReceiver\n'
                )
            tpimport_typing_extras = ', Awaitable'

        if extra_import_code is not None:
            import_lines += extra_import_code

        ovld = ', overload' if not single_message_type else ''
        ovld2 = (
            ', cast, Awaitable'
            if (single_message_type and part == 'sender' and enable_async_sends)
            else ''
        )
        # These lines live inside the generated 'if TYPE_CHECKING:' block
        # so they need one level of indentation.
        tpimport_lines = textwrap.indent(tpimport_lines, '    ')

        baseimps = ['Any']
        if part == 'receiver':
            baseimps.append('Callable')
        if part == 'sender' and enable_async_sends:
            baseimps.append('Awaitable')
        baseimps_s = ', '.join(baseimps)
        out = (
            '# Released under the MIT License. See LICENSE for details.\n'
            f'#\n'
            f'"""Auto-generated {part} module. Do not edit by hand."""\n'
            f'\n'
            f'from __future__ import annotations\n'
            f'\n'
            f'from typing import TYPE_CHECKING{ovld}{ovld2}\n'
            f'\n'
            f'{import_lines}'
            f'\n'
            f'if TYPE_CHECKING:\n'
            f'    from typing import {baseimps_s}'
            f'{tpimport_typing_extras}\n'
            f'{tpimport_lines}'
            f'\n'
            f'\n'
        )
        return out

    def do_create_sender_module(
        self,
        basename: str,
        protocol_create_code: str,
        enable_sync_sends: bool,
        enable_async_sends: bool,
        private: bool = False,
        protocol_module_level_import_code: str | None = None,
    ) -> str:
        """Used by create_sender_module(); do not call directly.

        Returns the source text of a module defining a MessageSender
        subclass named 'basename' (prefixed with '_' if 'private') and
        its bound counterpart, with typed send()/send_async() methods
        for the registered message types. 'protocol_create_code' is
        placed in the generated __init__ and must define 'protocol'.
        """
        # pylint: disable=too-many-positional-arguments
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        import textwrap

        msgtypes = list(self.message_ids_by_type.keys())

        ppre = '_' if private else ''
        out = self._get_module_header(
            'sender',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=enable_async_sends,
        )
        # The protocol creation code goes inside the generated __init__
        # body, which sits two levels deep.
        ccind = textwrap.indent(protocol_create_code, '        ')
        out += (
            f'class {ppre}{basename}(MessageSender):\n'
            f'    """Protocol-specific sender."""\n'
            f'\n'
            f'    def __init__(self) -> None:\n'
            f'{ccind}\n'
            f'        super().__init__(protocol)\n'
            f'\n'
            f'    def __get__(\n'
            f'        self, obj: Any, type_in: Any = None\n'
            f'    ) -> {ppre}Bound{basename}:\n'
            f'        return {ppre}Bound{basename}(obj, self)\n'
            f'\n'
            f'\n'
            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
            f'    """Protocol-specific bound sender."""\n'
        )

        def _filt_tp_name(rtype: type[Response] | None) -> str:
            return 'None' if rtype is None else rtype.__name__

        # Define send() overloads for all registered message types.
        if msgtypes:
            for async_pass in False, True:
                if async_pass and not enable_async_sends:
                    continue
                if not async_pass and not enable_sync_sends:
                    continue
                pfx = 'async ' if async_pass else ''
                sfx = '_async' if async_pass else ''
                how = 'asynchronously' if async_pass else 'synchronously'

                if len(msgtypes) == 1:
                    # Special case: with a single message types we don't
                    # use overloads.
                    msgtype = msgtypes[0]
                    msgtypevar = msgtype.__name__
                    rtypes = msgtype.get_response_types()
                    if len(rtypes) > 1:
                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                    else:
                        rtypevar = _filt_tp_name(rtypes[0])
                    # isinstance() does not accept a bare None as its
                    # second argument, so a None-only response needs an
                    # identity check in the generated code instead.
                    rcheck = (
                        'out is None'
                        if rtypevar == 'None'
                        else f'isinstance(out, {rtypevar})'
                    )
                    if async_pass:
                        rtypevar = f'Awaitable[{rtypevar}]'
                    out += (
                        f'\n'
                        f'    def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}:\n'
                        f'        """Send a message {how}."""\n'
                        f'        out = self._sender.'
                        f'send{sfx}(self._obj, message)\n'
                    )
                    if not async_pass:
                        out += (
                            f'        assert {rcheck}\n'
                            '        return out\n'
                        )
                    else:
                        out += f'        return cast({rtypevar}, out)\n'

                else:
                    for msgtype in msgtypes:
                        msgtypevar = msgtype.__name__
                        rtypes = msgtype.get_response_types()
                        if len(rtypes) > 1:
                            rtypevar = ' | '.join(
                                _filt_tp_name(t) for t in rtypes
                            )
                        else:
                            rtypevar = _filt_tp_name(rtypes[0])
                        out += (
                            f'\n'
                            f'    @overload\n'
                            f'    {pfx}def send{sfx}(self,'
                            f' message: {msgtypevar})'
                            f' -> {rtypevar}: ...\n'
                        )
                    rtypevar = 'Response | None'
                    if async_pass:
                        rtypevar = f'Awaitable[{rtypevar}]'
                    out += (
                        f'\n'
                        f'    def send{sfx}(self, message: Message)'
                        f' -> {rtypevar}:\n'
                        f'        """Send a message {how}."""\n'
                        f'        return self._sender.'
                        f'send{sfx}(self._obj, message)\n'
                    )

        return out

    def do_create_receiver_module(
        self,
        basename: str,
        protocol_create_code: str,
        is_async: bool,
        private: bool = False,
        protocol_module_level_import_code: str | None = None,
    ) -> str:
        """Used by create_receiver_module(); do not call directly.

        Returns the source text of a module defining a MessageReceiver
        subclass named 'basename' (prefixed with '_' if 'private') and
        its bound counterpart, with typed handler() decorators for the
        registered message types. 'protocol_create_code' is placed in
        the generated __init__ and must define 'protocol'.
        """
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-positional-arguments
        import textwrap

        desc = 'asynchronous' if is_async else 'synchronous'
        ppre = '_' if private else ''
        msgtypes = list(self.message_ids_by_type.keys())
        out = self._get_module_header(
            'receiver',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=False,
        )
        # The protocol creation code goes inside the generated __init__
        # body, which sits two levels deep.
        ccind = textwrap.indent(protocol_create_code, '        ')
        out += (
            f'class {ppre}{basename}(MessageReceiver):\n'
            f'    """Protocol-specific {desc} receiver."""\n'
            f'\n'
            f'    is_async = {is_async}\n'
            f'\n'
            f'    def __init__(self) -> None:\n'
            f'{ccind}\n'
            f'        super().__init__(protocol)\n'
            f'\n'
            f'    def __get__(\n'
            f'        self,\n'
            f'        obj: Any,\n'
            f'        type_in: Any = None,\n'
            f'    ) -> {ppre}Bound{basename}:\n'
            f'        return {ppre}Bound{basename}('
            f'obj, self)\n'
        )

        # Define handler() overloads for all registered message types.

        def _filt_tp_name(rtype: type[Response] | None) -> str:
            return 'None' if rtype is None else rtype.__name__

        if msgtypes:
            # Async handlers return their response wrapped in Awaitable.
            cbgn = 'Awaitable[' if is_async else ''
            cend = ']' if is_async else ''
            if len(msgtypes) == 1:
                # Special case: when we have a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypes = msgtype.get_response_types()
                if len(rtypes) > 1:
                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                else:
                    rtypevar = _filt_tp_name(rtypes[0])
                rtypevar = f'{cbgn}{rtypevar}{cend}'
                out += (
                    f'\n'
                    f'    def handler(\n'
                    f'        self,\n'
                    f'        call: Callable[[Any, {msgtypevar}], '
                    f'{rtypevar}],\n'
                    f'    )'
                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                    f'        """Decorator to register message handlers."""\n'
                    f'        from typing import cast, Callable, Any\n'
                    f'\n'
                    f'        self.register_handler(cast(Callable'
                    f'[[Any, Message], Response], call))\n'
                    f'        return call\n'
                )
            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypes = msgtype.get_response_types()
                    if len(rtypes) > 1:
                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                    else:
                        rtypevar = _filt_tp_name(rtypes[0])
                    rtypevar = f'{cbgn}{rtypevar}{cend}'
                    out += (
                        f'\n'
                        f'    @overload\n'
                        f'    def handler(\n'
                        f'        self,\n'
                        f'        call: Callable[[Any, {msgtypevar}], '
                        f'{rtypevar}],\n'
                        f'    )'
                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
                    )
                out += (
                    '\n'
                    '    def handler(self, call: Callable) -> Callable:\n'
                    '        """Decorator to register message handlers."""\n'
                    '        self.register_handler(call)\n'
                    '        return call\n'
                )

        out += (
            f'\n'
            f'\n'
            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
            f'    """Protocol-specific bound receiver."""\n'
        )
        if is_async:
            out += (
                '\n'
                '    def handle_raw_message(\n'
                '        self, message: str, raise_unregistered: bool = False\n'
                '    ) -> Awaitable[str]:\n'
                '        """Asynchronously handle a raw incoming message."""\n'
                '        return self._receiver.'
                'handle_raw_message_async(\n'
                '            self._obj, message, raise_unregistered\n'
                '        )\n'
            )

        else:
            out += (
                '\n'
                '    def handle_raw_message(\n'
                '        self, message: str, raise_unregistered: bool = False\n'
                '    ) -> str:\n'
                '        """Synchronously handle a raw incoming message."""\n'
                '        return self._receiver.handle_raw_message(\n'
                '            self._obj, message, raise_unregistered\n'
                '        )\n'
            )

        return out
Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.
def __init__(
    self,
    message_types: dict[int, type[Message]],
    response_types: dict[int, type[Response]],
    *,
    forward_communication_errors: bool = False,
    forward_clean_errors: bool = False,
    remote_errors_include_stack_traces: bool = False,
    log_errors_on_receiver: bool = True,
) -> None:
    """Build the id <-> type registries and store error-handling options.

    'message_types' and 'response_types' map non-negative integer ids
    to ioprepped dataclass types deriving from Message and Response
    respectively. System response types are registered automatically
    under negative ids.

    'forward_communication_errors': when True,
    efro.error.CommunicationErrors raised on the receiver end are
    raised as a matching error on the sender. Handy when the receiver
    forwards messages onward and the sender only cares that a
    communication breakdown happened, not where.

    'forward_clean_errors': when True, efro.error.CleanError
    exceptions raised on the receiver end are raised as a matching
    CleanError on the sender.

    Exceptions not covered by the forwarding options above reach the
    sender as efro.error.RemoteError and, by default, get logged on
    the receiver.

    'remote_errors_include_stack_traces': when True, stringified stack
    traces are returned with efro.error.RemoteError exceptions. Useful
    for debugging, but only enable it when the sender is trusted to
    see internal details of the receiver.

    'log_errors_on_receiver': when False, exceptions that turn into
    RemoteErrors are *not* logged on the receiver. Logging is the
    default because the usual goal is to return something meaningful
    as part of the expected response type rather than an opaque
    RemoteError. Disabling it can make sense together with
    'remote_errors_include_stack_traces' and 'forward_clean_errors'
    when all error logging/management happens on the sender; be aware
    that communication errors may then prevent such errors from ever
    being seen.
    """
    # pylint: disable=too-many-locals
    self.message_types_by_id: dict[int, type[Message]] = {}
    self.message_ids_by_type: dict[type[Message], int] = {}
    self.response_types_by_id: dict[
        int, type[Response] | type[SysResponse]
    ] = {}
    self.response_ids_by_type: dict[
        type[Response] | type[SysResponse], int
    ] = {}

    # Only valid message types may be passed, each under its own id.
    for msg_id, msg_tp in message_types.items():
        assert isinstance(msg_id, int)
        assert msg_id >= 0
        assert is_ioprepped_dataclass(msg_tp) and issubclass(msg_tp, Message)
        assert msg_id not in self.message_types_by_id
        self.message_types_by_id[msg_id] = msg_tp
        self.message_ids_by_type[msg_tp] = msg_id

    # Same rules for user response types.
    for rsp_id, rsp_tp in response_types.items():
        assert isinstance(rsp_id, int)
        assert rsp_id >= 0
        assert is_ioprepped_dataclass(rsp_tp) and issubclass(rsp_tp, Response)
        assert rsp_id not in self.response_types_by_id
        self.response_types_by_id[rsp_id] = rsp_tp
        self.response_ids_by_type[rsp_tp] = rsp_id

    # System response types get negative ids so they can never
    # collide with user Response types.
    for sys_tp, sys_id in ((ErrorSysResponse, -1), (EmptySysResponse, -2)):
        assert sys_id not in self.response_types_by_id
        self.response_types_by_id[sys_id] = sys_tp
        self.response_ids_by_type[sys_tp] = sys_id

    # Extra-thorough validation; debug mode only.
    if __debug__:
        # Every response type a message may return must be valid and
        # must have been given an id.
        possible_rtypes: set[type[Response] | None] = set()
        for msg_tp in message_types.values():
            declared = msg_tp.get_response_types()

            assert isinstance(declared, list)
            assert declared, f'Message type {msg_tp} specifies no return types.'
            assert len(set(declared)) == len(declared)  # no duplicates
            possible_rtypes.update(declared)

        for rtype in possible_rtypes:
            if rtype is not None:
                assert is_ioprepped_dataclass(rtype)
                assert issubclass(rtype, Response)
                if rtype not in self.response_ids_by_type:
                    raise ValueError(
                        f'Possible response type {rtype} needs to be included'
                        f' in response_types for this protocol.'
                    )

        # Unique base names let generated protocol modules refer to
        # types by bare name. Can revisit if this is ever a problem.
        distinct_names = {tp.__name__ for tp in self.message_ids_by_type}
        if len(distinct_names) != len(message_types):
            raise ValueError(
                'message_types contains duplicate __name__s;'
                ' all types are required to have unique names.'
            )

    self.forward_clean_errors = forward_clean_errors
    self.forward_communication_errors = forward_communication_errors
    self.remote_errors_include_stack_traces = (
        remote_errors_include_stack_traces
    )
    self.log_errors_on_receiver = log_errors_on_receiver
Create a protocol with a given configuration.
If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.
If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.
When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).
If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.
By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_errors_on_receiver' is False, however, such exceptions will *not* be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent such error messages from ever being seen.
175 @staticmethod 176 def encode_dict(obj: dict) -> str: 177 """Json-encode a provided dict.""" 178 return json.dumps(obj, separators=(',', ':'))
Json-encode a provided dict.
def message_to_dict(self, message: Message) -> dict:
    """Convert a Message into a dict that is ready for json encoding."""
    ids_by_type = self.message_ids_by_type
    return self._to_dict(message, ids_by_type, 'message')
Encode a message to a json ready dict.
def response_to_dict(self, response: Response | SysResponse) -> dict:
    """Convert a Response or SysResponse into a json-ready dict."""
    ids_by_type = self.response_ids_by_type
    return self._to_dict(response, ids_by_type, 'response')
Encode a response to a json ready dict.
def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
    """Build the SysResponse describing an Exception, plus a log flag.

    The second tuple member says whether the error should be logged if
    it happened within handle_raw_message().
    """
    etypes = ErrorSysResponse.ErrorType

    # Forwarded clean errors travel with their own message and are
    # never flagged for logging.
    if self.forward_clean_errors and isinstance(exc, CleanError):
        clean = ErrorSysResponse(
            error_message=str(exc),
            error_type=etypes.REMOTE_CLEAN,
        )
        return clean, False

    # Same treatment for forwarded communication errors.
    if self.forward_communication_errors and isinstance(
        exc, CommunicationError
    ):
        comm = ErrorSysResponse(
            error_message=str(exc),
            error_type=etypes.REMOTE_COMMUNICATION,
        )
        return comm, False

    # Everything else becomes a generic remote error.
    if self.remote_errors_include_stack_traces:
        # The exception may not be the one currently being handled, so
        # it is formatted explicitly instead of relying on
        # traceback.format_exc().
        details = ''.join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    else:
        details = 'An internal error has occurred.'

    generic = ErrorSysResponse(
        error_message=details,
        error_type=etypes.REMOTE,
    )
    return generic, self.log_errors_on_receiver
Translate an Exception to a SysResponse.
Also returns whether the error should be logged if this happened within handle_raw_message().
248 @staticmethod 249 def decode_dict(data: str) -> dict: 250 """Decode data to a dict.""" 251 out = json.loads(data) 252 assert isinstance(out, dict) 253 return out
Decode data to a dict.
def message_from_dict(self, data: dict) -> Message:
    """Rebuild a Message from its dict form.

    The dict is resolved against the registered message types and the
    result is asserted to be a Message.
    """
    message = self._from_dict(data, self.message_types_by_id, 'message')
    assert isinstance(message, Message)
    return message
Decode a message from a json-ready dict.
def response_from_dict(self, data: dict) -> Response | SysResponse:
    """Rebuild a Response or SysResponse from its dict form.

    The dict is resolved against the registered response types and the
    result is asserted to be a Response or SysResponse.
    """
    response = self._from_dict(data, self.response_types_by_id, 'response')
    assert isinstance(response, (Response, SysResponse))
    return response
Decode a response from a json-ready dict.
def do_create_sender_module(
    self,
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_sender_module(); do not call directly.

    Returns the source text of a module defining a MessageSender
    subclass and a matching BoundMessageSender subclass whose send
    methods are typed for the message types registered with this
    protocol.

    Args:
        basename: Base name for the generated classes.
        protocol_create_code: Code inserted into the generated
            __init__(); the generated code then passes a name called
            'protocol' to super().__init__().
        enable_sync_sends: Whether to generate send().
        enable_async_sends: Whether to generate send_async().
        private: If True, generated class names get a '_' prefix.
        protocol_module_level_import_code: Passed to the module header
            builder as extra import code.
    """
    # pylint: disable=too-many-positional-arguments
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    import textwrap

    # NOTE(review): leading whitespace inside the template literals
    # below is reproduced exactly as it appears in this view of the
    # file; confirm it matches the intended indentation of the
    # generated module.

    msgtypes = list(self.message_ids_by_type.keys())

    ppre = '_' if private else ''
    out = self._get_module_header(
        'sender',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=enable_async_sends,
    )
    ccind = textwrap.indent(protocol_create_code, ' ')
    out += (
        f'class {ppre}{basename}(MessageSender):\n'
        f' """Protocol-specific sender."""\n'
        f'\n'
        f' def __init__(self) -> None:\n'
        f'{ccind}\n'
        f' super().__init__(protocol)\n'
        f'\n'
        f' def __get__(\n'
        f' self, obj: Any, type_in: Any = None\n'
        f' ) -> {ppre}Bound{basename}:\n'
        f' return {ppre}Bound{basename}(obj, self)\n'
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageSender):\n'
        f' """Protocol-specific bound sender."""\n'
    )

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        return 'None' if rtype is None else rtype.__name__

    def _rtypes_str(msgtype: type[Message]) -> str:
        # Return-annotation text for a message type's possible responses.
        rtypes = msgtype.get_response_types()
        if len(rtypes) > 1:
            return ' | '.join(_filt_tp_name(t) for t in rtypes)
        return _filt_tp_name(rtypes[0])

    # Define send() overloads for all registered message types.
    if msgtypes:
        for async_pass in False, True:
            if async_pass and not enable_async_sends:
                continue
            if not async_pass and not enable_sync_sends:
                continue
            pfx = 'async ' if async_pass else ''
            sfx = '_async' if async_pass else ''
            how = 'asynchronously' if async_pass else 'synchronously'

            if len(msgtypes) == 1:
                # Special case: with a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypevar = _rtypes_str(msgtype)
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f' def send{sfx}(self,'
                    f' message: {msgtypevar})'
                    f' -> {rtypevar}:\n'
                    f' """Send a message {how}."""\n'
                    ' out = self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )
                if not async_pass:
                    # isinstance() does not accept a bare None as its
                    # second argument, so a message whose only response
                    # type is None needs an identity check instead.
                    if rtypevar == 'None':
                        check = 'out is None'
                    else:
                        check = f'isinstance(out, {rtypevar})'
                    out += f' assert {check}\n' ' return out\n'
                else:
                    out += f' return cast({rtypevar}, out)\n'

            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypevar = _rtypes_str(msgtype)
                    out += (
                        f'\n'
                        f' @overload\n'
                        f' {pfx}def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}: ...\n'
                    )
                # The non-overload implementation accepts any Message.
                rtypevar = 'Response | None'
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f' def send{sfx}(self, message: Message)'
                    f' -> {rtypevar}:\n'
                    f' """Send a message {how}."""\n'
                    ' return self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )

    return out
Used by create_sender_module(); do not call directly.
def do_create_receiver_module(
    self,
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_receiver_module(); do not call directly.

    Returns the source text of a module defining a MessageReceiver
    subclass (with a typed handler() decorator) and a matching
    BoundMessageReceiver subclass (with handle_raw_message()).

    Args:
        basename: Base name for the generated classes.
        protocol_create_code: Code inserted into the generated
            __init__(); the generated code then passes a name called
            'protocol' to super().__init__().
        is_async: If True, handler return types are wrapped in
            Awaitable[] and the bound handle_raw_message() forwards to
            handle_raw_message_async().
        private: If True, generated class names get a '_' prefix.
        protocol_module_level_import_code: Passed to the module header
            builder as extra import code.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-positional-arguments
    import textwrap

    # NOTE(review): leading whitespace inside the template literals
    # below is reproduced exactly as it appears in this view of the
    # file; confirm it matches the intended indentation of the
    # generated module.

    desc = 'asynchronous' if is_async else 'synchronous'
    ppre = '_' if private else ''
    msgtypes = list(self.message_ids_by_type.keys())
    out = self._get_module_header(
        'receiver',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=False,
    )
    ccind = textwrap.indent(protocol_create_code, ' ')
    out += (
        f'class {ppre}{basename}(MessageReceiver):\n'
        f' """Protocol-specific {desc} receiver."""\n'
        f'\n'
        f' is_async = {is_async}\n'
        f'\n'
        f' def __init__(self) -> None:\n'
        f'{ccind}\n'
        f' super().__init__(protocol)\n'
        f'\n'
        f' def __get__(\n'
        f' self,\n'
        f' obj: Any,\n'
        f' type_in: Any = None,\n'
        f' ) -> {ppre}Bound{basename}:\n'
        f' return {ppre}Bound{basename}('
        f'obj, self)\n'
    )

    # Define handler() overloads for all registered message types.

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        # Name to use for a response type in generated annotations.
        return 'None' if rtype is None else rtype.__name__

    if msgtypes:
        # Pieces that wrap handler return types in Awaitable[...] for
        # async receivers.
        cbgn = 'Awaitable[' if is_async else ''
        cend = ']' if is_async else ''
        if len(msgtypes) == 1:
            # Special case: when we have a single message type we don't
            # use overloads.
            msgtype = msgtypes[0]
            msgtypevar = msgtype.__name__
            rtypes = msgtype.get_response_types()
            if len(rtypes) > 1:
                rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
            else:
                rtypevar = _filt_tp_name(rtypes[0])
            rtypevar = f'{cbgn}{rtypevar}{cend}'
            out += (
                f'\n'
                f' def handler(\n'
                f' self,\n'
                f' call: Callable[[Any, {msgtypevar}], '
                f'{rtypevar}],\n'
                f' )'
                f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                f' """Decorator to register message handlers."""\n'
                f' from typing import cast, Callable, Any\n'
                f'\n'
                f' self.register_handler(cast(Callable'
                f'[[Any, Message], Response], call))\n'
                f' return call\n'
            )
        else:
            # One typed @overload per message type...
            for msgtype in msgtypes:
                msgtypevar = msgtype.__name__
                rtypes = msgtype.get_response_types()
                if len(rtypes) > 1:
                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                else:
                    rtypevar = _filt_tp_name(rtypes[0])
                rtypevar = f'{cbgn}{rtypevar}{cend}'
                out += (
                    f'\n'
                    f' @overload\n'
                    f' def handler(\n'
                    f' self,\n'
                    f' call: Callable[[Any, {msgtypevar}], '
                    f'{rtypevar}],\n'
                    f' )'
                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
                )
            # ...followed by the untyped implementation that does the
            # actual registration.
            out += (
                '\n'
                ' def handler(self, call: Callable) -> Callable:\n'
                ' """Decorator to register message handlers."""\n'
                ' self.register_handler(call)\n'
                ' return call\n'
            )

    out += (
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
        f' """Protocol-specific bound receiver."""\n'
    )
    if is_async:
        out += (
            '\n'
            ' def handle_raw_message(\n'
            ' self, message: str, raise_unregistered: bool = False\n'
            ' ) -> Awaitable[str]:\n'
            ' """Asynchronously handle a raw incoming message."""\n'
            ' return self._receiver.'
            'handle_raw_message_async(\n'
            ' self._obj, message, raise_unregistered\n'
            ' )\n'
        )

    else:
        out += (
            '\n'
            ' def handle_raw_message(\n'
            ' self, message: str, raise_unregistered: bool = False\n'
            ' ) -> str:\n'
            ' """Synchronously handle a raw incoming message."""\n'
            ' return self._receiver.handle_raw_message(\n'
            ' self._obj, message, raise_unregistered\n'
            ' )\n'
        )

    return out
Used by create_receiver_module(); do not call directly.
class MessageSender:
    """Facilitates sending messages to a target and receiving responses.

    These are instantiated at the class level and used to register unbound
    class methods to handle raw message sending. Generally this class is not
    used directly; autogenerated subclasses which provide type-safe
    overloads are used instead.

    Example:
      (In this example, MyMessageSender is an autogenerated class that
      inherits from MessageSender).

      class MyClass:
          msg = MyMessageSender()

          @msg.send_method
          def send_raw_message(self, message: str) -> str:
              # Actually send the message here.

      obj = MyClass()

      # The MyMessageSender generated class would provide overloads for
      # send(), send_async(), etc. to provide type-safety for message types
      # and their associated response types.
      # Thus, given the statement below, a type-checker would know that
      # 'response' is a SomeResponseType or whatever is associated with
      # SomeMessageType.
      response = obj.msg.send(SomeMessageType())

    """

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol

        # Raw send hooks; the '_ex' variants are also handed the
        # original unencoded Message.
        self._send_raw_message_call: Callable[[Any, str], str] | None = None
        self._send_raw_message_ex_call: (
            Callable[[Any, str, Message], str] | None
        ) = None
        self._send_async_raw_message_call: (
            Callable[[Any, str], Awaitable[str]] | None
        ) = None
        self._send_async_raw_message_ex_call: (
            Callable[[Any, str, Message], Awaitable[str]] | None
        ) = None

        # Optional filters run on outgoing message dicts and incoming
        # response dicts.
        self._encode_filter_call: (
            Callable[[Any, Message, dict], None] | None
        ) = None
        self._decode_filter_call: (
            Callable[[Any, Message, dict, Response | SysResponse], None] | None
        ) = None

        # Optional description of the peer, used in RemoteErrors.
        self._peer_desc_call: Callable[[Any], str] | None = None

    def send_method(
        self, call: Callable[[Any, str], str]
    ) -> Callable[[Any, str], str]:
        """Function decorator for setting raw send method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.
        """
        assert self._send_raw_message_call is None
        self._send_raw_message_call = call
        return call

    def send_ex_method(
        self, call: Callable[[Any, str, Message], str]
    ) -> Callable[[Any, str, Message], str]:
        """Function decorator for extended send method.

        Version of send_method which is also passed the original
        unencoded message; can be useful for cases where metadata is sent
        along with messages referring to their payloads/etc.
        """
        assert self._send_raw_message_ex_call is None
        self._send_raw_message_ex_call = call
        return call

    def send_async_method(
        self, call: Callable[[Any, str], Awaitable[str]]
    ) -> Callable[[Any, str], Awaitable[str]]:
        """Function decorator for setting raw send-async method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.

        IMPORTANT: Generally async send methods should not be implemented
        as 'async' methods, but instead should be regular methods that
        return awaitable objects. This way it can be guaranteed that
        outgoing messages are synchronously enqueued in the correct
        order, and then async calls can be returned which finish each
        send. If the entire call is async, they may be enqueued out of
        order in rare cases.
        """
        assert self._send_async_raw_message_call is None
        self._send_async_raw_message_call = call
        return call

    def send_async_ex_method(
        self, call: Callable[[Any, str, Message], Awaitable[str]]
    ) -> Callable[[Any, str, Message], Awaitable[str]]:
        """Function decorator for extended send-async method.

        Version of send_async_method which is also passed the original
        unencoded message; can be useful for cases where metadata is sent
        along with messages referring to their payloads/etc.
        """
        assert self._send_async_raw_message_ex_call is None
        self._send_async_raw_message_ex_call = call
        return call

    def encode_filter_method(
        self, call: Callable[[Any, Message, dict], None]
    ) -> Callable[[Any, Message, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def decode_filter_method(
        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
    ) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts.
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def peer_desc_method(
        self, call: Callable[[Any], str]
    ) -> Callable[[Any], str]:
        """Function decorator for defining peer descriptions.

        These are included in error messages or other diagnostics.
        """
        assert self._peer_desc_call is None
        self._peer_desc_call = call
        return call

    def send(self, bound_obj: Any, message: Message) -> Response | None:
        """Send a message synchronously."""
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=self.fetch_raw_response(
                bound_obj=bound_obj,
                message=message,
            ),
        )

    def send_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously."""

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        raw_response_awaitable = self.fetch_raw_response_async(
            bound_obj=bound_obj,
            message=message,
        )
        # Now return an awaitable that will finish the send.
        return self._send_async_awaitable(
            bound_obj, message, raw_response_awaitable
        )

    async def _send_async_awaitable(
        self,
        bound_obj: Any,
        message: Message,
        raw_response_awaitable: Awaitable[Response | SysResponse],
    ) -> Response | None:
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=await raw_response_awaitable,
        )

    @staticmethod
    def _send_error_response(
        exc: Exception, error_message: str
    ) -> ErrorSysResponse:
        """Wrap an exception raised by a raw send call in a response.

        CommunicationErrors are tagged COMMUNICATION; anything else is
        tagged LOCAL.
        """
        response = ErrorSysResponse(
            error_message=error_message,
            error_type=(
                ErrorSysResponse.ErrorType.COMMUNICATION
                if isinstance(exc, CommunicationError)
                else ErrorSysResponse.ErrorType.LOCAL
            ),
        )
        # Can include the actual exception since we'll be looking at
        # this locally; might be helpful.
        response.set_local_exception(exc)
        return response

    def fetch_raw_response(
        self, bound_obj: Any, message: Message
    ) -> Response | SysResponse:
        """Synchronously send a message and fetch its raw response.

        The result should be passed to unpack_raw_response() to produce
        the final message result. Exceptions raised by the registered
        send method are returned as ErrorSysResponses, not raised.

        Generally you can just call send(); these split versions are
        for when message sending and response handling need to happen
        in different contexts/threads.

        Raises:
            RuntimeError: If no synchronous send method is registered.
        """
        if (
            self._send_raw_message_call is None
            and self._send_raw_message_ex_call is None
        ):
            raise RuntimeError('send() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            # The extended variant takes priority if both are registered.
            if self._send_raw_message_ex_call is not None:
                response_encoded = self._send_raw_message_ex_call(
                    bound_obj, msg_encoded, message
                )
            else:
                assert self._send_raw_message_call is not None
                response_encoded = self._send_raw_message_call(
                    bound_obj, msg_encoded
                )
        except Exception as exc:
            return self._send_error_response(
                exc, 'Error in MessageSender @send_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def fetch_raw_response_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Fetch a raw message response awaitable.

        The result of this should be awaited and then passed to
        unpack_raw_response() to produce the final message result.

        Generally you can just call send_async(); calling fetch and
        unpack manually is for when message sending and response
        handling need to happen in different contexts/threads.

        Raises:
            RuntimeError: If no async send method is registered.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        if (
            self._send_async_raw_message_call is None
            and self._send_async_raw_message_ex_call is None
        ):
            raise RuntimeError('send_async() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            # The extended variant takes priority if both are registered.
            if self._send_async_raw_message_ex_call is not None:
                send_awaitable = self._send_async_raw_message_ex_call(
                    bound_obj, msg_encoded, message
                )
            else:
                assert self._send_async_raw_message_call is not None
                send_awaitable = self._send_async_raw_message_call(
                    bound_obj, msg_encoded
                )
        except Exception as exc:
            return self._error_awaitable(exc)

        # Now return an awaitable to finish the job.
        return self._fetch_raw_response_awaitable(
            bound_obj, message, send_awaitable
        )

    async def _error_awaitable(self, exc: Exception) -> SysResponse:
        return self._send_error_response(
            exc, 'Error in MessageSender @send_async_method.'
        )

    async def _fetch_raw_response_awaitable(
        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
    ) -> Response | SysResponse:
        try:
            response_encoded = await send_awaitable
        except Exception as exc:
            return self._send_error_response(
                exc, 'Error in MessageSender @send_async_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def unpack_raw_response(
        self,
        bound_obj: Any,
        message: Message,
        raw_response: Response | SysResponse,
    ) -> Response | None:
        """Convert a raw fetched response into a final response/error/etc.

        Generally you can just call send(); calling fetch and unpack
        manually is for when message sending and response handling need
        to happen in different contexts/threads.
        """
        response = self._unpack_raw_response(bound_obj, raw_response)
        # Sanity check: the response must be one the message declares.
        assert (
            response is None
            or type(response) in type(message).get_response_types()
        )
        return response

    def _encode_message(self, bound_obj: Any, message: Message) -> str:
        """Encode a message for sending."""
        msg_dict = self.protocol.message_to_dict(message)
        if self._encode_filter_call is not None:
            self._encode_filter_call(bound_obj, message, msg_dict)
        return self.protocol.encode_dict(msg_dict)

    def _decode_raw_response(
        self, bound_obj: Any, message: Message, response_encoded: str
    ) -> Response | SysResponse:
        """Create a Response from returned data.

        These Responses may encapsulate things like remote errors and
        should not be handed directly to users. _unpack_raw_response()
        should be used to translate to special values like None or raise
        Exceptions. This function itself should never raise Exceptions.
        """
        response: Response | SysResponse
        try:
            response_dict = self.protocol.decode_dict(response_encoded)
            response = self.protocol.response_from_dict(response_dict)
            if self._decode_filter_call is not None:
                self._decode_filter_call(
                    bound_obj, message, response_dict, response
                )
        except Exception as exc:
            response = ErrorSysResponse(
                error_message='Error decoding raw response.',
                error_type=ErrorSysResponse.ErrorType.LOCAL,
            )
            # Since we'll be looking at this locally, we can include
            # extra info for logging/etc.
            response.set_local_exception(exc)
        return response

    def _unpack_raw_response(
        self, bound_obj: Any, raw_response: Response | SysResponse
    ) -> Response | None:
        """Given a raw Response, unpacks to special values or Exceptions.

        The result of this call is what should be passed to users.
        For complex messaging situations such as response callbacks
        operating across different threads, this last stage should be
        run such that any raised Exception is active when the callback
        fires; not on the thread where the message was sent.
        """
        # EmptySysResponse translates to None
        if isinstance(raw_response, EmptySysResponse):
            return None

        # Some error occurred. Raise a local Exception for it.
        if isinstance(raw_response, ErrorSysResponse):
            # Errors that happened locally can attach their exceptions
            # here for extra logging goodness.
            local_exception = raw_response.get_local_exception()

            if (
                raw_response.error_type
                is ErrorSysResponse.ErrorType.COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # If something went wrong on *our* end of the connection,
            # don't say it was a remote error.
            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
                raise RuntimeError(
                    raw_response.error_message
                ) from local_exception

            # If they want to support clean errors, do those.
            if (
                self.protocol.forward_clean_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
            ):
                raise CleanError(
                    raw_response.error_message
                ) from local_exception

            if (
                self.protocol.forward_communication_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # Everything else gets lumped in as a remote error.
            raise RemoteError(
                raw_response.error_message,
                peer_desc=(
                    'peer'
                    if self._peer_desc_call is None
                    else self._peer_desc_call(bound_obj)
                ),
            ) from local_exception

        assert isinstance(raw_response, Response)
        return raw_response
Facilitates sending messages to a target and receiving responses.
These are instantiated at the class level and used to register unbound class methods to handle raw message sending. Generally this class is not used directly, but instead autogenerated subclasses which provide type safe overloads are used instead.
Example: (In this example, MyMessageSender is an autogenerated class that inherits from MessageSender).
class MyClass: msg = MyMessageSender()
@msg.send_method
def send_raw_message(self, message: str) -> str:
# Actually send the message here.
obj = MyClass()
The MyMessageSender generated class would provide overloads for
send(), send_async(), etc. to provide type-safety for message types
and their associated response types.
Thus, given the statement below, a type-checker would know that
'response' is a SomeResponseType or whatever is associated with
SomeMessageType.
response = obj.msg.send(SomeMessageType())
def __init__(self, protocol: MessageProtocol) -> None:
    """Store the protocol and start with no callbacks registered."""
    self.protocol = protocol

    # Synchronous raw send hooks (plain and extended).
    self._send_raw_message_call: Callable[[Any, str], str] | None = None
    self._send_raw_message_ex_call: (
        Callable[[Any, str, Message], str] | None
    ) = None

    # Asynchronous raw send hooks (plain and extended).
    self._send_async_raw_message_call: (
        Callable[[Any, str], Awaitable[str]] | None
    ) = None
    self._send_async_raw_message_ex_call: (
        Callable[[Any, str, Message], Awaitable[str]] | None
    ) = None

    # Optional encode/decode filters.
    self._encode_filter_call: (
        Callable[[Any, Message, dict], None] | None
    ) = None
    self._decode_filter_call: (
        Callable[[Any, Message, dict, Response | SysResponse], None] | None
    ) = None

    # Optional peer description provider.
    self._peer_desc_call: Callable[[Any], str] | None = None
def send_method(
    self, call: Callable[[Any, str], str]
) -> Callable[[Any, str], str]:
    """Decorator registering the raw synchronous send method.

    The registered callable takes a string and should return a string.
    A CommunicationError raised by it is returned to the sender as
    such; any other exception results in a RuntimeError for the sender.
    """
    # Only a single registration is allowed.
    existing = self._send_raw_message_call
    assert existing is None
    self._send_raw_message_call = call
    return call
Function decorator for setting raw send method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
def send_ex_method(
    self, call: Callable[[Any, str, Message], str]
) -> Callable[[Any, str, Message], str]:
    """Decorator registering the extended synchronous send method.

    Like send_method, but the callable is also passed the original
    unencoded message; this can be useful when metadata is sent along
    with messages referring to their payloads/etc.
    """
    # Only a single registration is allowed.
    existing = self._send_raw_message_ex_call
    assert existing is None
    self._send_raw_message_ex_call = call
    return call
Function decorator for extended send method.
Version of send_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
def send_async_method(
    self, call: Callable[[Any, str], Awaitable[str]]
) -> Callable[[Any, str], Awaitable[str]]:
    """Decorator registering the raw asynchronous send method.

    The registered callable takes a string and should produce a string.
    A CommunicationError raised by it is returned to the sender as
    such; any other exception results in a RuntimeError for the sender.

    IMPORTANT: async send methods should generally be regular methods
    that return awaitable objects rather than 'async' methods. That
    way outgoing messages are guaranteed to be enqueued synchronously
    in the correct order, with the returned awaitables finishing each
    send. If the entire call is async, messages may be enqueued out of
    order in rare cases.
    """
    # Only a single registration is allowed.
    existing = self._send_async_raw_message_call
    assert existing is None
    self._send_async_raw_message_call = call
    return call
Function decorator for setting raw send-async method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.
def send_async_ex_method(
    self, call: Callable[[Any, str, Message], Awaitable[str]]
) -> Callable[[Any, str, Message], Awaitable[str]]:
    """Decorator registering the extended asynchronous send method.

    Like send_async_method, but the callable is also passed the
    original unencoded message; this can be useful when metadata is
    sent along with messages referring to their payloads/etc.
    """
    # Only a single registration is allowed.
    existing = self._send_async_raw_message_ex_call
    assert existing is None
    self._send_async_raw_message_ex_call = call
    return call
Function decorator for extended send-async method.
Version of send_async_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
def encode_filter_method(
    self, call: Callable[[Any, Message, dict], None]
) -> Callable[[Any, Message, dict], None]:
    """Decorator registering an encode filter.

    An encode filter can add extra data to the message dict before it
    is encoded to a string and sent out.
    """
    # Only a single registration is allowed.
    existing = self._encode_filter_call
    assert existing is None
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def decode_filter_method(
    self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
    """Function decorator for defining a decode filter.

    Decode filters can be used to extract extra data from incoming
    message dicts.

    The callable is returned unchanged, so the return annotation
    matches the parameter annotation (including SysResponse).
    """
    # Only a single registration is allowed.
    assert self._decode_filter_call is None
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts.
def peer_desc_method(
    self, call: Callable[[Any], str]
) -> Callable[[Any], str]:
    """Decorator registering a peer description provider.

    The descriptions are included in error messages or other
    diagnostics.
    """
    # Only a single registration is allowed.
    existing = self._peer_desc_call
    assert existing is None
    self._peer_desc_call = call
    return call
Function decorator for defining peer descriptions.
These are included in error messages or other diagnostics.
def send(self, bound_obj: Any, message: Message) -> Response | None:
    """Send a message synchronously and return its unpacked response.

    This is fetch_raw_response() followed by unpack_raw_response().
    """
    raw = self.fetch_raw_response(bound_obj=bound_obj, message=message)
    return self.unpack_raw_response(
        bound_obj=bound_obj, message=message, raw_response=raw
    )
Send a message synchronously.
def send_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | None]:
    """Send a message asynchronously.

    Returns an awaitable that finishes the send.
    """

    # This is deliberately a plain (non-async) call so its first part
    # runs synchronously. Were the whole call async, there would be no
    # guarantee that messages sent in order actually go out in order.
    pending_raw = self.fetch_raw_response_async(
        bound_obj=bound_obj, message=message
    )

    # The returned awaitable completes the send.
    return self._send_async_awaitable(bound_obj, message, pending_raw)
Send a message asynchronously.
def fetch_raw_response(
    self, bound_obj: Any, message: Message
) -> Response | SysResponse:
    """Send a message synchronously and return the raw response.

    Generally you can just call send(); these split versions are
    for when message sending and response handling need to happen
    in different contexts/threads.

    Raises RuntimeError if no synchronous send call has been
    registered. An exception raised by the registered send call is not
    propagated; it is returned wrapped in an ErrorSysResponse.
    """
    ex_call = self._send_raw_message_ex_call
    plain_call = self._send_raw_message_call
    if plain_call is None and ex_call is None:
        raise RuntimeError('send() is unimplemented for this type.')

    msg_encoded = self._encode_message(bound_obj, message)
    try:
        # The extended call (which also receives the unencoded message)
        # takes precedence when both are registered.
        if ex_call is None:
            assert plain_call is not None
            response_encoded = plain_call(bound_obj, msg_encoded)
        else:
            response_encoded = ex_call(bound_obj, msg_encoded, message)
    except Exception as exc:
        if isinstance(exc, CommunicationError):
            error_type = ErrorSysResponse.ErrorType.COMMUNICATION
        else:
            error_type = ErrorSysResponse.ErrorType.LOCAL
        response = ErrorSysResponse(
            error_message='Error in MessageSender @send_method.',
            error_type=error_type,
        )
        # Fine to attach the actual exception since this response is
        # only looked at locally; it might be helpful.
        response.set_local_exception(exc)
        return response
    return self._decode_raw_response(bound_obj, message, response_encoded)
Send a message synchronously and return the raw response.
Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.
def fetch_raw_response_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | SysResponse]:
    """Start an async send; return an awaitable for the raw response.

    The result should be awaited and then passed to
    unpack_raw_response() to produce the final message result.

    Generally you can just call send_async(); calling fetch and unpack
    manually is for when message sending and response handling need
    to happen in different contexts/threads.

    Raises RuntimeError if no async send call has been registered.
    """

    # This is deliberately a plain (non-async) call so its first part
    # runs synchronously. Were the whole call async, there would be no
    # guarantee that messages sent in order actually go out in order.
    ex_call = self._send_async_raw_message_ex_call
    plain_call = self._send_async_raw_message_call
    if plain_call is None and ex_call is None:
        raise RuntimeError('send_async() is unimplemented for this type.')

    msg_encoded = self._encode_message(bound_obj, message)
    try:
        # The extended call (which also receives the unencoded message)
        # takes precedence when both are registered.
        if ex_call is None:
            assert plain_call is not None
            send_awaitable = plain_call(bound_obj, msg_encoded)
        else:
            send_awaitable = ex_call(bound_obj, msg_encoded, message)
    except Exception as exc:
        return self._error_awaitable(exc)

    # The returned awaitable finishes the job.
    return self._fetch_raw_response_awaitable(
        bound_obj, message, send_awaitable
    )
Fetch a raw message response awaitable.
The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.
Generally you can just call send_async(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
def unpack_raw_response(
    self,
    bound_obj: Any,
    message: Message,
    raw_response: Response | SysResponse,
) -> Response | None:
    """Turn a raw fetched response into the final response/error/etc.

    Generally you can just call send(); calling fetch and unpack
    manually is for when message sending and response handling need
    to happen in different contexts/threads.
    """
    unpacked = self._unpack_raw_response(bound_obj, raw_response)

    # Sanity check: a non-None result must be one of the response
    # types the message declares.
    assert (
        unpacked is None
        or type(unpacked) in type(message).get_response_types()
    )
    return unpacked
Convert a raw fetched response into a final response/error/etc.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
class BoundMessageSender:
    """Base class for bound senders.

    Pairs a MessageSender with the object it is bound to and forwards
    the untyped send calls to the sender with that object supplied.
    """

    def __init__(self, obj: Any, sender: MessageSender) -> None:
        # Note: obj is intentionally not checked here; at least the
        # protocol property should keep working when accessed via type.
        self._sender = sender
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this sender."""
        sender = self._sender
        return sender.protocol

    def send_untyped(self, message: Message) -> Response | None:
        """Send a message synchronously.

        Prefer the send() call provided by generated subclasses
        whenever possible; it will provide better type safety.
        """
        bound = self._obj
        assert bound is not None
        return self._sender.send(bound_obj=bound, message=message)

    def send_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously.

        Prefer the send_async() call provided by generated subclasses
        whenever possible; it will provide better type safety.
        """
        bound = self._obj
        assert bound is not None
        return self._sender.send_async(bound_obj=bound, message=message)

    def fetch_raw_response_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Split send (part 1 of 2)."""
        bound = self._obj
        assert bound is not None
        return self._sender.fetch_raw_response_async(
            bound_obj=bound, message=message
        )

    def unpack_raw_response_untyped(
        self, message: Message, raw_response: Response | SysResponse
    ) -> Response | None:
        """Split send (part 2 of 2)."""
        sender = self._sender
        return sender.unpack_raw_response(
            bound_obj=self._obj, message=message, raw_response=raw_response
        )
Base class for bound senders.
@property
def protocol(self) -> MessageProtocol:
    """Protocol associated with this sender."""
    sender = self._sender
    return sender.protocol
Protocol associated with this sender.
def send_untyped(self, message: Message) -> Response | None:
    """Send a message synchronously.

    Prefer the send() call provided by generated subclasses whenever
    possible; it will provide better type safety.
    """
    bound = self._obj
    assert bound is not None
    return self._sender.send(bound_obj=bound, message=message)
Send a message synchronously.
Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.
def send_async_untyped(
    self, message: Message
) -> Awaitable[Response | None]:
    """Send a message asynchronously.

    Prefer the send_async() call provided by generated subclasses
    whenever possible; it will provide better type safety.
    """
    bound = self._obj
    assert bound is not None
    return self._sender.send_async(bound_obj=bound, message=message)
Send a message asynchronously.
Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.
def fetch_raw_response_async_untyped(
    self, message: Message
) -> Awaitable[Response | SysResponse]:
    """Split send (part 1 of 2).

    Forwards to the sender's fetch_raw_response_async() with the
    bound object supplied.
    """
    bound = self._obj
    assert bound is not None
    return self._sender.fetch_raw_response_async(
        bound_obj=bound, message=message
    )
Split send (part 1 of 2).
def unpack_raw_response_untyped(
    self, message: Message, raw_response: Response | SysResponse
) -> Response | None:
    """Split send (part 2 of 2).

    Forwards to the sender's unpack_raw_response() with the bound
    object supplied.
    """
    sender = self._sender
    return sender.unpack_raw_response(
        bound_obj=self._obj, message=message, raw_response=raw_response
    )
Split send (part 2 of 2).
class MessageReceiver:
    """Facilitates receiving & responding to messages from a remote source.

    This is instantiated at the class level with unbound methods registered
    as handlers for different message types in the protocol.

    Example:

    class MyClass:
        receiver = MyMessageReceiver()

        # MyMessageReceiver fills out handler() overloads to ensure all
        # registered handlers have valid types/return-types.

        # Note: handler arguments must be named exactly 'self' and 'msg'.
        @receiver.handler
        def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
            # Deal with this message type here.

    # This will trigger the registered handler being called.
    obj = MyClass()
    obj.receiver.handle_raw_message(some_raw_data)

    Any unhandled Exception occurring during message handling will result in
    an efro.error.RemoteError being raised on the sending end.
    """

    is_async = False

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        # Registered handler calls, keyed by the Message type they handle.
        self._handlers: dict[type[Message], Callable] = {}
        # Optional filter hooks; unset until registered via decorators.
        self._decode_filter_call: (
            Callable[[Any, dict, Message], None] | None
        ) = None
        self._encode_filter_call: (
            Callable[[Any, Message | None, Response | SysResponse, dict], None]
            | None
        ) = None

    # noinspection PyProtectedMember
    def register_handler(
        self, call: Callable[[Any, Message], Response | None]
    ) -> None:
        """Register a handler call.

        The message type handled by the call is determined by its
        type annotation.

        Raises ValueError if the call's arguments are not exactly
        ('self', 'msg'), and TypeError if its annotations are not
        usable, the message type is not a Message subclass, is not
        registered in the protocol, already has a handler, or the
        annotated return types do not match what the message expects.
        """
        # TODO: can use types.GenericAlias in 3.9.
        # (hmm though now that we're there, it seems a drop-in
        # replace gives us errors. Should re-test in 3.11 as it seems
        # that typing_extensions handles it differently in that case)
        from typing import _GenericAlias  # type: ignore
        from typing import get_type_hints, get_args

        sig = inspect.getfullargspec(call)

        # The provided callable should be a method taking one 'msg' arg.
        expectedsig = ['self', 'msg']
        if sig.args != expectedsig:
            raise ValueError(
                f'Expected callable signature of {expectedsig};'
                f' got {sig.args}'
            )

        # Check annotation types to determine what message types we handle.
        # Return-type annotation can be a Union, but we probably don't
        # have it available at runtime. Explicitly pull it in.
        # UPDATE: we've updated our pylint filter to where we should
        # have all annotations available.
        # anns = get_type_hints(call, localns={'Union': Union})
        anns = get_type_hints(call)

        msgtype = anns.get('msg')
        if not isinstance(msgtype, type):
            raise TypeError(
                f'expected a type for "msg" annotation; got {type(msgtype)}.'
            )
        # Raise explicitly instead of asserting so this check survives
        # 'python -O' and matches the other annotation errors here.
        if not issubclass(msgtype, Message):
            raise TypeError(
                f'expected a Message subclass for "msg" annotation;'
                f' got {msgtype}.'
            )

        ret = anns.get('return')
        responsetypes: tuple[type[Any] | None, ...]

        # Return types can be a single type or a union of types.
        if isinstance(ret, (_GenericAlias, types.UnionType)):
            targs = get_args(ret)
            if not all(isinstance(a, (type, type(None))) for a in targs):
                raise TypeError(
                    f'expected only types for "return" annotation;'
                    f' got {targs}.'
                )
            responsetypes = targs
        else:
            if not isinstance(ret, (type, type(None))):
                raise TypeError(
                    f'expected one or more types for'
                    f' "return" annotation; got a {type(ret)}.'
                )
            # This seems like maybe a mypy bug. Appeared after adding
            # types.UnionType above.
            responsetypes = (ret,)

        # This will contain NoneType for empty return cases, but
        # we expect it to be None.
        # noinspection PyPep8
        responsetypes = tuple(
            None if r is type(None) else r for r in responsetypes
        )

        # Make sure our protocol has this message type registered and our
        # return types exactly match. (Technically we could return a subset
        # of the supported types; can allow this in the future if it makes
        # sense).
        registered_types = self.protocol.message_ids_by_type.keys()

        if msgtype not in registered_types:
            raise TypeError(
                f'Message type {msgtype} is not registered'
                f' in this Protocol.'
            )

        if msgtype in self._handlers:
            raise TypeError(
                f'Message type {msgtype} already has a registered handler.'
            )

        # Make sure the responses exactly match what the message expects.
        if set(responsetypes) != set(msgtype.get_response_types()):
            raise TypeError(
                f'Provided response types {responsetypes} do not'
                f' match the set expected by message type {msgtype}: '
                f'({msgtype.get_response_types()})'
            )

        # Ok; we're good!
        self._handlers[msgtype] = call

    def decode_filter_method(
        self, call: Callable[[Any, dict, Message], None]
    ) -> Callable[[Any, dict, Message], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts. This version will work for both handle_raw_message()
        and handle_raw_message_async().
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def encode_filter_method(
        self,
        call: Callable[
            [Any, Message | None, Response | SysResponse, dict], None
        ],
    ) -> Callable[[Any, Message | None, Response, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def validate(self, log_only: bool = False) -> None:
        """Check for handler completeness, valid types, etc.

        Raises TypeError for a protocol message type with no registered
        handler, or merely logs an error if 'log_only' is True.
        """
        for msgtype in self.protocol.message_ids_by_type.keys():
            # Response types need no handlers.
            if issubclass(msgtype, Response):
                continue
            if msgtype not in self._handlers:
                msg = (
                    f'Protocol message type {msgtype} is not handled'
                    f' by receiver type {type(self)}.'
                )
                if log_only:
                    logging.error(msg)
                else:
                    raise TypeError(msg)

    def _decode_incoming_message_base(
        self, bound_obj: Any, msg: str
    ) -> tuple[Any, dict, Message]:
        # Decode the incoming message.
        msg_dict = self.protocol.decode_dict(msg)
        msg_decoded = self.protocol.message_from_dict(msg_dict)
        assert isinstance(msg_decoded, Message)
        if self._decode_filter_call is not None:
            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
        return bound_obj, msg_dict, msg_decoded

    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
            bound_obj=bound_obj, msg=msg
        )
        return msg_decoded

    def encode_user_response(
        self, bound_obj: Any, message: Message, response: Response | None
    ) -> str:
        """Encode a response provided by the user for sending."""

        assert isinstance(response, Response | None)
        # (user should never explicitly return error-responses)
        assert (
            response is None or type(response) in message.get_response_types()
        )

        # A return value of None equals EmptySysResponse.
        out_response: Response | SysResponse
        if response is None:
            out_response = EmptySysResponse()
        else:
            out_response = response

        response_dict = self.protocol.response_to_dict(out_response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, out_response, response_dict
            )
        return self.protocol.encode_dict(response_dict)

    def encode_error_response(
        self, bound_obj: Any, message: Message | None, exc: Exception
    ) -> tuple[str, bool]:
        """Given an error, return sysresponse str and whether to log."""
        response, dolog = self.protocol.error_to_response(exc)
        response_dict = self.protocol.response_to_dict(response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, response, response_dict
            )
        return self.protocol.encode_dict(response_dict), dolog

    def handle_raw_message(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> str:
        """Decode, handle, and return a response for a message.

        If 'raise_unregistered' is True, will raise an
        efro.message.UnregisteredMessageIDError for messages not handled by
        the protocol. In all other cases local errors will translate to
        error responses returned to the sender.
        """
        assert not self.is_async, "can't call sync handler on async receiver"
        msg_decoded: Message | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            response = handler(bound_obj, msg_decoded)
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            rstr, dolog = self.encode_error_response(
                bound_obj, msg_decoded, exc
            )
            if dolog:
                if msg_decoded is not None:
                    msgtype = type(msg_decoded)
                    logging.exception(
                        'Error handling %s.%s message.',
                        msgtype.__module__,
                        msgtype.__qualname__,
                    )
                else:
                    logging.exception(
                        'Error handling raw efro.message'
                        ' (likely a message format incompatibility): %s.',
                        msg,
                    )
            return rstr

    def handle_raw_message_async(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> Awaitable[str]:
        """Should be called when the receiver gets a message.

        The return value is an awaitable for the raw response to the
        message.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that message handlers would be called in the
        # order the messages were received.

        assert self.is_async, "Can't call async handler on sync receiver."
        msg_decoded: Message | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            handler_awaitable = handler(bound_obj, msg_decoded)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            return self._handle_raw_message_async_error(
                bound_obj, msg, msg_decoded, exc
            )

        # Return an awaitable to handle the rest asynchronously.
        return self._handle_raw_message_async(
            bound_obj, msg, msg_decoded, handler_awaitable
        )

    async def _handle_raw_message_async_error(
        self,
        bound_obj: Any,
        msg_raw: str,
        msg_decoded: Message | None,
        exc: Exception,
    ) -> str:
        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
        if dolog:
            if msg_decoded is not None:
                msgtype = type(msg_decoded)
                logging.exception(
                    'Error handling %s.%s message.',
                    msgtype.__module__,
                    msgtype.__qualname__,
                    # We need to explicitly provide the exception here,
                    # otherwise it shows up as None. I assume related to
                    # the fact that we're an async function.
                    exc_info=exc,
                )
            else:
                logging.exception(
                    'Error handling raw async efro.message'
                    ' (likely a message format incompatibility): %s.',
                    msg_raw,
                    # We need to explicitly provide the exception here,
                    # otherwise it shows up as None. I assume related to
                    # the fact that we're an async function.
                    exc_info=exc,
                )
        return rstr

    async def _handle_raw_message_async(
        self,
        bound_obj: Any,
        msg_raw: str,
        msg_decoded: Message,
        handler_awaitable: Awaitable[Response | None],
    ) -> str:
        """Await a handler's result and encode it as the raw response.

        Any Exception raised while awaiting or encoding is converted to
        an encoded error response instead of propagating.
        """
        try:
            response = await handler_awaitable
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            return await self._handle_raw_message_async_error(
                bound_obj, msg_raw, msg_decoded, exc
            )
Facilitates receiving & responding to messages from a remote source.
This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.
Example:
class MyClass: receiver = MyMessageReceiver()
# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.
@receiver.handler
def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
# Deal with this message type here.
This will trigger the registered handler being called.
obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)
Any unhandled Exception occurring during message handling will result in an efro.error.RemoteError being raised on the sending end.
57 def __init__(self, protocol: MessageProtocol) -> None: 58 self.protocol = protocol 59 self._handlers: dict[type[Message], Callable] = {} 60 self._decode_filter_call: ( 61 Callable[[Any, dict, Message], None] | None 62 ) = None 63 self._encode_filter_call: ( 64 Callable[[Any, Message | None, Response | SysResponse, dict], None] 65 | None 66 ) = None
def register_handler(
    self, call: Callable[[Any, Message], Response | None]
) -> None:
    """Register a handler call.

    The message type handled by the call is determined by its
    type annotation.

    Raises ValueError if the call's arguments are not exactly
    ('self', 'msg'), and TypeError if its annotations are not usable,
    the message type is not a Message subclass, is not registered in
    the protocol, already has a handler, or the annotated return types
    do not match what the message expects.
    """
    # TODO: can use types.GenericAlias in 3.9.
    # (hmm though now that we're there, it seems a drop-in
    # replace gives us errors. Should re-test in 3.11 as it seems
    # that typing_extensions handles it differently in that case)
    from typing import _GenericAlias  # type: ignore
    from typing import get_type_hints, get_args

    sig = inspect.getfullargspec(call)

    # The provided callable should be a method taking one 'msg' arg.
    expectedsig = ['self', 'msg']
    if sig.args != expectedsig:
        raise ValueError(
            f'Expected callable signature of {expectedsig};'
            f' got {sig.args}'
        )

    # Check annotation types to determine what message types we handle.
    # Return-type annotation can be a Union, but we probably don't
    # have it available at runtime. Explicitly pull it in.
    # UPDATE: we've updated our pylint filter to where we should
    # have all annotations available.
    # anns = get_type_hints(call, localns={'Union': Union})
    anns = get_type_hints(call)

    msgtype = anns.get('msg')
    if not isinstance(msgtype, type):
        raise TypeError(
            f'expected a type for "msg" annotation; got {type(msgtype)}.'
        )
    # Raise explicitly instead of asserting so this check survives
    # 'python -O' and matches the other annotation errors here.
    if not issubclass(msgtype, Message):
        raise TypeError(
            f'expected a Message subclass for "msg" annotation;'
            f' got {msgtype}.'
        )

    ret = anns.get('return')
    responsetypes: tuple[type[Any] | None, ...]

    # Return types can be a single type or a union of types.
    if isinstance(ret, (_GenericAlias, types.UnionType)):
        targs = get_args(ret)
        if not all(isinstance(a, (type, type(None))) for a in targs):
            raise TypeError(
                f'expected only types for "return" annotation;'
                f' got {targs}.'
            )
        responsetypes = targs
    else:
        if not isinstance(ret, (type, type(None))):
            raise TypeError(
                f'expected one or more types for'
                f' "return" annotation; got a {type(ret)}.'
            )
        # This seems like maybe a mypy bug. Appeared after adding
        # types.UnionType above.
        responsetypes = (ret,)

    # This will contain NoneType for empty return cases, but
    # we expect it to be None.
    # noinspection PyPep8
    responsetypes = tuple(
        None if r is type(None) else r for r in responsetypes
    )

    # Make sure our protocol has this message type registered and our
    # return types exactly match. (Technically we could return a subset
    # of the supported types; can allow this in the future if it makes
    # sense).
    registered_types = self.protocol.message_ids_by_type.keys()

    if msgtype not in registered_types:
        raise TypeError(
            f'Message type {msgtype} is not registered'
            f' in this Protocol.'
        )

    if msgtype in self._handlers:
        raise TypeError(
            f'Message type {msgtype} already has a registered handler.'
        )

    # Make sure the responses exactly match what the message expects.
    if set(responsetypes) != set(msgtype.get_response_types()):
        raise TypeError(
            f'Provided response types {responsetypes} do not'
            f' match the set expected by message type {msgtype}: '
            f'({msgtype.get_response_types()})'
        )

    # Ok; we're good!
    self._handlers[msgtype] = call
Register a handler call.
The message type handled by the call is determined by its type annotation.
def decode_filter_method(
    self, call: Callable[[Any, dict, Message], None]
) -> Callable[[Any, dict, Message], None]:
    """Decorator registering a decode filter for incoming messages.

    A decode filter can extract extra data from incoming message
    dicts. This version will work for both handle_raw_message() and
    handle_raw_message_async().

    Only one filter may be registered; 'call' is returned unchanged.
    """
    # Registering twice is a programming error.
    existing = self._decode_filter_call
    assert existing is None
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async().
def encode_filter_method(
    self,
    call: Callable[
        [Any, Message | None, Response | SysResponse, dict], None
    ],
) -> Callable[[Any, Message | None, Response, dict], None]:
    """Decorator registering an encode filter for outgoing responses.

    An encode filter can add extra data to the message dict before it
    is encoded to a string and sent out.

    Only one filter may be registered; 'call' is returned unchanged.
    """
    # Registering twice is a programming error.
    existing = self._encode_filter_call
    assert existing is None
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def validate(self, log_only: bool = False) -> None:
    """Check for handler completeness, valid types, etc.

    Raises TypeError for a protocol message type with no registered
    handler, or merely logs an error if 'log_only' is True.
    """
    for msgtype in self.protocol.message_ids_by_type.keys():
        # Response types need no handlers; handled types are fine.
        if issubclass(msgtype, Response) or msgtype in self._handlers:
            continue

        msg = (
            f'Protocol message type {msgtype} is not handled'
            f' by receiver type {type(self)}.'
        )
        if not log_only:
            raise TypeError(msg)
        logging.error(msg)
Check for handler completeness, valid types, etc.
def encode_user_response(
    self, bound_obj: Any, message: Message, response: Response | None
) -> str:
    """Encode a handler-provided response for sending.

    A 'response' of None is sent as an EmptySysResponse. If an encode
    filter is registered, it is run on the response dict before final
    encoding.
    """

    assert isinstance(response, Response | None)
    # (user should never explicitly return error-responses)
    assert (
        response is None or type(response) in message.get_response_types()
    )

    # A return value of None equals EmptySysResponse.
    out_response: Response | SysResponse = (
        EmptySysResponse() if response is None else response
    )

    protocol = self.protocol
    response_dict = protocol.response_to_dict(out_response)
    encode_filter = self._encode_filter_call
    if encode_filter is not None:
        encode_filter(bound_obj, message, out_response, response_dict)
    return self.protocol.encode_dict(response_dict)
Encode a response provided by the user for sending.
def encode_error_response(
    self, bound_obj: Any, message: Message | None, exc: Exception
) -> tuple[str, bool]:
    """Given an error, return sysresponse str and whether to log.

    If an encode filter is registered, it is run on the response dict
    before final encoding.
    """
    response, dolog = self.protocol.error_to_response(exc)
    response_dict = self.protocol.response_to_dict(response)

    encode_filter = self._encode_filter_call
    if encode_filter is not None:
        encode_filter(bound_obj, message, response, response_dict)

    encoded = self.protocol.encode_dict(response_dict)
    return encoded, dolog
Given an error, return sysresponse str and whether to log.
def handle_raw_message(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> str:
    """Decode, handle, and return a response for a message.

    If 'raise_unregistered' is True, will raise an
    efro.message.UnregisteredMessageIDError for messages not handled by
    the protocol. In all other cases local errors will translate to
    error responses returned to the sender.
    """
    assert not self.is_async, "can't call sync handler on async receiver"

    # Stays None if decoding itself fails.
    msg_decoded: Message | None = None
    try:
        msg_decoded = self._decode_incoming_message(bound_obj, msg)
        msgtype = type(msg_decoded)
        handler = self._handlers.get(msgtype)
        if handler is None:
            raise RuntimeError(f'Got unhandled message type: {msgtype}.')
        response = handler(bound_obj, msg_decoded)
        assert isinstance(response, Response | None)
        return self.encode_user_response(bound_obj, msg_decoded, response)

    except Exception as exc:
        if raise_unregistered and isinstance(
            exc, UnregisteredMessageIDError
        ):
            raise

        rstr, dolog = self.encode_error_response(
            bound_obj, msg_decoded, exc
        )
        if not dolog:
            return rstr

        # Log inside the except block so logging.exception() picks up
        # the active exception.
        if msg_decoded is None:
            logging.exception(
                'Error handling raw efro.message'
                ' (likely a message format incompatibility): %s.',
                msg,
            )
        else:
            msgtype = type(msg_decoded)
            logging.exception(
                'Error handling %s.%s message.',
                msgtype.__module__,
                msgtype.__qualname__,
            )
        return rstr
Decode, handle, and return a response for a message.
If 'raise_unregistered' is True, will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases local errors will translate to error responses returned to the sender.
def handle_raw_message_async(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> Awaitable[str]:
    """Should be called when the receiver gets a message.

    Returns an awaitable for the raw response to the message.

    If 'raise_unregistered' is True, an UnregisteredMessageIDError
    raised during the synchronous part is re-raised; other exceptions
    raised there are turned into an awaitable error response.
    """

    # Note: This is deliberately a plain (non-async) call so that
    # decoding and invoking the handler happen synchronously. Were the
    # whole call async, we couldn't guarantee that message handlers get
    # called in the order the messages were received.

    assert self.is_async, "Can't call async handler on sync receiver."

    # Stays None if decoding itself fails.
    msg_decoded: Message | None = None
    try:
        msg_decoded = self._decode_incoming_message(bound_obj, msg)
        msgtype = type(msg_decoded)
        handler = self._handlers.get(msgtype)
        if handler is None:
            raise RuntimeError(f'Got unhandled message type: {msgtype}.')
        handler_awaitable = handler(bound_obj, msg_decoded)

    except Exception as exc:
        if raise_unregistered and isinstance(
            exc, UnregisteredMessageIDError
        ):
            raise
        error_awaitable = self._handle_raw_message_async_error(
            bound_obj, msg, msg_decoded, exc
        )
        return error_awaitable

    # The rest happens asynchronously in the returned awaitable.
    return self._handle_raw_message_async(
        bound_obj, msg, msg_decoded, handler_awaitable
    )
Should be called when the receiver gets a message.
The return value is the raw response to the message.
class BoundMessageReceiver:
    """Base bound receiver class.

    Pairs a MessageReceiver with the specific object it is bound to and
    forwards calls to the receiver with that object supplied.
    """

    def __init__(
        self,
        obj: Any,
        receiver: MessageReceiver,
    ) -> None:
        assert obj is not None
        self._obj = obj
        self._receiver = receiver

    @property
    def protocol(self) -> MessageProtocol:
        """Protocol associated with this receiver."""
        return self._receiver.protocol

    def encode_error_response(self, exc: Exception) -> str:
        """Given an error, return a response ready to send.

        This should be used for any errors that happen outside of
        standard handle_raw_message calls. Any errors within those
        calls will be automatically returned as encoded strings.
        """
        # Passing None for Message here; we would only have that available
        # for things going wrong in the handler (which this is not for).
        # The receiver call returns a pair; only its first element (the
        # encoded string) is handed back.
        return self._receiver.encode_error_response(self._obj, None, exc)[0]
Base bound receiver class.
@property
def protocol(self) -> MessageProtocol:
    """Protocol associated with this receiver.

    Read straight from the underlying receiver.
    """
    return self._receiver.protocol
Protocol associated with this receiver.
def encode_error_response(self, exc: Exception) -> str:
    """Given an error, return a response ready to send.

    This should be used for any errors that happen outside of
    standard handle_raw_message calls. Any errors within those
    calls will be automatically returned as encoded strings.
    """
    # Passing None for Message here; we would only have that available
    # for things going wrong in the handler (which this is not for).
    # The receiver call returns a pair; only its first element (the
    # encoded string) is handed back.
    return self._receiver.encode_error_response(self._obj, None, exc)[0]
Given an error, return a response ready to send.
This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.
def create_sender_module(
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    *,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Create a Python module defining a MessageSender subclass.

    This class is primarily for type checking and will contain overrides
    for the varieties of send calls for message/response types defined
    in the protocol.

    Code passed for 'protocol_create_code' should import necessary
    modules and assign an instance of the Protocol to a 'protocol'
    variable.

    If 'build_time_protocol_create_code' is provided, it is used in
    place of 'protocol_create_code' to obtain the protocol object while
    generating; 'protocol_create_code' is still what gets passed on to
    the module generation call.

    'protocol_module_level_import_code' is passed through unchanged to
    the module generation call.

    Class names are based on basename; a basename 'FooSender' will
    result in classes FooSender and BoundFooSender.

    If 'private' is True, class-names will be prefixed with an '_'.

    Note: output code may have long lines and should generally be run
    through a formatter. We should perhaps move this functionality to
    efrotools so we can include that functionality inline.
    """
    protocol = _protocol_from_code(
        build_time_protocol_create_code
        if build_time_protocol_create_code is not None
        else protocol_create_code
    )
    return protocol.do_create_sender_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        enable_sync_sends=enable_sync_sends,
        enable_async_sends=enable_async_sends,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageSender subclass.
This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.
Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.
Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.
If 'private' is True, class-names will be prefixed with an '_'.
Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.
def create_receiver_module(
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    *,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Create a Python module defining a MessageReceiver subclass.

    This class is primarily for type checking and will contain overrides
    for the register method for message/response types defined in
    the protocol.

    Class names are based on basename; a basename 'FooReceiver' will
    result in FooReceiver and BoundFooReceiver.

    If 'is_async' is True, handle_raw_message() will be an async method
    and the @handler decorator will expect async methods.

    If 'private' is True, class-names will be prefixed with an '_'.

    If 'build_time_protocol_create_code' is provided, it is used in
    place of 'protocol_create_code' to obtain the protocol object while
    generating; 'protocol_create_code' is still what gets passed on to
    the module generation call.

    'protocol_module_level_import_code' is passed through unchanged to
    the module generation call.

    Note that line lengths are not clipped, so output may need to be
    run through a formatter to prevent lint warnings about excessive
    line lengths.
    """
    protocol = _protocol_from_code(
        build_time_protocol_create_code
        if build_time_protocol_create_code is not None
        else protocol_create_code
    )
    return protocol.do_create_receiver_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        is_async=is_async,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
"Create a Python module defining a MessageReceiver subclass.
This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.
Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.
If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.
If 'private' is True, class-names will be prefixed with an '_'.
Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.
class UnregisteredMessageIDError(Exception):
    """A message or response id is not covered by our protocol."""
A message or response id is not covered by our protocol.
Inherited Members
- builtins.Exception
- Exception
- builtins.BaseException
- with_traceback
- add_note
- args