feat: According to the routing feature plan in Chapter 2.2.3.2 of RFC 113, the routing functionality is to be consolidated into the Environment class.

This commit is contained in:
莘权 马 2023-11-04 14:07:33 +08:00
parent 8137e1af50
commit 2688fe680a
3 changed files with 33 additions and 10 deletions

View file

@ -8,9 +8,11 @@
1. Remove the functionality of `Environment` class as a public message buffer.
2. Standardize the message forwarding behavior of the `Environment` class.
3. Add the `is_idle` property.
@Modified By: mashenquan, 2023-11-4. According to the routing feature plan in Chapter 2.2.3.2 of RFC 113, the routing
functionality is to be consolidated into the `Environment` class.
"""
import asyncio
from typing import Iterable
from typing import Iterable, Set
from pydantic import BaseModel, Field
@ -26,6 +28,7 @@ class Environment(BaseModel):
"""
roles: dict[str, Role] = Field(default_factory=dict)
consumers: dict[Role, Set] = Field(default_factory=dict)
class Config:
arbitrary_types_allowed = True
@ -36,6 +39,8 @@ class Environment(BaseModel):
"""
role.set_env(self)
self.roles[role.profile] = role
# According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
self.set_subscribed_tags(role, role.subscribed_tags)
def add_roles(self, roles: Iterable[Role]):
"""增加一批在当前环境的角色
@ -55,17 +60,14 @@ class Environment(BaseModel):
"""
logger.info(f"publish_message: {message.save()}")
found = False
for r in self.roles.values():
if message.is_recipient(r.subscribed_tags):
r.put_message(message)
# According to the routing feature plan in Chapter 2.2.3.2 of RFC 113
for obj, subscribed_tags in self.consumers.items():
if message.is_recipient(subscribed_tags):
obj.put_message(message)
found = True
if not found:
logger.warning(f"Message no recipients: {message.save()}")
# Implemented the functionality related to remote message forwarding as described in RFC 113. Awaiting release.
# if self._parent:
# return self._parent.publish_message(message)
return True
async def run(self, k=1):
@ -100,3 +102,11 @@ class Environment(BaseModel):
if not r.is_idle:
return False
return True
def get_subscribed_tags(self, obj):
"""Get the labels for messages to be consumed by the object."""
return self.consumers.get(obj, {})
def set_subscribed_tags(self, obj, tags):
"""Set the labels for message to be consumed by the object"""
self.consumers[obj] = tags