diff --git a/ingestify/__init__.py b/ingestify/__init__.py index 23ca0ca..c3f5b44 100644 --- a/ingestify/__init__.py +++ b/ingestify/__init__.py @@ -9,4 +9,4 @@ from .source_base import Source, DatasetResource from .main import debug_source -__version__ = "0.12.0" +__version__ = "0.12.1" diff --git a/ingestify/infra/event_log/consumer.py b/ingestify/infra/event_log/consumer.py index a75851a..696c6b7 100644 --- a/ingestify/infra/event_log/consumer.py +++ b/ingestify/infra/event_log/consumer.py @@ -77,8 +77,8 @@ def _update_cursor(self, conn, event_id: int) -> None: ) conn.commit() - def _run_once(self, on_event: Callable, batch_size: int = 100) -> bool: - """Returns True on success, False if a processing error occurred.""" + def _run_once(self, on_event: Callable, batch_size: int = 100) -> int: + """Returns number of events processed, or -1 if a processing error occurred.""" with self._engine.connect() as conn: self._ensure_reader_state(conn) last_id = self._get_last_event_id(conn) @@ -95,10 +95,10 @@ def _run_once(self, on_event: Callable, batch_size: int = 100) -> bool: event_id, type(event).event_type, ) - return False + return -1 self._update_cursor(conn, event_id) - return True + return len(rows) def run( self, @@ -107,7 +107,10 @@ def run( batch_size: int = 100, ) -> int: while True: - success = self._run_once(on_event, batch_size) - if not success or poll_interval is None: - return 0 if success else 1 - time.sleep(poll_interval) + count = self._run_once(on_event, batch_size) + if count < 0: + return 1 + if count == 0: + if poll_interval is None: + return 0 + time.sleep(poll_interval) diff --git a/ingestify/tests/test_event_log.py b/ingestify/tests/test_event_log.py index db8d3c5..9d0d15c 100644 --- a/ingestify/tests/test_event_log.py +++ b/ingestify/tests/test_event_log.py @@ -80,7 +80,7 @@ def test_consumer_processes_events(consumer, dataset): def test_consumer_cursor_not_advanced_on_error(consumer, dataset): consumer._event_log.write(RevisionAdded(dataset=dataset)) - assert not consumer._run_once(lambda e: 1 / 0) + assert consumer._run_once(lambda e: 1 / 0) == -1 # Next run still sees the same event received = [] consumer._run_once(lambda e: received.append(e))