diff --git a/community/kortexa-radio/README.md b/community/kortexa-radio/README.md new file mode 100644 index 00000000..f80b727c --- /dev/null +++ b/community/kortexa-radio/README.md @@ -0,0 +1,41 @@ +# Kortexa Radio + +![Community](https://img.shields.io/badge/OpenHome-Community-orange?style=flat-square) +![Author](https://img.shields.io/badge/Author-@kortexa--ai-lightgrey?style=flat-square) + +## What It Does +Streams AI-generated radio from [radio.kortexa.ai](https://radio.kortexa.ai) to your OpenHome device. Music, DJ announcements, and news segments — all generated in real-time by AI. + +## Suggested Trigger Words +- "start radio" +- "kortexa radio" + +## Setup +No setup required. The stream is public. + +## How It Works +1. Say the trigger word +2. The ability enters music mode and starts streaming +3. Interrupt (wake word or touch) to stop — control returns to the agent +4. Say the trigger word again to restart + +## Example Conversation +> **User:** "start radio" +> **AI:** "Tuning in to Kortexa Radio." +> *(AI-generated music plays through the device)* +> +> *(User interrupts)* +> **AI:** *(agent resumes normal conversation)* +> +> **User:** "start radio" +> **AI:** "Tuning in to Kortexa Radio." +> *(Music plays again)* + +## Stream Details +- Format: MP3, 128kbps, 48kHz stereo +- Source: https://api.kortexa.ai/radio/stream +- Content: AI-generated music, DJ segments, news updates +- Programming changes by time of day + +## Logs +Look for `[KortexaRadio]` entries in OpenHome Live Editor logs. diff --git a/community/kortexa-radio/__init__.py b/community/kortexa-radio/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/community/kortexa-radio/main.py b/community/kortexa-radio/main.py new file mode 100644 index 00000000..a0b82b56 --- /dev/null +++ b/community/kortexa-radio/main.py @@ -0,0 +1,147 @@ +import asyncio +import httpx + +from src.agent.capability import MatchingCapability +from src.main import AgentWorker +from src.agent.capability_worker import CapabilityWorker + +STREAM_URL = "https://api.kortexa.ai/radio/stream" +EVENTS_URL = "https://api.kortexa.ai/radio/events" +CHUNK_SIZE = 25 * 1024 +TAG = "[KortexaRadio]" + +STOP_WORDS = ["stop", "off", "exit", "quit", "turn it off"] + + +class KortexaRadioCapability(MatchingCapability): + worker: AgentWorker = None + capability_worker: CapabilityWorker = None + + #{{register_capability}} + + async def _stream(self): + """Stream radio audio with pause/stop handling.""" + try: + + async with httpx.AsyncClient(timeout=None) as client: + async with client.stream("GET", STREAM_URL, follow_redirects=True) as response: + self.worker.editor_logging_handler.info(f"{TAG} Connected, status={response.status_code}") + + if response.status_code != 200: + await self.capability_worker.speak("Could not connect to the radio stream.") + return + + await self.capability_worker.stream_init() + + async for chunk in response.aiter_bytes(chunk_size=CHUNK_SIZE): + if not chunk: + self.worker.editor_logging_handler.info(f"{TAG} No chunk") + continue + + if self.worker.music_mode_stop_event.is_set(): + self.worker.editor_logging_handler.info(f"{TAG} Stop event, ending stream") + await self.capability_worker.stream_end() + return + + while self.worker.music_mode_pause_event.is_set(): + await self.worker.session_tasks.sleep(0.1) + + await self.capability_worker.send_audio_data_in_stream(chunk) + + await self.capability_worker.stream_end() + + except asyncio.CancelledError as e: + self.worker.editor_logging_handler.info(f"{TAG} Stream cancelled: {e}") + except Exception as e: + self.worker.editor_logging_handler.error(f"{TAG} Stream error: {e}") + + async def _keep_events_connected(self): + """Stay connected to SSE to register as a listener. Events are not processed.""" + try: + while True: + try: + async with httpx.AsyncClient(timeout=httpx.Timeout(30.0)) as client: + async with client.stream("GET", EVENTS_URL) as response: + async for _ in response.aiter_lines(): + pass + except httpx.HTTPError as e: + self.worker.editor_logging_handler.error(f"{TAG} SSE error: {e}") + pass + + await self.worker.session_tasks.sleep(5) + except asyncio.CancelledError: + self.worker.editor_logging_handler.info(f"{TAG} SSE cancelled") + + pass + + async def run(self): + """Auto-start radio, listen for stop command, exit cleanly.""" + + stream_task = None + events_task = None + + try: + await self.capability_worker.speak("Tuning in to Kortexa Radio.") + + # Subscribe to events (registers us as a listener) + self.worker.editor_logging_handler.info(f"{TAG} Register as a listener") + events_task = self.worker.session_tasks.create(self._keep_events_connected()) + + # Turn on music mode + self.worker.editor_logging_handler.info(f"{TAG} Turn on music mode") + self.worker.music_mode_event.set() + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "on"}) + + # Stream audio in background so the loop stays responsive + self.worker.editor_logging_handler.info(f"{TAG} Start streaming") + stream_task = self.worker.session_tasks.create(self._stream()) + + # Wait for stop command + while not self.worker.music_mode_stop_event.is_set(): + msg = await self.capability_worker.user_response() + + if self.worker.music_mode_stop_event.is_set(): + break + + if msg: + normalized = msg.strip().lower() + self.worker.editor_logging_handler.info(f"{TAG} Command: {normalized}") + + if any(word in normalized for word in STOP_WORDS): + self.worker.editor_logging_handler.info(f"{TAG} Stop requested") + self.worker.music_mode_stop_event.set() + break + + await self.capability_worker.speak("Radio off! Catch you later.") + + except Exception as e: + self.worker.editor_logging_handler.error(f"{TAG} Error: {e}") + + finally: + self.worker.editor_logging_handler.info(f"{TAG} Clean up") + + self.worker.music_mode_stop_event.set() + + if stream_task: + stream_task.cancel() + if events_task: + events_task.cancel() + + await self.capability_worker.send_data_over_websocket("music-mode", {"mode": "off"}) + self.worker.music_mode_event.clear() + self.worker.music_mode_stop_event.clear() + + self.worker.editor_logging_handler.info(f"{TAG} Radio OFF") + + await self.worker.session_tasks.sleep(1) + self.capability_worker.resume_normal_flow() + + def call(self, worker: AgentWorker): + try: + self.worker = worker + self.capability_worker = CapabilityWorker(self.worker) + self.worker.editor_logging_handler.info(f"{TAG} Radio ON") + + self.worker.session_tasks.create(self.run()) + except Exception as e: + self.worker.editor_logging_handler.warning(e)