diff --git a/pixi.lock b/pixi.lock index 6a7b63c..d6d8e6d 100644 --- a/pixi.lock +++ b/pixi.lock @@ -1558,8 +1558,8 @@ packages: requires_python: '>=3.9' - pypi: ./ name: arroyopy - version: 0.2.2 - sha256: c6fdf2a9b564d18ebbecd592b255d38e1390a315a4098c2a13fab22db9dc38b7 + version: 0.3.0a7 + sha256: 76dc3179c409495f9e06776fc63c6c8572405029c82a82b1368b9ce9282c9500 requires_dist: - python-dotenv - pandas @@ -1581,7 +1581,7 @@ packages: - redis ; extra == 'redis' - tiled[client] ; extra == 'tiled' - watchfiles>=0.21.0 ; extra == 'file-watch' - requires_python: '>=3.11,<3.13' + requires_python: '>=3.11,<3.14' - pypi: https://files.pythonhosted.org/packages/d9/ab/6936e2663c47a926e0659437b9333ad87d1ff49b1375d239026e0a268eba/asgi_correlation_id-4.3.4-py3-none-any.whl name: asgi-correlation-id version: 4.3.4 diff --git a/src/arroyopy/operator.py b/src/arroyopy/operator.py index 2e2795e..9444151 100644 --- a/src/arroyopy/operator.py +++ b/src/arroyopy/operator.py @@ -35,6 +35,9 @@ def add_publisher(self, publisher: Publisher) -> None: def remove_publisher(self, publisher: Publisher) -> None: self.publishers.remove(publisher) + async def notify(self, message: Message) -> None: + await self.listener_queue.put(message) + async def publish(self, message: Message) -> None: for publisher in self.publishers: await publisher.publish(message) diff --git a/src/arroyopy/redis.py b/src/arroyopy/redis.py index baf7e7f..426fc7b 100644 --- a/src/arroyopy/redis.py +++ b/src/arroyopy/redis.py @@ -48,6 +48,13 @@ async def stop(self): await self.redis_client.aclose() +def redis_listener_factory( + redis_uri: str, redis_channel_name: str, operator: Operator = None +) -> RedisListener: + redis_client = Redis(redis_uri) + return RedisListener(redis_client, redis_channel_name) + + class RedisPublisher(Publisher): def __init__(self, redis_client: Redis, redis_channel_name: str): self.redis_client: Redis = redis_client