efro.message
Functionality for sending and responding to messages. Supports static typing for message types and possible return types.
# Released under the MIT License. See LICENSE for details.
#
"""Functionality for sending and responding to messages.

Supports static typing for message types and possible return types.
"""

from __future__ import annotations

from efro.util import set_canonical_module_names
from efro.message._protocol import MessageProtocol
from efro.message._sender import MessageSender, BoundMessageSender
from efro.message._receiver import MessageReceiver, BoundMessageReceiver
from efro.message._module import create_sender_module, create_receiver_module
from efro.message._message import (
    Message,
    Response,
    SysResponse,
    EmptySysResponse,
    ErrorSysResponse,
    StringResponse,
    BoolResponse,
    UnregisteredMessageIDError,
)

# The public names re-exported by this package.
__all__ = [
    'Message',
    'Response',
    'SysResponse',
    'EmptySysResponse',
    'ErrorSysResponse',
    'StringResponse',
    'BoolResponse',
    'MessageProtocol',
    'MessageSender',
    'BoundMessageSender',
    'MessageReceiver',
    'BoundMessageReceiver',
    'create_sender_module',
    'create_receiver_module',
    'UnregisteredMessageIDError',
]

# Have these things present themselves cleanly as 'thismodule.SomeClass'
# instead of 'thismodule._internalmodule.SomeClass'
set_canonical_module_names(globals())
class Message:
    """Base class for messages."""

    @classmethod
    def get_response_types(cls) -> list[type[Response] | None]:
        """Return all Response types this Message can return when sent.

        The default implementation specifies a None return type.
        Each call returns a fresh single-element list.
        """
        return [None]
Base class for messages.
@classmethod
def get_response_types(cls) -> list[type[Response] | None]:
    """Return all Response types this Message can return when sent.

    The default implementation specifies a None return type.
    Each call returns a fresh single-element list.
    """
    return [None]
Return all Response types this Message can return when sent.
The default implementation specifies a None return type.
Base class for responses to messages.
40class SysResponse: 41 """Base class for system-responses to messages. 42 43 These are only sent/handled by the messaging system itself; 44 users of the api never see them. 45 """ 46 47 def set_local_exception(self, exc: Exception) -> None: 48 """Attach a local exception to facilitate better logging/handling. 49 50 Be aware that this data does not get serialized and only 51 exists on the local object. 52 """ 53 setattr(self, '_sr_local_exception', exc) 54 55 def get_local_exception(self) -> Exception | None: 56 """Fetch a local attached exception.""" 57 value = getattr(self, '_sr_local_exception', None) 58 assert isinstance(value, Exception | None) 59 return value
Base class for system-responses to messages.
These are only sent/handled by the messaging system itself; users of the api never see them.
def set_local_exception(self, exc: Exception) -> None:
    """Attach a local exception to facilitate better logging/handling.

    Be aware that this data does not get serialized and only
    exists on the local object.
    """
    # Stored as a plain instance attribute under a private name.
    setattr(self, '_sr_local_exception', exc)
Attach a local exception to facilitate better logging/handling.
Be aware that this data does not get serialized and only exists on the local object.
@ioprepped
@dataclass
class EmptySysResponse(SysResponse):
    """The response equivalent of None."""
The response equivalent of None.
@ioprepped
@dataclass
class ErrorSysResponse(SysResponse):
    """SysResponse saying some error has occurred for the send.

    This generally results in an Exception being raised for the caller.
    """

    class ErrorType(Enum):
        """Type of error that occurred while sending a message."""

        REMOTE = 0
        REMOTE_CLEAN = 1
        LOCAL = 2
        COMMUNICATION = 3
        REMOTE_COMMUNICATION = 4

    # Stored under the short keys 'm' and 'e' via IOAttrs.
    error_message: Annotated[str, IOAttrs('m')]
    error_type: Annotated[ErrorType, IOAttrs('e')] = ErrorType.REMOTE
SysResponse saying some error has occurred for the send.
This generally results in an Exception being raised for the caller.
class ErrorType(Enum):
    """Type of error that occurred while sending a message."""

    REMOTE = 0
    REMOTE_CLEAN = 1
    LOCAL = 2
    COMMUNICATION = 3
    REMOTE_COMMUNICATION = 4
Type of error that occurred while sending a message.
@ioprepped
@dataclass
class StringResponse(Response):
    """A simple string value response."""

    # Stored under the short key 'v' via IOAttrs.
    value: Annotated[str, IOAttrs('v')]
A simple string value response.
@ioprepped
@dataclass
class BoolResponse(Response):
    """A simple bool value response."""

    # Stored under the short key 'v' via IOAttrs.
    value: Annotated[bool, IOAttrs('v')]
A simple bool value response.
class MessageProtocol:
    """Wrangles a set of message types, formats, and response types.

    Both endpoints must be using a compatible Protocol for communication
    to succeed. To maintain Protocol compatibility between revisions,
    all message types must retain the same id, message attr storage
    names must not change, newly added attrs must have default values,
    etc.
    """

    def __init__(
        self,
        message_types: dict[int, type[Message]],
        response_types: dict[int, type[Response]],
        *,
        forward_communication_errors: bool = False,
        forward_clean_errors: bool = False,
        remote_errors_include_stack_traces: bool = False,
        log_errors_on_receiver: bool = True,
        log_response_decode_errors: bool = True,
    ) -> None:
        """Create a protocol with a given configuration.

        If 'forward_communication_errors' is True,
        efro.error.CommunicationErrors raised on the receiver end will
        result in a matching error raised back on the sender. This can
        be useful if the receiver will be in some way forwarding
        messages along and the sender doesn't need to know where
        communication breakdowns occurred; only that they did.

        If 'forward_clean_errors' is True, efro.error.CleanError
        exceptions raised on the receiver end will result in a matching
        CleanError raised back on the sender.

        When an exception is not covered by the optional forwarding
        mechanisms above, it will come across as efro.error.RemoteError
        and the exception will be logged on the receiver end - at least
        by default (see details below).

        If 'remote_errors_include_stack_traces' is True, stringified
        stack traces will be returned with efro.error.RemoteError
        exceptions. This is useful for debugging but should only be
        enabled in cases where the sender is trusted to see internal
        details of the receiver.

        By default, when a message-handling exception will result in an
        efro.error.RemoteError being returned to the sender, the
        exception will be logged on the receiver. This is because the
        goal is usually to avoid returning opaque RemoteErrors and to
        instead return something meaningful as part of the expected
        response type (even if that value itself represents a logical
        error state). If 'log_errors_on_receiver' is False, however,
        such exceptions will *not* be logged on the receiver. This can
        be useful in combination with
        'remote_errors_include_stack_traces' and 'forward_clean_errors'
        in situations where all error logging/management will be
        happening on the sender end. Be aware, however, that in that
        case it may be possible for communication errors to prevent some
        errors from ever being acknowledged.

        If an error occurs when decoding a message response, a
        RuntimeError is generated locally. However, in practice it is
        likely for such errors to be silently ignored by message
        handling code alongside more common communication type errors,
        meaning serious protocol breakage could go unnoticed. To avoid
        this, a log message is also printed in such cases. Pass
        'log_response_decode_errors' as False to disable this logging.
        """
        # pylint: disable=too-many-locals
        self.message_types_by_id: dict[int, type[Message]] = {}
        self.message_ids_by_type: dict[type[Message], int] = {}
        self.response_types_by_id: dict[
            int, type[Response] | type[SysResponse]
        ] = {}
        self.response_ids_by_type: dict[
            type[Response] | type[SysResponse], int
        ] = {}
        for m_id, m_type in message_types.items():
            # Make sure only valid message types were passed and each
            # id was assigned only once.
            assert isinstance(m_id, int)
            assert m_id >= 0
            assert is_ioprepped_dataclass(m_type) and issubclass(
                m_type, Message
            )
            assert self.message_types_by_id.get(m_id) is None
            self.message_types_by_id[m_id] = m_type
            self.message_ids_by_type[m_type] = m_id

        for r_id, r_type in response_types.items():
            assert isinstance(r_id, int)
            assert r_id >= 0
            assert is_ioprepped_dataclass(r_type) and issubclass(
                r_type, Response
            )
            assert self.response_types_by_id.get(r_id) is None
            self.response_types_by_id[r_id] = r_type
            self.response_ids_by_type[r_type] = r_id

        # Register our SysResponse types. These use negative
        # IDs so as to never overlap with user Response types.
        def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
            assert self.response_types_by_id.get(reg_id) is None
            self.response_types_by_id[reg_id] = reg_tp
            self.response_ids_by_type[reg_tp] = reg_id

        _reg_sys(ErrorSysResponse, -1)
        _reg_sys(EmptySysResponse, -2)

        # Some extra-thorough validation in debug mode.
        if __debug__:
            # Make sure all Message types' return types are valid
            # and have been assigned an ID as well.
            all_response_types: set[type[Response] | None] = set()
            for m_type in message_types.values():
                m_rtypes = m_type.get_response_types()

                assert isinstance(m_rtypes, list)
                assert (
                    m_rtypes
                ), f'Message type {m_type} specifies no return types.'
                assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
                for m_rtype in m_rtypes:
                    all_response_types.add(m_rtype)
            for cls in all_response_types:
                if cls is None:
                    continue
                assert is_ioprepped_dataclass(cls)
                assert issubclass(cls, Response)
                if cls not in self.response_ids_by_type:
                    raise ValueError(
                        f'Possible response type {cls} needs to be included'
                        f' in response_types for this protocol.'
                    )

            # Make sure all registered types have unique base names.
            # We can take advantage of this to generate cleaner looking
            # protocol modules. Can revisit if this is ever a problem.
            # NOTE(review): this check is assumed to live inside the
            # debug-only block along with the validation above - confirm.
            mtypenames = {tp.__name__ for tp in self.message_ids_by_type}
            if len(mtypenames) != len(message_types):
                raise ValueError(
                    'message_types contains duplicate __name__s;'
                    ' all types are required to have unique names.'
                )

        self.forward_clean_errors = forward_clean_errors
        self.forward_communication_errors = forward_communication_errors
        self.remote_errors_include_stack_traces = (
            remote_errors_include_stack_traces
        )
        self.log_errors_on_receiver = log_errors_on_receiver
        self.log_response_decode_errors = log_response_decode_errors

    @staticmethod
    def encode_dict(obj: dict) -> str:
        """Json-encode a provided dict.

        Uses compact separators (no whitespace after ',' or ':').
        """
        return json.dumps(obj, separators=(',', ':'))

    def message_to_dict(self, message: Message) -> dict:
        """Encode a message to a json ready dict."""
        return self._to_dict(message, self.message_ids_by_type, 'message')

    def response_to_dict(self, response: Response | SysResponse) -> dict:
        """Encode a response to a json ready dict."""
        return self._to_dict(response, self.response_ids_by_type, 'response')

    def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
        """Translate an Exception to a SysResponse.

        Also returns whether the error should be logged if this happened
        within handle_raw_message().
        """

        # If anything goes wrong, return a ErrorSysResponse instead.
        # (either CLEAN or generic REMOTE)
        if self.forward_clean_errors and isinstance(exc, CleanError):
            return (
                ErrorSysResponse(
                    error_message=str(exc),
                    error_type=ErrorSysResponse.ErrorType.REMOTE_CLEAN,
                ),
                False,
            )
        if self.forward_communication_errors and isinstance(
            exc, CommunicationError
        ):
            return (
                ErrorSysResponse(
                    error_message=str(exc),
                    error_type=ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION,
                ),
                False,
            )
        return (
            ErrorSysResponse(
                error_message=(
                    # Note: need to format exception ourself here; it
                    # might not be current so we can't use
                    # traceback.format_exc().
                    ''.join(
                        traceback.format_exception(
                            type(exc), exc, exc.__traceback__
                        )
                    )
                    if self.remote_errors_include_stack_traces
                    else 'An internal error has occurred.'
                ),
                error_type=ErrorSysResponse.ErrorType.REMOTE,
            ),
            self.log_errors_on_receiver,
        )

    def _to_dict(
        self, message: Any, ids_by_type: dict[type, int], opname: str
    ) -> dict:
        """Encode a message or response to a json ready dict.

        The result holds the registered type id under 't' and the
        dataclass contents under 'm'. Raises TypeError if the exact type
        of 'message' is not present in 'ids_by_type'.
        """

        m_id: int | None = ids_by_type.get(type(message))
        if m_id is None:
            raise TypeError(
                f'{opname} type is not registered in protocol:'
                f' {type(message)}'
            )
        out = {'t': m_id, 'm': dataclass_to_dict(message)}
        return out

    @staticmethod
    def decode_dict(data: str) -> dict:
        """Decode data to a dict."""
        out = json.loads(data)
        assert isinstance(out, dict)
        return out

    def message_from_dict(self, data: dict) -> Message:
        """Decode a message from a dict."""
        out = self._from_dict(data, self.message_types_by_id, 'message')
        assert isinstance(out, Message)
        return out

    def response_from_dict(self, data: dict) -> Response | SysResponse:
        """Decode a response from a dict."""
        out = self._from_dict(data, self.response_types_by_id, 'response')
        assert isinstance(out, Response | SysResponse)
        return out

    # Weeeird; we get mypy errors returning dict[int, type] but
    # dict[int, typing.Type] or dict[int, type[Any]] works..
    def _from_dict(
        self, data: dict, types_by_id: dict[int, type[Any]], opname: str
    ) -> Any:
        """Decode a message or response from a dict.

        Raises UnregisteredMessageIDError if the id stored under 't' is
        not present in 'types_by_id'.
        """
        msgdict: dict | None

        m_id = data.get('t')
        # Allow omitting 'm' dict if its empty.
        msgdict = data.get('m', {})

        assert isinstance(m_id, int)
        assert isinstance(msgdict, dict)

        # Decode this particular type.
        msgtype = types_by_id.get(m_id)
        if msgtype is None:
            raise UnregisteredMessageIDError(
                f'Got unregistered {opname} id of {m_id}.'
            )
        return dataclass_from_dict(msgtype, msgdict)

    def _get_module_header(
        self,
        part: Literal['sender', 'receiver'],
        extra_import_code: str | None,
        enable_async_sends: bool,
    ) -> str:
        """Return common parts of generated modules."""
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        # pylint: disable=too-many-statements
        import textwrap

        # Names imported only under TYPE_CHECKING vs. at runtime, keyed
        # by module name.
        tpimports: dict[str, list[str]] = {}
        imports: dict[str, list[str]] = {}

        single_message_type = len(self.message_ids_by_type) == 1

        msgtypes = list(self.message_ids_by_type)
        if part == 'sender':
            msgtypes.append(Message)
        for msgtype in msgtypes:
            tpimports.setdefault(msgtype.__module__, []).append(
                msgtype.__name__
            )
        rsptypes = list(self.response_ids_by_type)
        if part == 'sender':
            rsptypes.append(Response)
        for rsp_tp in rsptypes:
            # Skip these as they don't actually show up in code.
            if rsp_tp is EmptySysResponse or rsp_tp is ErrorSysResponse:
                continue
            if (
                single_message_type
                and part == 'sender'
                and rsp_tp is not Response
            ):
                # We need to cast to the single supported response type
                # in this case so need response types at runtime.
                imports.setdefault(rsp_tp.__module__, []).append(
                    rsp_tp.__name__
                )
            else:
                tpimports.setdefault(rsp_tp.__module__, []).append(
                    rsp_tp.__name__
                )

        import_lines = ''
        tpimport_lines = ''

        for module, names in sorted(imports.items()):
            jnames = ', '.join(names)
            line = f'from {module} import {jnames}'
            if len(line) > 80:
                # Recreate in a wrapping-friendly form.
                line = f'from {module} import ({jnames})'
            import_lines += f'{line}\n'
        for module, names in sorted(tpimports.items()):
            jnames = ', '.join(names)
            line = f'from {module} import {jnames}'
            if len(line) > 75:  # Account for indent
                # Recreate in a wrapping-friendly form.
                line = f'from {module} import ({jnames})'
            tpimport_lines += f'{line}\n'

        if part == 'sender':
            import_lines += (
                'from efro.message import MessageSender, BoundMessageSender\n'
            )
            tpimport_typing_extras = ''
        else:
            if single_message_type:
                import_lines += (
                    'from efro.message import (MessageReceiver,'
                    ' BoundMessageReceiver, Message, Response)\n'
                )
            else:
                import_lines += (
                    'from efro.message import MessageReceiver,'
                    ' BoundMessageReceiver\n'
                )
            tpimport_typing_extras = ', Awaitable'

        if extra_import_code is not None:
            import_lines += extra_import_code

        ovld = ', overload' if not single_message_type else ''
        ovld2 = (
            ', cast, Awaitable'
            if (single_message_type and part == 'sender' and enable_async_sends)
            else ''
        )
        # These lines land inside the generated 'if TYPE_CHECKING:' block.
        tpimport_lines = textwrap.indent(tpimport_lines, '    ')

        baseimps = ['Any']
        if part == 'receiver':
            baseimps.append('Callable')
        if part == 'sender' and enable_async_sends:
            baseimps.append('Awaitable')
        baseimps_s = ', '.join(baseimps)
        out = (
            '# Released under the MIT License. See LICENSE for details.\n'
            f'#\n'
            f'"""Auto-generated {part} module. Do not edit by hand."""\n'
            f'\n'
            f'from __future__ import annotations\n'
            f'\n'
            f'from typing import TYPE_CHECKING{ovld}{ovld2}\n'
            f'\n'
            f'{import_lines}'
            f'\n'
            f'if TYPE_CHECKING:\n'
            f'    from typing import {baseimps_s}'
            f'{tpimport_typing_extras}\n'
            f'{tpimport_lines}'
            f'\n'
            f'\n'
        )
        return out

    def do_create_sender_module(
        self,
        basename: str,
        protocol_create_code: str,
        enable_sync_sends: bool,
        enable_async_sends: bool,
        private: bool = False,
        protocol_module_level_import_code: str | None = None,
    ) -> str:
        """Used by create_sender_module(); do not call directly."""
        # pylint: disable=too-many-positional-arguments
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-branches
        import textwrap

        msgtypes = list(self.message_ids_by_type.keys())

        ppre = '_' if private else ''
        out = self._get_module_header(
            'sender',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=enable_async_sends,
        )
        # The protocol-creation code is emitted as the body of the
        # generated __init__, so it needs method-body indentation.
        ccind = textwrap.indent(protocol_create_code, '        ')
        out += (
            f'class {ppre}{basename}(MessageSender):\n'
            f'    """Protocol-specific sender."""\n'
            f'\n'
            f'    def __init__(self) -> None:\n'
            f'{ccind}\n'
            f'        super().__init__(protocol)\n'
            f'\n'
            f'    def __get__(\n'
            f'        self, obj: Any, type_in: Any = None\n'
            f'    ) -> {ppre}Bound{basename}:\n'
            f'        return {ppre}Bound{basename}(obj, self)\n'
            f'\n'
            f'\n'
            f'class {ppre}Bound{basename}(BoundMessageSender):\n'
            f'    """Protocol-specific bound sender."""\n'
        )

        def _filt_tp_name(rtype: type[Response] | None) -> str:
            return 'None' if rtype is None else rtype.__name__

        # Define handler() overloads for all registered message types.
        if msgtypes:
            for async_pass in False, True:
                if async_pass and not enable_async_sends:
                    continue
                if not async_pass and not enable_sync_sends:
                    continue
                pfx = 'async ' if async_pass else ''
                sfx = '_async' if async_pass else ''
                # awt = 'await ' if async_pass else ''
                awt = ''
                how = 'asynchronously' if async_pass else 'synchronously'

                if len(msgtypes) == 1:
                    # Special case: with a single message types we don't
                    # use overloads.
                    msgtype = msgtypes[0]
                    msgtypevar = msgtype.__name__
                    rtypes = msgtype.get_response_types()
                    if len(rtypes) > 1:
                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                    else:
                        rtypevar = _filt_tp_name(rtypes[0])
                    if async_pass:
                        rtypevar = f'Awaitable[{rtypevar}]'
                    out += (
                        f'\n'
                        f'    def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}:\n'
                        f'        """Send a message {how}."""\n'
                        f'        out = {awt}self._sender.'
                        f'send{sfx}(self._obj, message)\n'
                    )
                    if not async_pass:
                        out += (
                            f'        assert isinstance(out, {rtypevar})\n'
                            '        return out\n'
                        )
                    else:
                        out += f'        return cast({rtypevar}, out)\n'

                else:
                    for msgtype in msgtypes:
                        msgtypevar = msgtype.__name__
                        rtypes = msgtype.get_response_types()
                        if len(rtypes) > 1:
                            rtypevar = ' | '.join(
                                _filt_tp_name(t) for t in rtypes
                            )
                        else:
                            rtypevar = _filt_tp_name(rtypes[0])
                        out += (
                            f'\n'
                            f'    @overload\n'
                            f'    {pfx}def send{sfx}(self,'
                            f' message: {msgtypevar})'
                            f' -> {rtypevar}: ...\n'
                        )
                    # The non-overload implementation accepts any Message.
                    rtypevar = 'Response | None'
                    if async_pass:
                        rtypevar = f'Awaitable[{rtypevar}]'
                    out += (
                        f'\n'
                        f'    def send{sfx}(self, message: Message)'
                        f' -> {rtypevar}:\n'
                        f'        """Send a message {how}."""\n'
                        f'        return {awt}self._sender.'
                        f'send{sfx}(self._obj, message)\n'
                    )

        return out

    def do_create_receiver_module(
        self,
        basename: str,
        protocol_create_code: str,
        is_async: bool,
        private: bool = False,
        protocol_module_level_import_code: str | None = None,
    ) -> str:
        """Used by create_receiver_module(); do not call directly."""
        # pylint: disable=too-many-locals
        # pylint: disable=too-many-positional-arguments
        import textwrap

        desc = 'asynchronous' if is_async else 'synchronous'
        ppre = '_' if private else ''
        msgtypes = list(self.message_ids_by_type.keys())
        out = self._get_module_header(
            'receiver',
            extra_import_code=protocol_module_level_import_code,
            enable_async_sends=False,
        )
        # The protocol-creation code is emitted as the body of the
        # generated __init__, so it needs method-body indentation.
        ccind = textwrap.indent(protocol_create_code, '        ')
        out += (
            f'class {ppre}{basename}(MessageReceiver):\n'
            f'    """Protocol-specific {desc} receiver."""\n'
            f'\n'
            f'    is_async = {is_async}\n'
            f'\n'
            f'    def __init__(self) -> None:\n'
            f'{ccind}\n'
            f'        super().__init__(protocol)\n'
            f'\n'
            f'    def __get__(\n'
            f'        self,\n'
            f'        obj: Any,\n'
            f'        type_in: Any = None,\n'
            f'    ) -> {ppre}Bound{basename}:\n'
            f'        return {ppre}Bound{basename}('
            f'obj, self)\n'
        )

        # Define handler() overloads for all registered message types.

        def _filt_tp_name(rtype: type[Response] | None) -> str:
            return 'None' if rtype is None else rtype.__name__

        if msgtypes:
            cbgn = 'Awaitable[' if is_async else ''
            cend = ']' if is_async else ''
            if len(msgtypes) == 1:
                # Special case: when we have a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypes = msgtype.get_response_types()
                if len(rtypes) > 1:
                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                else:
                    rtypevar = _filt_tp_name(rtypes[0])
                rtypevar = f'{cbgn}{rtypevar}{cend}'
                out += (
                    f'\n'
                    f'    def handler(\n'
                    f'        self,\n'
                    f'        call: Callable[[Any, {msgtypevar}], '
                    f'{rtypevar}],\n'
                    f'    )'
                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                    f'        """Decorator to register message handlers."""\n'
                    f'        from typing import cast, Callable, Any\n'
                    f'\n'
                    f'        self.register_handler(cast(Callable'
                    f'[[Any, Message], Response], call))\n'
                    f'        return call\n'
                )
            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypes = msgtype.get_response_types()
                    if len(rtypes) > 1:
                        rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                    else:
                        rtypevar = _filt_tp_name(rtypes[0])
                    rtypevar = f'{cbgn}{rtypevar}{cend}'
                    out += (
                        f'\n'
                        f'    @overload\n'
                        f'    def handler(\n'
                        f'        self,\n'
                        f'        call: Callable[[Any, {msgtypevar}], '
                        f'{rtypevar}],\n'
                        f'    )'
                        f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
                    )
                out += (
                    '\n'
                    '    def handler(self, call: Callable) -> Callable:\n'
                    '        """Decorator to register message handlers."""\n'
                    '        self.register_handler(call)\n'
                    '        return call\n'
                )

        # The bound class is always emitted; the generated __get__ above
        # refers to it.
        out += (
            f'\n'
            f'\n'
            f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
            f'    """Protocol-specific bound receiver."""\n'
        )
        if is_async:
            out += (
                '\n'
                '    def handle_raw_message(\n'
                '        self, message: str, raise_unregistered: bool = False\n'
                '    ) -> Awaitable[str]:\n'
                '        """Asynchronously handle a raw incoming message."""\n'
                '        return self._receiver.'
                'handle_raw_message_async(\n'
                '            self._obj, message, raise_unregistered\n'
                '        )\n'
            )

        else:
            out += (
                '\n'
                '    def handle_raw_message(\n'
                '        self, message: str, raise_unregistered: bool = False\n'
                '    ) -> str:\n'
                '        """Synchronously handle a raw incoming message."""\n'
                '        return self._receiver.handle_raw_message(\n'
                '            self._obj, message, raise_unregistered\n'
                '        )\n'
            )

        return out
Wrangles a set of message types, formats, and response types. Both endpoints must be using a compatible Protocol for communication to succeed. To maintain Protocol compatibility between revisions, all message types must retain the same id, message attr storage names must not change, newly added attrs must have default values, etc.
def __init__(
    self,
    message_types: dict[int, type[Message]],
    response_types: dict[int, type[Response]],
    *,
    forward_communication_errors: bool = False,
    forward_clean_errors: bool = False,
    remote_errors_include_stack_traces: bool = False,
    log_errors_on_receiver: bool = True,
    log_response_decode_errors: bool = True,
) -> None:
    """Create a protocol with a given configuration.

    If 'forward_communication_errors' is True,
    efro.error.CommunicationErrors raised on the receiver end will
    result in a matching error raised back on the sender. This can
    be useful if the receiver will be in some way forwarding
    messages along and the sender doesn't need to know where
    communication breakdowns occurred; only that they did.

    If 'forward_clean_errors' is True, efro.error.CleanError
    exceptions raised on the receiver end will result in a matching
    CleanError raised back on the sender.

    When an exception is not covered by the optional forwarding
    mechanisms above, it will come across as efro.error.RemoteError
    and the exception will be logged on the receiver end - at least
    by default (see details below).

    If 'remote_errors_include_stack_traces' is True, stringified
    stack traces will be returned with efro.error.RemoteError
    exceptions. This is useful for debugging but should only be
    enabled in cases where the sender is trusted to see internal
    details of the receiver.

    By default, when a message-handling exception will result in an
    efro.error.RemoteError being returned to the sender, the
    exception will be logged on the receiver. This is because the
    goal is usually to avoid returning opaque RemoteErrors and to
    instead return something meaningful as part of the expected
    response type (even if that value itself represents a logical
    error state). If 'log_errors_on_receiver' is False, however,
    such exceptions will *not* be logged on the receiver. This can
    be useful in combination with
    'remote_errors_include_stack_traces' and 'forward_clean_errors'
    in situations where all error logging/management will be
    happening on the sender end. Be aware, however, that in that
    case it may be possible for communication errors to prevent some
    errors from ever being acknowledged.

    If an error occurs when decoding a message response, a
    RuntimeError is generated locally. However, in practice it is
    likely for such errors to be silently ignored by message
    handling code alongside more common communication type errors,
    meaning serious protocol breakage could go unnoticed. To avoid
    this, a log message is also printed in such cases. Pass
    'log_response_decode_errors' as False to disable this logging.
    """
    # pylint: disable=too-many-locals
    self.message_types_by_id: dict[int, type[Message]] = {}
    self.message_ids_by_type: dict[type[Message], int] = {}
    self.response_types_by_id: dict[
        int, type[Response] | type[SysResponse]
    ] = {}
    self.response_ids_by_type: dict[
        type[Response] | type[SysResponse], int
    ] = {}
    for m_id, m_type in message_types.items():
        # Make sure only valid message types were passed and each
        # id was assigned only once.
        assert isinstance(m_id, int)
        assert m_id >= 0
        assert is_ioprepped_dataclass(m_type) and issubclass(
            m_type, Message
        )
        assert self.message_types_by_id.get(m_id) is None
        self.message_types_by_id[m_id] = m_type
        self.message_ids_by_type[m_type] = m_id

    for r_id, r_type in response_types.items():
        assert isinstance(r_id, int)
        assert r_id >= 0
        assert is_ioprepped_dataclass(r_type) and issubclass(
            r_type, Response
        )
        assert self.response_types_by_id.get(r_id) is None
        self.response_types_by_id[r_id] = r_type
        self.response_ids_by_type[r_type] = r_id

    # Register our SysResponse types. These use negative
    # IDs so as to never overlap with user Response types.
    def _reg_sys(reg_tp: type[SysResponse], reg_id: int) -> None:
        assert self.response_types_by_id.get(reg_id) is None
        self.response_types_by_id[reg_id] = reg_tp
        self.response_ids_by_type[reg_tp] = reg_id

    _reg_sys(ErrorSysResponse, -1)
    _reg_sys(EmptySysResponse, -2)

    # Some extra-thorough validation in debug mode.
    if __debug__:
        # Make sure all Message types' return types are valid
        # and have been assigned an ID as well.
        all_response_types: set[type[Response] | None] = set()
        for m_type in message_types.values():
            m_rtypes = m_type.get_response_types()

            assert isinstance(m_rtypes, list)
            assert (
                m_rtypes
            ), f'Message type {m_type} specifies no return types.'
            assert len(set(m_rtypes)) == len(m_rtypes)  # check dups
            for m_rtype in m_rtypes:
                all_response_types.add(m_rtype)
        for cls in all_response_types:
            if cls is None:
                continue
            assert is_ioprepped_dataclass(cls)
            assert issubclass(cls, Response)
            if cls not in self.response_ids_by_type:
                raise ValueError(
                    f'Possible response type {cls} needs to be included'
                    f' in response_types for this protocol.'
                )

        # Make sure all registered types have unique base names.
        # We can take advantage of this to generate cleaner looking
        # protocol modules. Can revisit if this is ever a problem.
        # NOTE(review): this check is assumed to live inside the
        # debug-only block along with the validation above - confirm.
        mtypenames = {tp.__name__ for tp in self.message_ids_by_type}
        if len(mtypenames) != len(message_types):
            raise ValueError(
                'message_types contains duplicate __name__s;'
                ' all types are required to have unique names.'
            )

    self.forward_clean_errors = forward_clean_errors
    self.forward_communication_errors = forward_communication_errors
    self.remote_errors_include_stack_traces = (
        remote_errors_include_stack_traces
    )
    self.log_errors_on_receiver = log_errors_on_receiver
    self.log_response_decode_errors = log_response_decode_errors
Create a protocol with a given configuration.
If 'forward_communication_errors' is True, efro.error.CommunicationErrors raised on the receiver end will result in a matching error raised back on the sender. This can be useful if the receiver will be in some way forwarding messages along and the sender doesn't need to know where communication breakdowns occurred; only that they did.
If 'forward_clean_errors' is True, efro.error.CleanError exceptions raised on the receiver end will result in a matching CleanError raised back on the sender.
When an exception is not covered by the optional forwarding mechanisms above, it will come across as efro.error.RemoteError and the exception will be logged on the receiver end - at least by default (see details below).
If 'remote_errors_include_stack_traces' is True, stringified stack traces will be returned with efro.error.RemoteError exceptions. This is useful for debugging but should only be enabled in cases where the sender is trusted to see internal details of the receiver.
By default, when a message-handling exception will result in an efro.error.RemoteError being returned to the sender, the exception will be logged on the receiver. This is because the goal is usually to avoid returning opaque RemoteErrors and to instead return something meaningful as part of the expected response type (even if that value itself represents a logical error state). If 'log_errors_on_receiver' is False, however, such exceptions will *not* be logged on the receiver. This can be useful in combination with 'remote_errors_include_stack_traces' and 'forward_clean_errors' in situations where all error logging/management will be happening on the sender end. Be aware, however, that in that case it may be possible for communication errors to prevent some errors from ever being acknowledged.
If an error occurs when decoding a message response, a RuntimeError is generated locally. However, in practice it is likely for such errors to be silently ignored by message handling code alongside more common communication type errors, meaning serious protocol breakage could go unnoticed. To avoid this, a log message is also printed in such cases. Pass 'log_response_decode_errors' as False to disable this logging.
185 @staticmethod 186 def encode_dict(obj: dict) -> str: 187 """Json-encode a provided dict.""" 188 return json.dumps(obj, separators=(',', ':'))
Json-encode a provided dict.
def message_to_dict(self, message: Message) -> dict:
    """Encode a message to a json ready dict.

    Thin wrapper: hands the message to self._to_dict() together with
    this protocol's message-type-to-id mapping and the 'message' label.
    """
    return self._to_dict(message, self.message_ids_by_type, 'message')
Encode a message to a json ready dict.
def response_to_dict(self, response: Response | SysResponse) -> dict:
    """Encode a response to a json ready dict.

    Thin wrapper: hands the response to self._to_dict() together with
    this protocol's response-type-to-id mapping and the 'response'
    label.
    """
    return self._to_dict(response, self.response_ids_by_type, 'response')
Encode a response to a json ready dict.
def error_to_response(self, exc: Exception) -> tuple[SysResponse, bool]:
    """Build the SysResponse describing an Exception.

    The returned tuple pairs the response with a flag telling whether
    the error should be logged if this happened within
    handle_raw_message().
    """
    kinds = ErrorSysResponse.ErrorType

    # Errors we are configured to forward keep their own message and
    # are returned with the log flag off.
    forwarded_kind = None
    if self.forward_clean_errors and isinstance(exc, CleanError):
        forwarded_kind = kinds.REMOTE_CLEAN
    elif self.forward_communication_errors and isinstance(
        exc, CommunicationError
    ):
        forwarded_kind = kinds.REMOTE_COMMUNICATION
    if forwarded_kind is not None:
        forwarded = ErrorSysResponse(
            error_message=str(exc), error_type=forwarded_kind
        )
        return forwarded, False

    # Everything else is reported as a generic REMOTE error.
    if self.remote_errors_include_stack_traces:
        # Note: need to format the exception ourself here; it might
        # not be current so we can't use traceback.format_exc().
        details = ''.join(
            traceback.format_exception(type(exc), exc, exc.__traceback__)
        )
    else:
        details = 'An internal error has occurred.'
    generic = ErrorSysResponse(error_message=details, error_type=kinds.REMOTE)
    return generic, self.log_errors_on_receiver
Translate an Exception to a SysResponse.
Also returns whether the error should be logged if this happened within handle_raw_message().
258 @staticmethod 259 def decode_dict(data: str) -> dict: 260 """Decode data to a dict.""" 261 out = json.loads(data) 262 assert isinstance(out, dict) 263 return out
Decode data to a dict.
def message_from_dict(self, data: dict) -> Message:
    """Decode a message from a dict.

    Delegates to self._from_dict() using this protocol's
    id-to-message-type mapping, then asserts that the decoded object
    is a Message before returning it.
    """
    out = self._from_dict(data, self.message_types_by_id, 'message')
    assert isinstance(out, Message)
    return out
Decode a message from a dict.
def response_from_dict(self, data: dict) -> Response | SysResponse:
    """Decode a response from a dict.

    Delegates to self._from_dict() using this protocol's
    id-to-response-type mapping, then asserts that the decoded object
    is a Response or SysResponse before returning it.
    """
    out = self._from_dict(data, self.response_types_by_id, 'response')
    assert isinstance(out, Response | SysResponse)
    return out
Decode a response from a dict.
def do_create_sender_module(
    self,
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_sender_module(); do not call directly.

    Returns source text for a module defining a MessageSender subclass
    and a matching BoundMessageSender subclass for this protocol's
    registered message types.

    Args:
        basename: Name of the generated sender class; the bound class
            is named 'Bound' + basename.
        protocol_create_code: Code inserted (indented) into the
            generated __init__. The generated code then calls
            super().__init__(protocol), so this code presumably has to
            define a name 'protocol'.
        enable_sync_sends: Whether send() definitions are generated.
        enable_async_sends: Whether send_async() definitions are
            generated.
        private: If True, generated class names get a '_' prefix.
        protocol_module_level_import_code: Passed through to
            self._get_module_header() as extra_import_code.
    """
    # pylint: disable=too-many-positional-arguments
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-branches
    import textwrap

    # NOTE(review): whitespace inside the template literals below is
    # reproduced exactly as it appears in the reviewed listing, where
    # runs of spaces look collapsed; confirm the indentation widths
    # against the canonical file before relying on the generated code.

    msgtypes = list(self.message_ids_by_type.keys())

    ppre = '_' if private else ''
    out = self._get_module_header(
        'sender',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=enable_async_sends,
    )
    ccind = textwrap.indent(protocol_create_code, ' ')
    out += (
        f'class {ppre}{basename}(MessageSender):\n'
        f' """Protocol-specific sender."""\n'
        f'\n'
        f' def __init__(self) -> None:\n'
        f'{ccind}\n'
        f' super().__init__(protocol)\n'
        f'\n'
        f' def __get__(\n'
        f' self, obj: Any, type_in: Any = None\n'
        f' ) -> {ppre}Bound{basename}:\n'
        f' return {ppre}Bound{basename}(obj, self)\n'
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageSender):\n'
        f' """Protocol-specific bound sender."""\n'
    )

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        return 'None' if rtype is None else rtype.__name__

    def _rtypes_str(msgtype: type[Message]) -> str:
        # Spell out a message type's possible responses as a type
        # expression ('A', 'A | B', 'None', ...).
        rtypes = msgtype.get_response_types()
        if len(rtypes) > 1:
            return ' | '.join(_filt_tp_name(t) for t in rtypes)
        return _filt_tp_name(rtypes[0])

    # Define send() overloads for all registered message types.
    if msgtypes:
        for async_pass in False, True:
            if async_pass and not enable_async_sends:
                continue
            if not async_pass and not enable_sync_sends:
                continue
            pfx = 'async ' if async_pass else ''
            sfx = '_async' if async_pass else ''
            how = 'asynchronously' if async_pass else 'synchronously'

            if len(msgtypes) == 1:
                # Special case: with a single message type we don't
                # use overloads.
                msgtype = msgtypes[0]
                msgtypevar = msgtype.__name__
                rtypevar = _rtypes_str(msgtype)
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f' def send{sfx}(self,'
                    f' message: {msgtypevar})'
                    f' -> {rtypevar}:\n'
                    f' """Send a message {how}."""\n'
                    f' out = self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )
                if not async_pass:
                    # isinstance() rejects a bare None as its second
                    # argument (TypeError), so a message whose only
                    # response type is None needs an identity check in
                    # the generated code instead.
                    if rtypevar == 'None':
                        check = 'out is None'
                    else:
                        check = f'isinstance(out, {rtypevar})'
                    out += f' assert {check}\n' ' return out\n'
                else:
                    out += f' return cast({rtypevar}, out)\n'

            else:
                for msgtype in msgtypes:
                    msgtypevar = msgtype.__name__
                    rtypevar = _rtypes_str(msgtype)
                    out += (
                        f'\n'
                        f' @overload\n'
                        f' {pfx}def send{sfx}(self,'
                        f' message: {msgtypevar})'
                        f' -> {rtypevar}: ...\n'
                    )
                # The non-overload implementation accepts any Message.
                rtypevar = 'Response | None'
                if async_pass:
                    rtypevar = f'Awaitable[{rtypevar}]'
                out += (
                    f'\n'
                    f' def send{sfx}(self, message: Message)'
                    f' -> {rtypevar}:\n'
                    f' """Send a message {how}."""\n'
                    f' return self._sender.'
                    f'send{sfx}(self._obj, message)\n'
                )

    return out
Used by create_sender_module(); do not call directly.
def do_create_receiver_module(
    self,
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
) -> str:
    """Used by create_receiver_module(); do not call directly.

    Returns source text for a module defining a MessageReceiver
    subclass (with typed handler() registration) and a matching
    BoundMessageReceiver subclass for this protocol's registered
    message types.

    Args:
        basename: Name of the generated receiver class; the bound
            class is named 'Bound' + basename.
        protocol_create_code: Code inserted (indented) into the
            generated __init__. The generated code then calls
            super().__init__(protocol), so this code presumably has to
            define a name 'protocol'.
        is_async: Selects asynchronous vs synchronous generated code
            (handler return types wrapped in Awaitable[...], and which
            handle_raw_message variant is emitted).
        private: If True, generated class names get a '_' prefix.
        protocol_module_level_import_code: Passed through to
            self._get_module_header() as extra_import_code.
    """
    # pylint: disable=too-many-locals
    # pylint: disable=too-many-positional-arguments
    import textwrap

    # NOTE(review): whitespace inside the template literals below is
    # reproduced exactly as it appears in the reviewed listing, where
    # runs of spaces look collapsed; confirm the indentation widths
    # against the canonical file.

    desc = 'asynchronous' if is_async else 'synchronous'
    ppre = '_' if private else ''
    msgtypes = list(self.message_ids_by_type.keys())
    out = self._get_module_header(
        'receiver',
        extra_import_code=protocol_module_level_import_code,
        enable_async_sends=False,
    )
    ccind = textwrap.indent(protocol_create_code, ' ')
    # Receiver class header, __init__ and descriptor __get__.
    out += (
        f'class {ppre}{basename}(MessageReceiver):\n'
        f' """Protocol-specific {desc} receiver."""\n'
        f'\n'
        f' is_async = {is_async}\n'
        f'\n'
        f' def __init__(self) -> None:\n'
        f'{ccind}\n'
        f' super().__init__(protocol)\n'
        f'\n'
        f' def __get__(\n'
        f' self,\n'
        f' obj: Any,\n'
        f' type_in: Any = None,\n'
        f' ) -> {ppre}Bound{basename}:\n'
        f' return {ppre}Bound{basename}('
        f'obj, self)\n'
    )

    # Define handler() overloads for all registered message types.

    def _filt_tp_name(rtype: type[Response] | None) -> str:
        # Name to use for a response type in generated annotations.
        return 'None' if rtype is None else rtype.__name__

    if msgtypes:
        # For async receivers, handler return types are wrapped in
        # Awaitable[...].
        cbgn = 'Awaitable[' if is_async else ''
        cend = ']' if is_async else ''
        if len(msgtypes) == 1:
            # Special case: when we have a single message type we don't
            # use overloads.
            msgtype = msgtypes[0]
            msgtypevar = msgtype.__name__
            rtypes = msgtype.get_response_types()
            if len(rtypes) > 1:
                rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
            else:
                rtypevar = _filt_tp_name(rtypes[0])
            rtypevar = f'{cbgn}{rtypevar}{cend}'
            out += (
                f'\n'
                f' def handler(\n'
                f' self,\n'
                f' call: Callable[[Any, {msgtypevar}], '
                f'{rtypevar}],\n'
                f' )'
                f' -> Callable[[Any, {msgtypevar}], {rtypevar}]:\n'
                f' """Decorator to register message handlers."""\n'
                f' from typing import cast, Callable, Any\n'
                f'\n'
                f' self.register_handler(cast(Callable'
                f'[[Any, Message], Response], call))\n'
                f' return call\n'
            )
        else:
            # One typed @overload per message type...
            for msgtype in msgtypes:
                msgtypevar = msgtype.__name__
                rtypes = msgtype.get_response_types()
                if len(rtypes) > 1:
                    rtypevar = ' | '.join(_filt_tp_name(t) for t in rtypes)
                else:
                    rtypevar = _filt_tp_name(rtypes[0])
                rtypevar = f'{cbgn}{rtypevar}{cend}'
                out += (
                    f'\n'
                    f' @overload\n'
                    f' def handler(\n'
                    f' self,\n'
                    f' call: Callable[[Any, {msgtypevar}], '
                    f'{rtypevar}],\n'
                    f' )'
                    f' -> Callable[[Any, {msgtypevar}], {rtypevar}]: ...\n'
                )
            # ...followed by the single untyped implementation.
            out += (
                '\n'
                ' def handler(self, call: Callable) -> Callable:\n'
                ' """Decorator to register message handlers."""\n'
                ' self.register_handler(call)\n'
                ' return call\n'
            )

    # Bound receiver class; only its handle_raw_message differs between
    # the async and sync variants.
    out += (
        f'\n'
        f'\n'
        f'class {ppre}Bound{basename}(BoundMessageReceiver):\n'
        f' """Protocol-specific bound receiver."""\n'
    )
    if is_async:
        out += (
            '\n'
            ' def handle_raw_message(\n'
            ' self, message: str, raise_unregistered: bool = False\n'
            ' ) -> Awaitable[str]:\n'
            ' """Asynchronously handle a raw incoming message."""\n'
            ' return self._receiver.'
            'handle_raw_message_async(\n'
            ' self._obj, message, raise_unregistered\n'
            ' )\n'
        )

    else:
        out += (
            '\n'
            ' def handle_raw_message(\n'
            ' self, message: str, raise_unregistered: bool = False\n'
            ' ) -> str:\n'
            ' """Synchronously handle a raw incoming message."""\n'
            ' return self._receiver.handle_raw_message(\n'
            ' self._obj, message, raise_unregistered\n'
            ' )\n'
        )

    return out
Used by create_receiver_module(); do not call directly.
class MessageSender:
    """Facilitates sending messages to a target and receiving responses.

    These are instantiated at the class level and used to register unbound
    class methods to handle raw message sending. Generally this class is not
    used directly, but instead autogenerated subclasses which provide type
    safe overloads are used instead.

    Example:
      (In this example, MyMessageSender is an autogenerated class that
      inherits from MessageSender).

      class MyClass:
          msg = MyMessageSender()

          @msg.send_method
          def send_raw_message(self, message: str) -> str:
              # Actually send the message here.

      obj = MyClass()

      # The MyMessageSender generated class would provide overloads for
      # send(), send_async(), etc. to provide type-safety for message types
      # and their associated response types.
      # Thus, given the statement below, a type-checker would know that
      # 'response' is a SomeResponseType or whatever is associated with
      # SomeMessageType.
      response = obj.msg.send(SomeMessageType())

    """

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        # Registered callbacks; each stays None until set through the
        # corresponding decorator method below.
        self._send_raw_message_call: Callable[[Any, str], str] | None = None
        self._send_raw_message_ex_call: (
            Callable[[Any, str, Message], str] | None
        ) = None
        self._send_async_raw_message_call: (
            Callable[[Any, str], Awaitable[str]] | None
        ) = None
        self._send_async_raw_message_ex_call: (
            Callable[[Any, str, Message], Awaitable[str]] | None
        ) = None
        self._encode_filter_call: (
            Callable[[Any, Message, dict], None] | None
        ) = None
        self._decode_filter_call: (
            Callable[[Any, Message, dict, Response | SysResponse], None] | None
        ) = None
        self._peer_desc_call: Callable[[Any], str] | None = None

    def send_method(
        self, call: Callable[[Any, str], str]
    ) -> Callable[[Any, str], str]:
        """Function decorator for setting raw send method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.
        """
        assert self._send_raw_message_call is None
        self._send_raw_message_call = call
        return call

    def send_ex_method(
        self, call: Callable[[Any, str, Message], str]
    ) -> Callable[[Any, str, Message], str]:
        """Function decorator for extended send method.

        Version of send_method which is also passed the original
        unencoded message; can be useful for cases where metadata is sent
        along with messages referring to their payloads/etc.
        """
        assert self._send_raw_message_ex_call is None
        self._send_raw_message_ex_call = call
        return call

    def send_async_method(
        self, call: Callable[[Any, str], Awaitable[str]]
    ) -> Callable[[Any, str], Awaitable[str]]:
        """Function decorator for setting raw send-async method.

        Send methods take strings and should return strings.
        CommunicationErrors raised here will be returned to the sender
        as such; all other exceptions will result in a RuntimeError for
        the sender.

        IMPORTANT: Generally async send methods should not be implemented
        as 'async' methods, but instead should be regular methods that
        return awaitable objects. This way it can be guaranteed that
        outgoing messages are synchronously enqueued in the correct
        order, and then async calls can be returned which finish each
        send. If the entire call is async, they may be enqueued out of
        order in rare cases.
        """
        assert self._send_async_raw_message_call is None
        self._send_async_raw_message_call = call
        return call

    def send_async_ex_method(
        self, call: Callable[[Any, str, Message], Awaitable[str]]
    ) -> Callable[[Any, str, Message], Awaitable[str]]:
        """Function decorator for extended send-async method.

        Version of send_async_method which is also passed the original
        unencoded message; can be useful for cases where metadata is sent
        along with messages referring to their payloads/etc.
        """
        assert self._send_async_raw_message_ex_call is None
        self._send_async_raw_message_ex_call = call
        return call

    def encode_filter_method(
        self, call: Callable[[Any, Message, dict], None]
    ) -> Callable[[Any, Message, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def decode_filter_method(
        self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
    ) -> Callable[[Any, Message, dict, Response | SysResponse], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts.
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def peer_desc_method(
        self, call: Callable[[Any], str]
    ) -> Callable[[Any], str]:
        """Function decorator for defining peer descriptions.

        These are included in error messages or other diagnostics.
        """
        assert self._peer_desc_call is None
        self._peer_desc_call = call
        return call

    def send(self, bound_obj: Any, message: Message) -> Response | None:
        """Send a message synchronously."""
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=self.fetch_raw_response(
                bound_obj=bound_obj,
                message=message,
            ),
        )

    def send_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | None]:
        """Send a message asynchronously."""

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        raw_response_awaitable = self.fetch_raw_response_async(
            bound_obj=bound_obj,
            message=message,
        )
        # Now return an awaitable that will finish the send.
        return self._send_async_awaitable(
            bound_obj, message, raw_response_awaitable
        )

    async def _send_async_awaitable(
        self,
        bound_obj: Any,
        message: Message,
        raw_response_awaitable: Awaitable[Response | SysResponse],
    ) -> Response | None:
        """Await a raw response and unpack it into the final result."""
        return self.unpack_raw_response(
            bound_obj=bound_obj,
            message=message,
            raw_response=await raw_response_awaitable,
        )

    @staticmethod
    def _send_error_response(
        exc: Exception, error_message: str
    ) -> ErrorSysResponse:
        """Wrap an exception raised by a raw send call.

        CommunicationErrors are tagged COMMUNICATION; anything else is
        tagged LOCAL.
        """
        response = ErrorSysResponse(
            error_message=error_message,
            error_type=(
                ErrorSysResponse.ErrorType.COMMUNICATION
                if isinstance(exc, CommunicationError)
                else ErrorSysResponse.ErrorType.LOCAL
            ),
        )
        # Can include the actual exception since we'll be looking at
        # this locally; might be helpful.
        response.set_local_exception(exc)
        return response

    def fetch_raw_response(
        self, bound_obj: Any, message: Message
    ) -> Response | SysResponse:
        """Send a message synchronously and return the raw response.

        The result should be passed to unpack_raw_response() to produce
        the final message result.

        Generally you can just call send(); these split versions are
        for when message sending and response handling need to happen
        in different contexts/threads.

        Raises RuntimeError if no send method has been registered.
        Exceptions raised by the registered send method are not
        propagated; they are returned as an ErrorSysResponse.
        """
        if (
            self._send_raw_message_call is None
            and self._send_raw_message_ex_call is None
        ):
            raise RuntimeError('send() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            # The extended method takes precedence if both are set.
            if self._send_raw_message_ex_call is not None:
                response_encoded = self._send_raw_message_ex_call(
                    bound_obj, msg_encoded, message
                )
            else:
                assert self._send_raw_message_call is not None
                response_encoded = self._send_raw_message_call(
                    bound_obj, msg_encoded
                )
        except Exception as exc:
            return self._send_error_response(
                exc, 'Error in MessageSender @send_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def fetch_raw_response_async(
        self, bound_obj: Any, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """Fetch a raw message response awaitable.

        The result of this should be awaited and then passed to
        unpack_raw_response() to produce the final message result.

        Generally you can just call send_async(); calling fetch and
        unpack manually is for when message sending and response
        handling need to happen in different contexts/threads.

        Raises RuntimeError if no async send method has been registered.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that messages sent in order would actually go
        # out in order.
        if (
            self._send_async_raw_message_call is None
            and self._send_async_raw_message_ex_call is None
        ):
            raise RuntimeError('send_async() is unimplemented for this type.')

        msg_encoded = self._encode_message(bound_obj, message)
        try:
            # The extended method takes precedence if both are set.
            if self._send_async_raw_message_ex_call is not None:
                send_awaitable = self._send_async_raw_message_ex_call(
                    bound_obj, msg_encoded, message
                )
            else:
                assert self._send_async_raw_message_call is not None
                send_awaitable = self._send_async_raw_message_call(
                    bound_obj, msg_encoded
                )
        except Exception as exc:
            return self._error_awaitable(exc)

        # Now return an awaitable to finish the job.
        return self._fetch_raw_response_awaitable(
            bound_obj, message, send_awaitable
        )

    async def _error_awaitable(self, exc: Exception) -> SysResponse:
        """Awaitable form of the error response for a failed async send."""
        return self._send_error_response(
            exc, 'Error in MessageSender @send_async_method.'
        )

    async def _fetch_raw_response_awaitable(
        self, bound_obj: Any, message: Message, send_awaitable: Awaitable[str]
    ) -> Response | SysResponse:
        """Await the raw send and decode whatever it returned."""
        try:
            response_encoded = await send_awaitable
        except Exception as exc:
            return self._send_error_response(
                exc, 'Error in MessageSender @send_async_method.'
            )
        return self._decode_raw_response(bound_obj, message, response_encoded)

    def unpack_raw_response(
        self,
        bound_obj: Any,
        message: Message,
        raw_response: Response | SysResponse,
    ) -> Response | None:
        """Convert a raw fetched response into a final response/error/etc.

        Generally you can just call send(); calling fetch and unpack
        manually is for when message sending and response handling need
        to happen in different contexts/threads.
        """
        response = self._unpack_raw_response(bound_obj, raw_response)
        # Sanity check: the response must be one of the types the
        # message declares.
        assert (
            response is None
            or type(response) in type(message).get_response_types()
        )
        return response

    def _encode_message(self, bound_obj: Any, message: Message) -> str:
        """Encode a message for sending."""
        msg_dict = self.protocol.message_to_dict(message)
        if self._encode_filter_call is not None:
            self._encode_filter_call(bound_obj, message, msg_dict)
        return self.protocol.encode_dict(msg_dict)

    def _decode_raw_response(
        self, bound_obj: Any, message: Message, response_encoded: str
    ) -> Response | SysResponse:
        """Create a Response from returned data.

        These Responses may encapsulate things like remote errors and
        should not be handed directly to users. _unpack_raw_response()
        should be used to translate to special values like None or raise
        Exceptions. This function itself should never raise Exceptions.
        """
        response: Response | SysResponse
        try:
            response_dict = self.protocol.decode_dict(response_encoded)
            response = self.protocol.response_from_dict(response_dict)
            if self._decode_filter_call is not None:
                self._decode_filter_call(
                    bound_obj, message, response_dict, response
                )
        except Exception as exc:

            # We pragmatically log by default if decoding fails. This
            # means a message type was likely changed in a way that
            # breaks the protocol, but individual message handlers are
            # likely to lump all errors together (communication and
            # otherwise) which could cause such breakage to go
            # unnoticed.
            if self.protocol.log_response_decode_errors:
                logging.exception(
                    'Error decoding message response;'
                    ' protocol might be broken.',
                )

            response = ErrorSysResponse(
                error_message='Error decoding raw response.',
                error_type=ErrorSysResponse.ErrorType.LOCAL,
            )
            # Since we'll be looking at this locally, we can include
            # extra info for logging/etc.
            response.set_local_exception(exc)
        return response

    def _unpack_raw_response(
        self, bound_obj: Any, raw_response: Response | SysResponse
    ) -> Response | None:
        """Given a raw Response, unpacks to special values or Exceptions.

        The result of this call is what should be passed to users.
        For complex messaging situations such as response callbacks
        operating across different threads, this last stage should be
        run such that any raised Exception is active when the callback
        fires; not on the thread where the message was sent.
        """
        # EmptySysResponse translates to None
        if isinstance(raw_response, EmptySysResponse):
            return None

        # Some error occurred. Raise a local Exception for it.
        if isinstance(raw_response, ErrorSysResponse):
            # Errors that happened locally can attach their exceptions
            # here for extra logging goodness.
            local_exception = raw_response.get_local_exception()

            if (
                raw_response.error_type
                is ErrorSysResponse.ErrorType.COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # If something went wrong on *our* end of the connection,
            # don't say it was a remote error.
            if raw_response.error_type is ErrorSysResponse.ErrorType.LOCAL:
                raise RuntimeError(
                    raw_response.error_message
                ) from local_exception

            # If they want to support clean errors, do those.
            if (
                self.protocol.forward_clean_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_CLEAN
            ):
                raise CleanError(
                    raw_response.error_message
                ) from local_exception

            if (
                self.protocol.forward_communication_errors
                and raw_response.error_type
                is ErrorSysResponse.ErrorType.REMOTE_COMMUNICATION
            ):
                raise CommunicationError(
                    raw_response.error_message
                ) from local_exception

            # Everything else gets lumped in as a remote error.
            raise RemoteError(
                raw_response.error_message,
                peer_desc=(
                    'peer'
                    if self._peer_desc_call is None
                    else self._peer_desc_call(bound_obj)
                ),
            ) from local_exception

        assert isinstance(raw_response, Response)
        return raw_response
Facilitates sending messages to a target and receiving responses.
These are instantiated at the class level and used to register unbound class methods to handle raw message sending. Generally this class is not used directly, but instead autogenerated subclasses which provide type safe overloads are used instead.
Example: (In this example, MyMessageSender is an autogenerated class that inherits from MessageSender).
class MyClass: msg = MyMessageSender()
@msg.send_method
def send_raw_message(self, message: str) -> str:
# Actually send the message here.
obj = MyClass()
The MyMessageSender generated class would provide overloads for
send(), send_async(), etc. to provide type-safety for message types
and their associated response types.
Thus, given the statement below, a type-checker would know that
'response' is a SomeResponseType or whatever is associated with
SomeMessageType.
response = obj.msg.send(SomeMessageType())
def __init__(self, protocol: MessageProtocol) -> None:
    """Store the protocol and start with no callbacks registered.

    Every callback slot below starts as None and is filled in by the
    matching decorator method (send_method, send_ex_method,
    send_async_method, send_async_ex_method, encode_filter_method,
    decode_filter_method, peer_desc_method).
    """
    self.protocol = protocol
    # Raw synchronous send: (bound_obj, encoded_message) -> encoded reply.
    self._send_raw_message_call: Callable[[Any, str], str] | None = None
    # Extended synchronous send; also receives the unencoded Message.
    self._send_raw_message_ex_call: (
        Callable[[Any, str, Message], str] | None
    ) = None
    # Raw asynchronous send; returns an awaitable encoded reply.
    self._send_async_raw_message_call: (
        Callable[[Any, str], Awaitable[str]] | None
    ) = None
    # Extended asynchronous send; also receives the unencoded Message.
    self._send_async_raw_message_ex_call: (
        Callable[[Any, str, Message], Awaitable[str]] | None
    ) = None
    # Optional hook run on the message dict before it is encoded.
    self._encode_filter_call: (
        Callable[[Any, Message, dict], None] | None
    ) = None
    # Optional hook run on the response dict after it is decoded.
    self._decode_filter_call: (
        Callable[[Any, Message, dict, Response | SysResponse], None] | None
    ) = None
    # Optional provider of a peer description string.
    self._peer_desc_call: Callable[[Any], str] | None = None
def send_method(
    self, call: Callable[[Any, str], str]
) -> Callable[[Any, str], str]:
    """Function decorator for setting raw send method.

    Send methods take strings and should return strings.
    CommunicationErrors raised here will be returned to the sender
    as such; all other exceptions will result in a RuntimeError for
    the sender.

    Returns 'call' unchanged so the decorated function stays usable.
    """
    # Only a single send method may be registered.
    assert self._send_raw_message_call is None
    self._send_raw_message_call = call
    return call
Function decorator for setting raw send method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
def send_ex_method(
    self, call: Callable[[Any, str, Message], str]
) -> Callable[[Any, str, Message], str]:
    """Function decorator for extended send method.

    Version of send_method which is also passed the original
    unencoded message; can be useful for cases where metadata is sent
    along with messages referring to their payloads/etc.

    Returns 'call' unchanged so the decorated function stays usable.
    """
    # Only a single extended send method may be registered.
    assert self._send_raw_message_ex_call is None
    self._send_raw_message_ex_call = call
    return call
Function decorator for extended send method.
Version of send_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
def send_async_method(
    self, call: Callable[[Any, str], Awaitable[str]]
) -> Callable[[Any, str], Awaitable[str]]:
    """Function decorator for setting raw send-async method.

    Send methods take strings and should return strings.
    CommunicationErrors raised here will be returned to the sender
    as such; all other exceptions will result in a RuntimeError for
    the sender.

    IMPORTANT: Generally async send methods should not be implemented
    as 'async' methods, but instead should be regular methods that
    return awaitable objects. This way it can be guaranteed that
    outgoing messages are synchronously enqueued in the correct
    order, and then async calls can be returned which finish each
    send. If the entire call is async, they may be enqueued out of
    order in rare cases.

    Returns 'call' unchanged so the decorated function stays usable.
    """
    # Only a single async send method may be registered.
    assert self._send_async_raw_message_call is None
    self._send_async_raw_message_call = call
    return call
Function decorator for setting raw send-async method.
Send methods take strings and should return strings. CommunicationErrors raised here will be returned to the sender as such; all other exceptions will result in a RuntimeError for the sender.
IMPORTANT: Generally async send methods should not be implemented as 'async' methods, but instead should be regular methods that return awaitable objects. This way it can be guaranteed that outgoing messages are synchronously enqueued in the correct order, and then async calls can be returned which finish each send. If the entire call is async, they may be enqueued out of order in rare cases.
def send_async_ex_method(
    self, call: Callable[[Any, str, Message], Awaitable[str]]
) -> Callable[[Any, str, Message], Awaitable[str]]:
    """Decorator registering the extended raw send-async method.

    Works like send_async_method, except that the registered call is
    also handed the original unencoded message; this can help when
    metadata referring to message payloads/etc. is sent along with
    messages.

    Returns the passed call unchanged so this can be used as a decorator.
    """
    # Only a single extended send-async method may ever be registered.
    already_registered = self._send_async_raw_message_ex_call is not None
    assert not already_registered
    self._send_async_raw_message_ex_call = call
    return call
Function decorator for extended send-async method.
Version of send_async_method which is also passed the original unencoded message; can be useful for cases where metadata is sent along with messages referring to their payloads/etc.
def encode_filter_method(
    self, call: Callable[[Any, Message, dict], None]
) -> Callable[[Any, Message, dict], None]:
    """Decorator registering an encode filter.

    An encode filter can add extra data to the message dict before it
    is encoded to a string and sent out.

    Returns the passed call unchanged so this can be used as a decorator.
    """
    # Only a single encode filter may ever be registered.
    already_registered = self._encode_filter_call is not None
    assert not already_registered
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def decode_filter_method(
    self, call: Callable[[Any, Message, dict, Response | SysResponse], None]
) -> Callable[[Any, Message, dict, Response], None]:
    """Decorator registering a decode filter.

    A decode filter can pull extra data out of incoming message dicts.

    Returns the passed call unchanged so this can be used as a decorator.
    """
    # Only a single decode filter may ever be registered.
    already_registered = self._decode_filter_call is not None
    assert not already_registered
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts.
def peer_desc_method(
    self, call: Callable[[Any], str]
) -> Callable[[Any], str]:
    """Decorator registering the call that describes the peer.

    Peer descriptions are included in error messages or other
    diagnostics.

    Returns the passed call unchanged so this can be used as a decorator.
    """
    # Only a single peer-description call may ever be registered.
    already_registered = self._peer_desc_call is not None
    assert not already_registered
    self._peer_desc_call = call
    return call
Function decorator for defining peer descriptions.
These are included in error messages or other diagnostics.
def send(self, bound_obj: Any, message: Message) -> Response | None:
    """Synchronously send a message and return its final response.

    This is fetch_raw_response() followed by unpack_raw_response().
    """
    # Step one: send the message and collect the raw response.
    raw_response = self.fetch_raw_response(
        bound_obj=bound_obj,
        message=message,
    )
    # Step two: convert the raw response into the final result.
    return self.unpack_raw_response(
        bound_obj=bound_obj,
        message=message,
        raw_response=raw_response,
    )
Send a message synchronously.
def send_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | None]:
    """Asynchronously send a message; returns an awaitable response."""

    # This is deliberately a regular (non-async) call so that its first
    # part runs synchronously. Were the whole call async, there would be
    # no guarantee that messages sent in order actually go out in order.
    pending_raw_response = self.fetch_raw_response_async(
        bound_obj=bound_obj,
        message=message,
    )
    # Hand back an awaitable that completes the send.
    finisher = self._send_async_awaitable(
        bound_obj, message, pending_raw_response
    )
    return finisher
Send a message asynchronously.
def fetch_raw_response(
    self, bound_obj: Any, message: Message
) -> Response | SysResponse:
    """Send a message synchronously and return the raw response.

    Generally you can just call send(); these split versions are
    for when message sending and response handling need to happen
    in different contexts/threads.

    Raises RuntimeError if no synchronous send method is registered.
    Exceptions raised by the registered send method are not propagated;
    they are returned wrapped in an ErrorSysResponse.
    """
    ex_call = self._send_raw_message_ex_call
    plain_call = self._send_raw_message_call
    if plain_call is None and ex_call is None:
        raise RuntimeError('send() is unimplemented for this type.')

    msg_encoded = self._encode_message(bound_obj, message)
    try:
        # The extended variant wins when both are registered.
        if ex_call is None:
            assert plain_call is not None
            response_encoded = plain_call(bound_obj, msg_encoded)
        else:
            response_encoded = ex_call(bound_obj, msg_encoded, message)
    except Exception as exc:
        if isinstance(exc, CommunicationError):
            error_type = ErrorSysResponse.ErrorType.COMMUNICATION
        else:
            error_type = ErrorSysResponse.ErrorType.LOCAL
        response = ErrorSysResponse(
            error_message='Error in MessageSender @send_method.',
            error_type=error_type,
        )
        # The actual exception can be attached since this response is
        # only looked at locally; might be helpful.
        response.set_local_exception(exc)
        return response
    return self._decode_raw_response(bound_obj, message, response_encoded)
Send a message synchronously and return the raw response.
Generally you can just call send(); these split versions are for when message sending and response handling need to happen in different contexts/threads.
def fetch_raw_response_async(
    self, bound_obj: Any, message: Message
) -> Awaitable[Response | SysResponse]:
    """Start an async send and return an awaitable raw response.

    Await the result and then pass it to unpack_raw_response() to
    produce the final message result.

    Generally you can just call send(); calling fetch and unpack
    manually is for when message sending and response handling need
    to happen in different contexts/threads.

    Raises RuntimeError if no async send method is registered.
    Exceptions raised while invoking the registered send method are
    not propagated; an error awaitable is returned instead.
    """

    # This is deliberately a regular (non-async) call so that its first
    # part runs synchronously. Were the whole call async, there would be
    # no guarantee that messages sent in order actually go out in order.
    ex_call = self._send_async_raw_message_ex_call
    plain_call = self._send_async_raw_message_call
    if plain_call is None and ex_call is None:
        raise RuntimeError('send_async() is unimplemented for this type.')

    msg_encoded = self._encode_message(bound_obj, message)
    try:
        # The extended variant wins when both are registered.
        if ex_call is None:
            assert plain_call is not None
            send_awaitable = plain_call(bound_obj, msg_encoded)
        else:
            send_awaitable = ex_call(bound_obj, msg_encoded, message)
    except Exception as exc:
        return self._error_awaitable(exc)

    # Hand back an awaitable that finishes the job.
    return self._fetch_raw_response_awaitable(
        bound_obj, message, send_awaitable
    )
Fetch a raw message response awaitable.
The result of this should be awaited and then passed to unpack_raw_response() to produce the final message result.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
def unpack_raw_response(
    self,
    bound_obj: Any,
    message: Message,
    raw_response: Response | SysResponse,
) -> Response | None:
    """Turn a raw fetched response into a final response/error/etc.

    Generally you can just call send(); calling fetch and unpack
    manually is for when message sending and response handling need
    to happen in different contexts/threads.
    """
    response = self._unpack_raw_response(bound_obj, raw_response)
    if response is not None:
        # Sanity check: a non-empty response must be one of the types
        # the message declares it can return.
        assert type(response) in type(message).get_response_types()
    return response
Convert a raw fetched response into a final response/error/etc.
Generally you can just call send(); calling fetch and unpack manually is for when message sending and response handling need to happen in different contexts/threads.
class BoundMessageSender:
    """Base class for senders bound to a particular object."""

    def __init__(self, obj: Any, sender: MessageSender) -> None:
        # obj is deliberately left unchecked here; at least the
        # protocol property should keep working when accessed via type.
        self._sender = sender
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """The protocol of the underlying sender."""
        sender = self._sender
        return sender.protocol

    def send_untyped(self, message: Message) -> Response | None:
        """Synchronously send a message.

        Prefer the send() call provided by generated subclasses
        whenever possible; it offers better type safety.
        """
        bound_obj = self._obj
        assert bound_obj is not None
        return self._sender.send(bound_obj=bound_obj, message=message)

    def send_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | None]:
        """Asynchronously send a message.

        Prefer the send_async() call provided by generated subclasses
        whenever possible; it offers better type safety.
        """
        bound_obj = self._obj
        assert bound_obj is not None
        return self._sender.send_async(bound_obj=bound_obj, message=message)

    def fetch_raw_response_async_untyped(
        self, message: Message
    ) -> Awaitable[Response | SysResponse]:
        """First half of a split send: fetch the raw response."""
        bound_obj = self._obj
        assert bound_obj is not None
        return self._sender.fetch_raw_response_async(
            bound_obj=bound_obj, message=message
        )

    def unpack_raw_response_untyped(
        self, message: Message, raw_response: Response | SysResponse
    ) -> Response | None:
        """Second half of a split send: unpack the raw response."""
        sender = self._sender
        return sender.unpack_raw_response(
            bound_obj=self._obj, message=message, raw_response=raw_response
        )
Base class for bound senders.
@property
def protocol(self) -> MessageProtocol:
    """The protocol of the underlying sender."""
    sender = self._sender
    return sender.protocol
Protocol associated with this sender.
def send_untyped(self, message: Message) -> Response | None:
    """Synchronously send a message.

    Prefer the send() call provided by generated subclasses whenever
    possible; it offers better type safety.
    """
    bound_obj = self._obj
    assert bound_obj is not None
    return self._sender.send(bound_obj=bound_obj, message=message)
Send a message synchronously.
Whenever possible, use the send() call provided by generated subclasses instead of this; it will provide better type safety.
def send_async_untyped(
    self, message: Message
) -> Awaitable[Response | None]:
    """Asynchronously send a message.

    Prefer the send_async() call provided by generated subclasses
    whenever possible; it offers better type safety.
    """
    bound_obj = self._obj
    assert bound_obj is not None
    return self._sender.send_async(bound_obj=bound_obj, message=message)
Send a message asynchronously.
Whenever possible, use the send_async() call provided by generated subclasses instead of this; it will provide better type safety.
def fetch_raw_response_async_untyped(
    self, message: Message
) -> Awaitable[Response | SysResponse]:
    """First half of a split send: fetch the raw response."""
    bound_obj = self._obj
    assert bound_obj is not None
    return self._sender.fetch_raw_response_async(
        bound_obj=bound_obj, message=message
    )
Split send (part 1 of 2).
def unpack_raw_response_untyped(
    self, message: Message, raw_response: Response | SysResponse
) -> Response | None:
    """Second half of a split send: unpack the raw response."""
    sender = self._sender
    return sender.unpack_raw_response(
        bound_obj=self._obj, message=message, raw_response=raw_response
    )
Split send (part 2 of 2).
class MessageReceiver:
    """Facilitates receiving & responding to messages from a remote source.

    This is instantiated at the class level with unbound methods registered
    as handlers for different message types in the protocol.

    Example:

    class MyClass:
        receiver = MyMessageReceiver()

        # MyMessageReceiver fills out handler() overloads to ensure all
        # registered handlers have valid types/return-types.

        @receiver.handler
        def handle_some_message_type(self, msg: SomeMsg) -> SomeResponse:
            # Deal with this message type here.

    # This will trigger the registered handler being called.
    obj = MyClass()
    obj.receiver.handle_raw_message(some_raw_data)

    Note that register_handler() requires handler arguments to be named
    exactly 'self' and 'msg'.

    Any unhandled Exception occurring during message handling will result in
    an efro.error.RemoteError being raised on the sending end.
    """

    # Checked by handle_raw_message() and handle_raw_message_async() to
    # catch a sync entry point being used on an async receiver or
    # vice versa.
    is_async = False

    def __init__(self, protocol: MessageProtocol) -> None:
        self.protocol = protocol
        # Registered handler calls, keyed by the message type they handle.
        self._handlers: dict[type[Message], Callable] = {}
        # Optional filter calls; set via decode_filter_method() and
        # encode_filter_method() respectively.
        self._decode_filter_call: (
            Callable[[Any, dict, Message], None] | None
        ) = None
        self._encode_filter_call: (
            Callable[[Any, Message | None, Response | SysResponse, dict], None]
            | None
        ) = None

    # noinspection PyProtectedMember
    def register_handler(
        self, call: Callable[[Any, Message], Response | None]
    ) -> None:
        """Register a handler call.

        The message type handled by the call is determined by its
        type annotation.

        Raises ValueError if the call's arguments are not exactly
        ('self', 'msg'). Raises TypeError if the 'msg' or 'return'
        annotations are not types, if the message type is not registered
        in our protocol, if it already has a handler, or if the annotated
        response types do not exactly match those the message type
        declares.
        """
        # TODO: can use types.GenericAlias in 3.9.
        # (hmm though now that we're there, it seems a drop-in
        # replace gives us errors. Should re-test in 3.11 as it seems
        # that typing_extensions handles it differently in that case)
        from typing import _GenericAlias  # type: ignore
        from typing import get_type_hints, get_args

        sig = inspect.getfullargspec(call)

        # The provided callable should be a method taking one 'msg' arg.
        expectedsig = ['self', 'msg']
        if sig.args != expectedsig:
            raise ValueError(
                f'Expected callable signature of {expectedsig};'
                f' got {sig.args}'
            )

        # Check annotation types to determine what message types we handle.
        # Return-type annotation can be a Union, but we probably don't
        # have it available at runtime. Explicitly pull it in.
        # UPDATE: we've updated our pylint filter to where we should
        # have all annotations available.
        # anns = get_type_hints(call, localns={'Union': Union})
        anns = get_type_hints(call)

        msgtype = anns.get('msg')
        if not isinstance(msgtype, type):
            raise TypeError(
                f'expected a type for "msg" annotation; got {type(msgtype)}.'
            )
        assert issubclass(msgtype, Message)

        ret = anns.get('return')
        responsetypes: tuple[type[Any] | None, ...]

        # Return types can be a single type or a union of types.
        if isinstance(ret, (_GenericAlias, types.UnionType)):
            targs = get_args(ret)
            if not all(isinstance(a, (type, type(None))) for a in targs):
                raise TypeError(
                    f'expected only types for "return" annotation;'
                    f' got {targs}.'
                )
            responsetypes = targs
        else:
            if not isinstance(ret, (type, type(None))):
                raise TypeError(
                    f'expected one or more types for'
                    f' "return" annotation; got a {type(ret)}.'
                )
            # This seems like maybe a mypy bug. Appeared after adding
            # types.UnionType above.
            responsetypes = (ret,)

        # This will contain NoneType for empty return cases, but
        # we expect it to be None.
        # noinspection PyPep8
        responsetypes = tuple(
            None if r is type(None) else r for r in responsetypes
        )

        # Make sure our protocol has this message type registered and our
        # return types exactly match. (Technically we could return a subset
        # of the supported types; can allow this in the future if it makes
        # sense).
        registered_types = self.protocol.message_ids_by_type.keys()

        if msgtype not in registered_types:
            raise TypeError(
                f'Message type {msgtype} is not registered'
                f' in this Protocol.'
            )

        if msgtype in self._handlers:
            raise TypeError(
                f'Message type {msgtype} already has a registered handler.'
            )

        # Make sure the responses exactly match what the message expects.
        if set(responsetypes) != set(msgtype.get_response_types()):
            raise TypeError(
                f'Provided response types {responsetypes} do not'
                f' match the set expected by message type {msgtype}: '
                f'({msgtype.get_response_types()})'
            )

        # Ok; we're good!
        self._handlers[msgtype] = call

    def decode_filter_method(
        self, call: Callable[[Any, dict, Message], None]
    ) -> Callable[[Any, dict, Message], None]:
        """Function decorator for defining a decode filter.

        Decode filters can be used to extract extra data from incoming
        message dicts. This version will work for both handle_raw_message()
        and handle_raw_message_async().

        Only one decode filter can be set; the call is returned unchanged.
        """
        assert self._decode_filter_call is None
        self._decode_filter_call = call
        return call

    def encode_filter_method(
        self,
        call: Callable[
            [Any, Message | None, Response | SysResponse, dict], None
        ],
    ) -> Callable[[Any, Message | None, Response, dict], None]:
        """Function decorator for defining an encode filter.

        Encode filters can be used to add extra data to the message
        dict before it is encoded to a string and sent out.

        Only one encode filter can be set; the call is returned unchanged.
        """
        assert self._encode_filter_call is None
        self._encode_filter_call = call
        return call

    def validate(self, log_only: bool = False) -> None:
        """Check for handler completeness, valid types, etc.

        Each type in the protocol's message_ids_by_type that is not a
        Response subclass must have a registered handler. A missing
        handler raises TypeError, or is only logged as an error if
        'log_only' is True.
        """
        for msgtype in self.protocol.message_ids_by_type.keys():
            # Response subclasses are skipped; they don't get handlers.
            if issubclass(msgtype, Response):
                continue
            if msgtype not in self._handlers:
                msg = (
                    f'Protocol message type {msgtype} is not handled'
                    f' by receiver type {type(self)}.'
                )
                if log_only:
                    logging.error(msg)
                else:
                    raise TypeError(msg)

    def _decode_incoming_message_base(
        self, bound_obj: Any, msg: str
    ) -> tuple[Any, dict, Message]:
        """Decode a raw message and run the decode filter (if any) on it.

        Returns the bound object, the decoded message dict, and the
        Message instance built from that dict.
        """
        # Decode the incoming message.
        msg_dict = self.protocol.decode_dict(msg)
        msg_decoded = self.protocol.message_from_dict(msg_dict)
        assert isinstance(msg_decoded, Message)
        if self._decode_filter_call is not None:
            self._decode_filter_call(bound_obj, msg_dict, msg_decoded)
        return bound_obj, msg_dict, msg_decoded

    def _decode_incoming_message(self, bound_obj: Any, msg: str) -> Message:
        """Decode a raw message, returning only the Message instance."""
        bound_obj, _msg_dict, msg_decoded = self._decode_incoming_message_base(
            bound_obj=bound_obj, msg=msg
        )
        return msg_decoded

    def encode_user_response(
        self, bound_obj: Any, message: Message, response: Response | None
    ) -> str:
        """Encode a response provided by the user for sending.

        The response dict is passed through the encode filter (if one
        is set) before being encoded to a string.
        """

        assert isinstance(response, Response | None)
        # (user should never explicitly return error-responses)
        assert (
            response is None or type(response) in message.get_response_types()
        )

        # A return value of None equals EmptySysResponse.
        out_response: Response | SysResponse
        if response is None:
            out_response = EmptySysResponse()
        else:
            out_response = response

        response_dict = self.protocol.response_to_dict(out_response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, out_response, response_dict
            )
        return self.protocol.encode_dict(response_dict)

    def encode_error_response(
        self, bound_obj: Any, message: Message | None, exc: Exception
    ) -> tuple[str, bool]:
        """Given an error, return sysresponse str and whether to log.

        'message' may be None when no decoded message is available. The
        response dict is passed through the encode filter (if one is
        set) before being encoded to a string.
        """
        response, dolog = self.protocol.error_to_response(exc)
        response_dict = self.protocol.response_to_dict(response)
        if self._encode_filter_call is not None:
            self._encode_filter_call(
                bound_obj, message, response, response_dict
            )
        return self.protocol.encode_dict(response_dict), dolog

    def handle_raw_message(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> str:
        """Decode, handle, and return a response for a message.

        If 'raise_unregistered' is True, will raise an
        efro.message.UnregisteredMessageIDError for messages not handled by
        the protocol. In all other cases local errors will translate to
        error responses returned to the sender.
        """
        assert not self.is_async, "can't call sync handler on async receiver"
        # Stays None if decoding itself fails; the error path below
        # uses that to pick which log message to emit.
        msg_decoded: Message | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            response = handler(bound_obj, msg_decoded)
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            rstr, dolog = self.encode_error_response(
                bound_obj, msg_decoded, exc
            )
            if dolog:
                if msg_decoded is not None:
                    msgtype = type(msg_decoded)
                    logging.exception(
                        'Error handling %s.%s message.',
                        msgtype.__module__,
                        msgtype.__qualname__,
                    )
                else:
                    logging.exception(
                        'Error handling raw efro.message'
                        ' (likely a message format incompatibility): %s.',
                        msg,
                    )
            return rstr

    def handle_raw_message_async(
        self, bound_obj: Any, msg: str, raise_unregistered: bool = False
    ) -> Awaitable[str]:
        """Should be called when the receiver gets a message.

        The return value is an awaitable producing the raw response to
        the message.

        If 'raise_unregistered' is True, an UnregisteredMessageIDError
        raised during the synchronous part of this call is re-raised
        instead of being converted to an error response.
        """

        # Note: This call is synchronous so that the first part of it can
        # happen synchronously. If the whole call were async we wouldn't be
        # able to guarantee that message handlers would be called in the
        # order the messages were received.

        assert self.is_async, "Can't call async handler on sync receiver."
        msg_decoded: Message | None = None
        try:
            msg_decoded = self._decode_incoming_message(bound_obj, msg)
            msgtype = type(msg_decoded)
            handler = self._handlers.get(msgtype)
            if handler is None:
                raise RuntimeError(f'Got unhandled message type: {msgtype}.')
            handler_awaitable = handler(bound_obj, msg_decoded)

        except Exception as exc:
            if raise_unregistered and isinstance(
                exc, UnregisteredMessageIDError
            ):
                raise
            return self._handle_raw_message_async_error(
                bound_obj, msg, msg_decoded, exc
            )

        # Return an awaitable to handle the rest asynchronously.
        return self._handle_raw_message_async(
            bound_obj, msg, msg_decoded, handler_awaitable
        )

    async def _handle_raw_message_async_error(
        self,
        bound_obj: Any,
        msg_raw: str,
        msg_decoded: Message | None,
        exc: Exception,
    ) -> str:
        """Encode an error response for 'exc', logging it if called for.

        'msg_decoded' is None when the failure happened before the
        message could be decoded.
        """
        rstr, dolog = self.encode_error_response(bound_obj, msg_decoded, exc)
        if dolog:
            if msg_decoded is not None:
                msgtype = type(msg_decoded)
                logging.exception(
                    'Error handling %s.%s message.',
                    msgtype.__module__,
                    msgtype.__qualname__,
                    # We need to explicitly provide the exception here,
                    # otherwise it shows up as None. I assume related to
                    # the fact that we're an async function.
                    exc_info=exc,
                )
            else:
                logging.exception(
                    'Error handling raw async efro.message'
                    ' (likely a message format incompatibility): %s.',
                    msg_raw,
                    # We need to explicitly provide the exception here,
                    # otherwise it shows up as None. I assume related to
                    # the fact that we're an async function.
                    exc_info=exc,
                )
        return rstr

    async def _handle_raw_message_async(
        self,
        bound_obj: Any,
        msg_raw: str,
        msg_decoded: Message,
        handler_awaitable: Awaitable[Response | None],
    ) -> str:
        """Await a handler's result and encode it as the raw response.

        Any Exception raised while awaiting or encoding is converted to
        an encoded error response instead of propagating.
        """
        try:
            response = await handler_awaitable
            assert isinstance(response, Response | None)
            return self.encode_user_response(bound_obj, msg_decoded, response)

        except Exception as exc:
            return await self._handle_raw_message_async_error(
                bound_obj, msg_raw, msg_decoded, exc
            )
Facilitates receiving & responding to messages from a remote source.
This is instantiated at the class level with unbound methods registered as handlers for different message types in the protocol.
Example:
class MyClass: receiver = MyMessageReceiver()
# MyMessageReceiver fills out handler() overloads to ensure all
# registered handlers have valid types/return-types.
@receiver.handler
def handle_some_message_type(self, message: SomeMsg) -> SomeResponse:
# Deal with this message type here.
This will trigger the registered handler being called.
obj = MyClass() obj.receiver.handle_raw_message(some_raw_data)
Any unhandled Exception occurring during message handling will result in an efro.error.RemoteError being raised on the sending end.
57 def __init__(self, protocol: MessageProtocol) -> None: 58 self.protocol = protocol 59 self._handlers: dict[type[Message], Callable] = {} 60 self._decode_filter_call: ( 61 Callable[[Any, dict, Message], None] | None 62 ) = None 63 self._encode_filter_call: ( 64 Callable[[Any, Message | None, Response | SysResponse, dict], None] 65 | None 66 ) = None
def register_handler(
    self, call: Callable[[Any, Message], Response | None]
) -> None:
    """Register a handler call.

    The message type handled by the call is determined by its
    type annotation.

    Raises ValueError if the call's arguments are not exactly
    ('self', 'msg'). Raises TypeError if the 'msg' or 'return'
    annotations are not types, if the message type is not registered
    in our protocol, if it already has a handler, or if the annotated
    response types do not exactly match those the message type
    declares.
    """
    # TODO: can use types.GenericAlias in 3.9.
    # (hmm though now that we're there, it seems a drop-in
    # replace gives us errors. Should re-test in 3.11 as it seems
    # that typing_extensions handles it differently in that case)
    from typing import _GenericAlias  # type: ignore
    from typing import get_type_hints, get_args

    sig = inspect.getfullargspec(call)

    # The provided callable should be a method taking one 'msg' arg.
    expectedsig = ['self', 'msg']
    if sig.args != expectedsig:
        raise ValueError(
            f'Expected callable signature of {expectedsig};'
            f' got {sig.args}'
        )

    # Check annotation types to determine what message types we handle.
    # Return-type annotation can be a Union, but we probably don't
    # have it available at runtime. Explicitly pull it in.
    # UPDATE: we've updated our pylint filter to where we should
    # have all annotations available.
    # anns = get_type_hints(call, localns={'Union': Union})
    anns = get_type_hints(call)

    msgtype = anns.get('msg')
    if not isinstance(msgtype, type):
        raise TypeError(
            f'expected a type for "msg" annotation; got {type(msgtype)}.'
        )
    assert issubclass(msgtype, Message)

    ret = anns.get('return')
    responsetypes: tuple[type[Any] | None, ...]

    # Return types can be a single type or a union of types.
    if isinstance(ret, (_GenericAlias, types.UnionType)):
        targs = get_args(ret)
        if not all(isinstance(a, (type, type(None))) for a in targs):
            raise TypeError(
                f'expected only types for "return" annotation;'
                f' got {targs}.'
            )
        responsetypes = targs
    else:
        if not isinstance(ret, (type, type(None))):
            raise TypeError(
                f'expected one or more types for'
                f' "return" annotation; got a {type(ret)}.'
            )
        # This seems like maybe a mypy bug. Appeared after adding
        # types.UnionType above.
        responsetypes = (ret,)

    # This will contain NoneType for empty return cases, but
    # we expect it to be None.
    # noinspection PyPep8
    responsetypes = tuple(
        None if r is type(None) else r for r in responsetypes
    )

    # Make sure our protocol has this message type registered and our
    # return types exactly match. (Technically we could return a subset
    # of the supported types; can allow this in the future if it makes
    # sense).
    registered_types = self.protocol.message_ids_by_type.keys()

    if msgtype not in registered_types:
        raise TypeError(
            f'Message type {msgtype} is not registered'
            f' in this Protocol.'
        )

    if msgtype in self._handlers:
        raise TypeError(
            f'Message type {msgtype} already has a registered handler.'
        )

    # Make sure the responses exactly match what the message expects.
    if set(responsetypes) != set(msgtype.get_response_types()):
        raise TypeError(
            f'Provided response types {responsetypes} do not'
            f' match the set expected by message type {msgtype}: '
            f'({msgtype.get_response_types()})'
        )

    # Ok; we're good!
    self._handlers[msgtype] = call
Register a handler call.
The message type handled by the call is determined by its type annotation.
def decode_filter_method(
    self, call: Callable[[Any, dict, Message], None]
) -> Callable[[Any, dict, Message], None]:
    """Decorator registering a decode filter.

    A decode filter can pull extra data out of incoming message dicts.
    This version works for both handle_raw_message() and
    handle_raw_message_async().

    Returns the passed call unchanged so this can be used as a decorator.
    """
    # Only a single decode filter may ever be registered.
    already_registered = self._decode_filter_call is not None
    assert not already_registered
    self._decode_filter_call = call
    return call
Function decorator for defining a decode filter.
Decode filters can be used to extract extra data from incoming message dicts. This version will work for both handle_raw_message() and handle_raw_message_async().
def encode_filter_method(
    self,
    call: Callable[
        [Any, Message | None, Response | SysResponse, dict], None
    ],
) -> Callable[[Any, Message | None, Response, dict], None]:
    """Decorator registering an encode filter.

    An encode filter can add extra data to the message dict before it
    is encoded to a string and sent out.

    Returns the passed call unchanged so this can be used as a decorator.
    """
    # Only a single encode filter may ever be registered.
    already_registered = self._encode_filter_call is not None
    assert not already_registered
    self._encode_filter_call = call
    return call
Function decorator for defining an encode filter.
Encode filters can be used to add extra data to the message dict before it is encoded to a string and sent out.
def validate(self, log_only: bool = False) -> None:
    """Verify that every protocol message type has a handler.

    Response subclasses found among the protocol's registered types are
    skipped. A message type without a handler raises TypeError, or is
    only logged as an error when 'log_only' is True.
    """
    for msgtype in self.protocol.message_ids_by_type.keys():
        # Responses get no handlers; handled messages are fine as-is.
        if issubclass(msgtype, Response) or msgtype in self._handlers:
            continue
        msg = (
            f'Protocol message type {msgtype} is not handled'
            f' by receiver type {type(self)}.'
        )
        if not log_only:
            raise TypeError(msg)
        logging.error(msg)
Check for handler completeness, valid types, etc.
def encode_user_response(
    self, bound_obj: Any, message: Message, response: Response | None
) -> str:
    """Encode a user-provided response into a string for sending.

    The response dict is passed through the encode filter (if one is
    set) before being encoded.
    """

    assert isinstance(response, Response | None)
    # (user should never explicitly return error-responses)
    assert (
        response is None or type(response) in message.get_response_types()
    )

    # A return value of None equals EmptySysResponse.
    out_response: Response | SysResponse = (
        EmptySysResponse() if response is None else response
    )

    response_dict = self.protocol.response_to_dict(out_response)
    encode_filter = self._encode_filter_call
    if encode_filter is not None:
        encode_filter(bound_obj, message, out_response, response_dict)
    return self.protocol.encode_dict(response_dict)
Encode a response provided by the user for sending.
def encode_error_response(
    self, bound_obj: Any, message: Message | None, exc: Exception
) -> tuple[str, bool]:
    """Turn an error into an encoded sysresponse plus a should-log flag.

    'message' may be None when no decoded message is available. The
    response dict is passed through the encode filter (if one is set)
    before being encoded.
    """
    protocol = self.protocol
    response, dolog = protocol.error_to_response(exc)
    response_dict = protocol.response_to_dict(response)
    encode_filter = self._encode_filter_call
    if encode_filter is not None:
        encode_filter(bound_obj, message, response, response_dict)
    encoded = protocol.encode_dict(response_dict)
    return encoded, dolog
Given an error, return sysresponse str and whether to log.
def handle_raw_message(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> str:
    """Decode a raw message, run its handler, and return the raw response.

    If 'raise_unregistered' is True, an
    efro.message.UnregisteredMessageIDError is raised for messages not
    handled by the protocol. In all other cases local errors are
    translated to error responses returned to the sender.
    """
    assert not self.is_async, "can't call sync handler on async receiver"
    # Stays None if decoding itself fails; the error path below uses
    # that to pick which log message to emit.
    msg_decoded: Message | None = None
    try:
        msg_decoded = self._decode_incoming_message(bound_obj, msg)
        msgtype = type(msg_decoded)
        if (handler := self._handlers.get(msgtype)) is None:
            raise RuntimeError(f'Got unhandled message type: {msgtype}.')
        response = handler(bound_obj, msg_decoded)
        assert isinstance(response, Response | None)
        return self.encode_user_response(bound_obj, msg_decoded, response)

    except Exception as exc:
        if raise_unregistered and isinstance(
            exc, UnregisteredMessageIDError
        ):
            raise
        rstr, dolog = self.encode_error_response(
            bound_obj, msg_decoded, exc
        )
        if not dolog:
            return rstr
        # Logging must happen inside this except block so that
        # logging.exception() picks up the active exception.
        if msg_decoded is None:
            logging.exception(
                'Error handling raw efro.message'
                ' (likely a message format incompatibility): %s.',
                msg,
            )
        else:
            msgtype = type(msg_decoded)
            logging.exception(
                'Error handling %s.%s message.',
                msgtype.__module__,
                msgtype.__qualname__,
            )
        return rstr
Decode, handle, and return a response for a message.
If 'raise_unregistered' is True, this will raise an efro.message.UnregisteredMessageIDError for messages not handled by the protocol. In all other cases, local errors will translate to error responses returned to the sender.
def handle_raw_message_async(
    self, bound_obj: Any, msg: str, raise_unregistered: bool = False
) -> Awaitable[str]:
    """Entry point for a raw incoming message on an async receiver.

    Returns an awaitable for the raw response to the message. If
    'raise_unregistered' is True, an UnregisteredMessageIDError hit
    while decoding/dispatching is re-raised instead of being turned
    into an error response.
    """

    # Note: This is deliberately a regular (non-async) call so that
    # decoding and handler invocation happen immediately. If the whole
    # call were async we wouldn't be able to guarantee that message
    # handlers would be called in the order the messages were received.

    assert self.is_async, "Can't call async handler on sync receiver."
    decoded: Message | None = None
    try:
        decoded = self._decode_incoming_message(bound_obj, msg)
        decoded_type = type(decoded)
        handler = self._handlers.get(decoded_type)
        if handler is None:
            raise RuntimeError(
                f'Got unhandled message type: {decoded_type}.'
            )
        pending = handler(bound_obj, decoded)

    except Exception as exc:
        if raise_unregistered and isinstance(
            exc, UnregisteredMessageIDError
        ):
            raise
        return self._handle_raw_message_async_error(
            bound_obj, msg, decoded, exc
        )

    else:
        # Hand back an awaitable which takes care of the remainder
        # asynchronously.
        return self._handle_raw_message_async(
            bound_obj, msg, decoded, pending
        )
Should be called when the receiver gets a message.
The return value is the raw response to the message.
class BoundMessageReceiver:
    """Base class for a receiver bound to a specific object.

    Pairs a MessageReceiver with the object its calls should be
    made on behalf of.
    """

    def __init__(
        self,
        obj: Any,
        receiver: MessageReceiver,
    ) -> None:
        assert obj is not None
        self._receiver = receiver
        self._obj = obj

    @property
    def protocol(self) -> MessageProtocol:
        """The protocol of the underlying receiver."""
        receiver = self._receiver
        return receiver.protocol

    def encode_error_response(self, exc: Exception) -> str:
        """Return an encoded, ready-to-send response for an error.

        Intended for errors occurring outside of the standard
        handle_raw_message calls; errors inside those calls are
        already returned as encoded strings automatically.
        """
        # No Message is passed along here (None); one would only exist
        # for failures inside a handler, which this call is not for.
        # Only the encoded string is returned; the accompanying log
        # flag is dropped.
        encoded_and_flag = self._receiver.encode_error_response(
            self._obj, None, exc
        )
        return encoded_and_flag[0]
Base bound receiver class.
@property
def protocol(self) -> MessageProtocol:
    """The protocol of the underlying receiver."""
    receiver = self._receiver
    return receiver.protocol
Protocol associated with this receiver.
def encode_error_response(self, exc: Exception) -> str:
    """Return an encoded, ready-to-send response for an error.

    Intended for errors occurring outside of the standard
    handle_raw_message calls; errors inside those calls are
    already returned as encoded strings automatically.
    """
    # No Message is passed along here (None); one would only exist
    # for failures inside a handler, which this call is not for.
    # Only the encoded string is returned; the accompanying log
    # flag is dropped.
    encoded_and_flag = self._receiver.encode_error_response(
        self._obj, None, exc
    )
    return encoded_and_flag[0]
Given an error, return a response ready to send.
This should be used for any errors that happen outside of standard handle_raw_message calls. Any errors within those calls will be automatically returned as encoded strings.
def create_sender_module(
    basename: str,
    protocol_create_code: str,
    enable_sync_sends: bool,
    enable_async_sends: bool,
    *,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Generate source for a module defining a MessageSender subclass.

    The generated class exists mainly for the benefit of type
    checking; it contains overrides of the various send calls for the
    message/response types defined in the protocol.

    'protocol_create_code' should be code which imports whatever
    modules it needs and assigns a Protocol instance to a variable
    named 'protocol'. If 'build_time_protocol_create_code' is
    provided, it is used in its place to create the protocol instance
    that does the generating here.

    Class names derive from basename; a basename of 'FooSender'
    gives classes FooSender and BoundFooSender.

    If 'private' is True, class names get a '_' prefix.

    Note: the returned code may contain long lines and should
    generally be run through a formatter. We should perhaps move this
    functionality to efrotools so we can include that functionality
    inline.
    """
    # A build-time override, when given, decides how the protocol is
    # created here; the regular 'protocol_create_code' is still what
    # gets handed to the generator below.
    if build_time_protocol_create_code is None:
        create_code = protocol_create_code
    else:
        create_code = build_time_protocol_create_code

    generator = _protocol_from_code(create_code)
    return generator.do_create_sender_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        enable_sync_sends=enable_sync_sends,
        enable_async_sends=enable_async_sends,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageSender subclass.
This class is primarily for type checking and will contain overrides for the varieties of send calls for message/response types defined in the protocol.
Code passed for 'protocol_create_code' should import necessary modules and assign an instance of the Protocol to a 'protocol' variable.
Class names are based on basename; a basename 'FooSender' will result in classes FooSender and BoundFooSender.
If 'private' is True, class-names will be prefixed with an '_'.
Note: output code may have long lines and should generally be run through a formatter. We should perhaps move this functionality to efrotools so we can include that functionality inline.
def create_receiver_module(
    basename: str,
    protocol_create_code: str,
    is_async: bool,
    *,
    private: bool = False,
    protocol_module_level_import_code: str | None = None,
    build_time_protocol_create_code: str | None = None,
) -> str:
    """Generate source for a module defining a MessageReceiver subclass.

    The generated class exists mainly for the benefit of type
    checking; it contains overrides of the register method for the
    message/response types defined in the protocol.

    Class names derive from basename; a basename of 'FooReceiver'
    gives classes FooReceiver and BoundFooReceiver.

    If 'is_async' is True, handle_raw_message() will be an async
    method and the @handler decorator will expect async methods.

    If 'private' is True, class names get a '_' prefix.

    If 'build_time_protocol_create_code' is provided, it is used in
    place of 'protocol_create_code' to create the protocol instance
    that does the generating here.

    Note that line lengths are not clipped, so output may need to be
    run through a formatter to prevent lint warnings about excessive
    line lengths.
    """
    # A build-time override, when given, decides how the protocol is
    # created here; the regular 'protocol_create_code' is still what
    # gets handed to the generator below.
    if build_time_protocol_create_code is None:
        create_code = protocol_create_code
    else:
        create_code = build_time_protocol_create_code

    generator = _protocol_from_code(create_code)
    return generator.do_create_receiver_module(
        basename=basename,
        protocol_create_code=protocol_create_code,
        is_async=is_async,
        private=private,
        protocol_module_level_import_code=protocol_module_level_import_code,
    )
Create a Python module defining a MessageReceiver subclass.
This class is primarily for type checking and will contain overrides for the register method for message/response types defined in the protocol.
Class names are based on basename; a basename 'FooReceiver' will result in FooReceiver and BoundFooReceiver.
If 'is_async' is True, handle_raw_message() will be an async method and the @handler decorator will expect async methods.
If 'private' is True, class-names will be prefixed with an '_'.
Note that line lengths are not clipped, so output may need to be run through a formatter to prevent lint warnings about excessive line lengths.
class UnregisteredMessageIDError(Exception):
    """Error for a message or response id our protocol does not cover."""
A message or response id is not covered by our protocol.