Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ingestify/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,4 +9,4 @@
from .source_base import Source, DatasetResource
from .main import debug_source

__version__ = "0.12.0"
__version__ = "0.12.1"
19 changes: 11 additions & 8 deletions ingestify/infra/event_log/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,8 @@ def _update_cursor(self, conn, event_id: int) -> None:
)
conn.commit()

def _run_once(self, on_event: Callable, batch_size: int = 100) -> bool:
"""Returns True on success, False if a processing error occurred."""
def _run_once(self, on_event: Callable, batch_size: int = 100) -> int:
"""Returns number of events processed, or -1 if a processing error occurred."""
with self._engine.connect() as conn:
self._ensure_reader_state(conn)
last_id = self._get_last_event_id(conn)
Expand All @@ -95,10 +95,10 @@ def _run_once(self, on_event: Callable, batch_size: int = 100) -> bool:
event_id,
type(event).event_type,
)
return False
return -1
self._update_cursor(conn, event_id)

return True
return len(rows)

def run(
self,
Expand All @@ -107,7 +107,10 @@ def run(
batch_size: int = 100,
) -> int:
while True:
success = self._run_once(on_event, batch_size)
if not success or poll_interval is None:
return 0 if success else 1
time.sleep(poll_interval)
count = self._run_once(on_event, batch_size)
if count < 0:
return 1
if count == 0:
if poll_interval is None:
return 0
time.sleep(poll_interval)
2 changes: 1 addition & 1 deletion ingestify/tests/test_event_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def test_consumer_processes_events(consumer, dataset):

def test_consumer_cursor_not_advanced_on_error(consumer, dataset):
consumer._event_log.write(RevisionAdded(dataset=dataset))
assert not consumer._run_once(lambda e: 1 / 0)
assert consumer._run_once(lambda e: 1 / 0) == -1
# Next run still sees the same event
received = []
consumer._run_once(lambda e: received.append(e))
Expand Down
Loading