diff --git a/README.md b/README.md index c66b48e..2587f3c 100644 --- a/README.md +++ b/README.md @@ -145,10 +145,7 @@ Current keybindings can be seen in the footer, they can be remapped in settings. ## Roadmap -- [x] implement compact view mode -- [x] improve docker documentation usage -- [x] create video demo -- [ ] custom diff views? +Right now the software is complete, I can review PRs for additional feature requests, but the goal is mainly to keep up with the upstream API changes. ## 👨‍💻 Development diff --git a/src/changedetection_tui/dashboard/buttons.py b/src/changedetection_tui/dashboard/buttons.py index 9fe7345..465b298 100644 --- a/src/changedetection_tui/dashboard/buttons.py +++ b/src/changedetection_tui/dashboard/buttons.py @@ -1,11 +1,14 @@ from typing import final import httpx from textual import on -from changedetection_tui.dashboard.diff_widgets import DiffPanelScreen -from changedetection_tui.types import ApiListWatch +from changedetection_tui.dashboard.diff_widgets import DiffPanelScreen, execute_diff +from changedetection_tui.types import ApiListWatch, ApiWatch from textual.widgets import Button from textual.message import Message -from changedetection_tui.utils import make_api_request +from changedetection_tui.utils import ( + make_api_request, + get_best_snapshot_ts_based_on_last_viewed, +) assigned_jump_keys: set[str] = set() @@ -52,7 +55,34 @@ def __init__(self, *args, **kwargs) -> None: self.jump_key = key async def action_execute_diff(self, uuid: str) -> None: - self.app.push_screen(DiffPanelScreen(uuid=uuid)) + from changedetection_tui.settings import SETTINGS + + if not SETTINGS.get().skip_diff_dialog: + self.app.push_screen(DiffPanelScreen(uuid=uuid)) + return + + # Skip the dialog: fetch history and watch data, then run diff directly. + res = await make_api_request(self.app, route=f"/api/v1/watch/{uuid}/history") + snapshot_timestamps = [ + int(x) + for x in sorted(res.json().keys(), key=lambda x: int(x), reverse=True) + ] + res = await make_api_request(self.app, route=f"/api/v1/watch/{uuid}") + watch = ApiWatch.model_validate(res.json()) + from_ts = get_best_snapshot_ts_based_on_last_viewed( + snapshot_timestamps=snapshot_timestamps, + last_viewed=int(watch.last_viewed), + ) + to_ts = snapshot_timestamps[0] + if from_ts == to_ts: + return + await execute_diff( + app=self.app, + watch=watch, + uuid=uuid, + from_ts=from_ts, + to_ts=to_ts, + ) @final diff --git a/src/changedetection_tui/dashboard/diff_widgets.py b/src/changedetection_tui/dashboard/diff_widgets.py index 2cb985e..a17d75a 100644 --- a/src/changedetection_tui/dashboard/diff_widgets.py +++ b/src/changedetection_tui/dashboard/diff_widgets.py @@ -3,7 +3,7 @@ import shutil import subprocess from textual import on, work -from textual.app import ComposeResult +from textual.app import App, ComposeResult from textual.containers import Grid, VerticalGroup from textual.screen import ModalScreen from textual.types import NoSelection @@ -23,6 +23,162 @@ from pathvalidate import sanitize_filename +def _get_path_for(cmd: str) -> str: + """Get the path to cmd in the current Python environment.""" + cmd_path = shutil.which(cmd) + if cmd_path: + return cmd_path + + python_dir = path.dirname(sys.executable) + fallback_path = path.join(python_dir, cmd) + if path.exists(fallback_path): + return fallback_path + + for bin_dir in ["bin", "Scripts"]: + fallback_path = path.join(path.dirname(python_dir), bin_dir, cmd) + if path.exists(fallback_path): + return fallback_path + + raise RuntimeError(f"{cmd} binary not found.") + + +def _bool_to_api_string(value: bool) -> str: + return value and "true" or "false" + + +def _filename_for_diff(watch: ApiListWatch, timestamp: int) -> str: + return sanitize_filename( + f"{watch.title_or_url()}_{format_timestamp(timestamp)}", + replacement_text="_", + ) + + +def _filename_for_internal_diff( + watch: ApiListWatch, from_timestamp: int, to_timestamp: int +) -> str: + return sanitize_filename( + f"{watch.title_or_url()}_internal_diff_{format_timestamp(from_timestamp)}_to_{format_timestamp(to_timestamp)}.txt", + replacement_text="_", + ) + + +def _expand_command_based_diff_template(from_filepath: str, to_filepath: str) -> str: + settings = SETTINGS.get() + template = settings.diff.command_template + tokens = { + "{ICDIFF}": shlex.quote(_get_path_for("icdiff")), + "{FILE_FROM}": shlex.quote(from_filepath), + "{FILE_TO}": shlex.quote(to_filepath), + } + expanded_command = template + for token, value in tokens.items(): + expanded_command = expanded_command.replace(token, value) + return expanded_command + + +def run_command_based_diff( + app: App, # type: ignore[type-arg] + watch: ApiListWatch, + from_data: str, + to_data: str, + from_ts: int, + to_ts: int, +) -> None: + with TemporaryDirectory() as tmpdir: + from_filename = _filename_for_diff(watch, from_ts) + to_filename = _filename_for_diff(watch, to_ts) + from_filepath = path.join(tmpdir, from_filename) + to_filepath = path.join(tmpdir, to_filename) + + with open(from_filepath, "w", encoding="utf-8") as from_file: + from_file.write(from_data) + with open(to_filepath, "w", encoding="utf-8") as to_file: + to_file.write(to_data) + + with app.suspend(): + cmd = _expand_command_based_diff_template( + from_filepath=from_filepath, + to_filepath=to_filepath, + ) + _ = subprocess.run(cmd, shell=True, check=True) + + +async def run_internal_diff( + app: App, # type: ignore[type-arg] + watch: ApiListWatch, + uuid: str, + from_ts: int, + to_ts: int, +) -> None: + settings = SETTINGS.get().diff + params: dict[str, str] = { + "format": settings.internal_format, + "word_diff": _bool_to_api_string(settings.internal_word_diff), + "no_markup": _bool_to_api_string(settings.internal_no_markup), + "type": settings.internal_type, + "changesOnly": _bool_to_api_string(settings.internal_changes_only), + "ignoreWhitespace": _bool_to_api_string(settings.internal_ignore_whitespace), + "removed": _bool_to_api_string(settings.internal_removed), + "added": _bool_to_api_string(settings.internal_added), + "replaced": _bool_to_api_string(settings.internal_replaced), + } + internal_diff = ( + await make_api_request( + app, + route=f"/api/v1/watch/{uuid}/difference/{from_ts}/{to_ts}", + params=params, + ) + ).text + with TemporaryDirectory() as tmpdir: + internal_diff_filepath = path.join( + tmpdir, + _filename_for_internal_diff(watch, from_ts, to_ts), + ) + with open(internal_diff_filepath, "w", encoding="utf-8") as output_file: + output_file.write(internal_diff) + with app.suspend(): + _ = subprocess.run( + [ + "less", + "--RAW-CONTROL-CHARS", + "-+S", + "--wordwrap", + internal_diff_filepath, + ], + check=True, + ) + + +async def execute_diff( + app: App, # type: ignore[type-arg] + watch: ApiListWatch, + uuid: str, + from_ts: int, + to_ts: int, +) -> None: + """Run the diff for the given timestamps using the current diff settings.""" + diff_settings = SETTINGS.get().diff + if diff_settings.mode == "command-based": + from_data = ( + await make_api_request(app, route=f"/api/v1/watch/{uuid}/history/{from_ts}") + ).text + to_data = ( + await make_api_request(app, route=f"/api/v1/watch/{uuid}/history/{to_ts}") + ).text + run_command_based_diff( + app=app, + watch=watch, + from_data=from_data, + to_data=to_data, + from_ts=from_ts, + to_ts=to_ts, + ) + else: + await run_internal_diff( + app=app, watch=watch, uuid=uuid, from_ts=from_ts, to_ts=to_ts + ) + + class DiffPanelScreen(ModalScreen): """Screen for diff selection""" @@ -68,119 +224,14 @@ async def on_button_pressed(self, event: Button.Pressed) -> None: or isinstance(from_ts, NoSelection) ): return - diff_settings = SETTINGS.get().diff - if diff_settings.mode == "command-based": - from_data = ( - await make_api_request( - self.app, route=f"/api/v1/watch/{self.uuid}/history/{from_ts}" - ) - ).text - to_data = ( - await make_api_request( - self.app, route=f"/api/v1/watch/{self.uuid}/history/{to_ts}" - ) - ).text - self._run_command_based_diff( - from_data=from_data, - to_data=to_data, - from_ts=from_ts, - to_ts=to_ts, - ) - else: - await self._run_internal_diff(from_ts=from_ts, to_ts=to_ts) - _ = self.app.pop_screen() - - def _run_command_based_diff( - self, - from_data: str, - to_data: str, - from_ts: int, - to_ts: int, - ) -> None: - with TemporaryDirectory() as tmpdir: - from_filename = self._filename_for_diff(self.api_watch, from_ts) - to_filename = self._filename_for_diff(self.api_watch, to_ts) - from_filepath = path.join(tmpdir, from_filename) - to_filepath = path.join(tmpdir, to_filename) - - with open(from_filepath, "w", encoding="utf-8") as from_file: - from_file.write(from_data) - with open(to_filepath, "w", encoding="utf-8") as to_file: - to_file.write(to_data) - - with self.app.suspend(): - cmd = self._expand_command_based_diff_template( - from_filepath=from_filepath, - to_filepath=to_filepath, - ) - _ = subprocess.run(cmd, shell=True, check=True) - - def _expand_command_based_diff_template( - self, from_filepath: str, to_filepath: str - ) -> str: - settings = SETTINGS.get() - template = settings.diff.command_template - tokens = { - "{ICDIFF}": shlex.quote(self._get_path_for("icdiff")), - "{FILE_FROM}": shlex.quote(from_filepath), - "{FILE_TO}": shlex.quote(to_filepath), - } - expanded_command = template - for token, value in tokens.items(): - expanded_command = expanded_command.replace(token, value) - return expanded_command - - async def _run_internal_diff(self, from_ts: int, to_ts: int) -> None: - settings = SETTINGS.get().diff - params: dict[str, str] = { - "format": settings.internal_format, - "word_diff": self._bool_to_api_string(settings.internal_word_diff), - "no_markup": self._bool_to_api_string(settings.internal_no_markup), - "type": settings.internal_type, - "changesOnly": self._bool_to_api_string(settings.internal_changes_only), - "ignoreWhitespace": self._bool_to_api_string( - settings.internal_ignore_whitespace - ), - "removed": self._bool_to_api_string(settings.internal_removed), - "added": self._bool_to_api_string(settings.internal_added), - "replaced": self._bool_to_api_string(settings.internal_replaced), - } - internal_diff = ( - await make_api_request( - self.app, - route=f"/api/v1/watch/{self.uuid}/difference/{from_ts}/{to_ts}", - params=params, - ) - ).text - with TemporaryDirectory() as tmpdir: - internal_diff_filepath = path.join( - tmpdir, - self._filename_for_internal_diff(self.api_watch, from_ts, to_ts), - ) - with open(internal_diff_filepath, "w", encoding="utf-8") as output_file: - output_file.write(internal_diff) - with self.app.suspend(): - _ = subprocess.run( - [ - "less", - "--RAW-CONTROL-CHARS", - "-+S", - "--wordwrap", - internal_diff_filepath, - ], - check=True, - ) - - def _filename_for_internal_diff( - self, watch: ApiListWatch, from_timestamp: int, to_timestamp: int - ) -> str: - return sanitize_filename( - f"{watch.title_or_url()}_internal_diff_{format_timestamp(from_timestamp)}_to_{format_timestamp(to_timestamp)}.txt", - replacement_text="_", + await execute_diff( + app=self.app, + watch=self.api_watch, + uuid=self.uuid, + from_ts=from_ts, + to_ts=to_ts, ) - - def _bool_to_api_string(self, value: bool) -> str: - return value and "true" or "false" + _ = self.app.pop_screen() @work(exclusive=True) async def load_data(self, uuid: str) -> tuple[list[int], int, ApiWatch]: @@ -229,29 +280,3 @@ def get_watch_list_result_from_worker(self, event: Worker.StateChanged) -> None: select_to.value = snapshot_timestamps[0] self.api_watch = worker.result[2] - - def _get_path_for(self, cmd: str) -> str: - """Get the path to the cmd in the current Python environment.""" - cmd_path = shutil.which(cmd) - if cmd_path: - return cmd_path - - # Fallback: try to find it in the same directory as the Python executable - python_dir = path.dirname(sys.executable) - fallback_path = path.join(python_dir, cmd) - if path.exists(fallback_path): - return fallback_path - - # Another fallback: try common bin directories - for bin_dir in ["bin", "Scripts"]: - fallback_path = path.join(path.dirname(python_dir), bin_dir, cmd) - if path.exists(fallback_path): - return fallback_path - - raise RuntimeError(f"{cmd} binary not found.") - - def _filename_for_diff(self, watch: ApiListWatch, timestamp: int) -> str: - return sanitize_filename( - f"{watch.title_or_url()}_{format_timestamp(timestamp)}", - replacement_text="_", - ) diff --git a/src/changedetection_tui/dashboard/watchlist.py b/src/changedetection_tui/dashboard/watchlist.py index b109fe0..85a7eb3 100644 --- a/src/changedetection_tui/dashboard/watchlist.py +++ b/src/changedetection_tui/dashboard/watchlist.py @@ -140,15 +140,7 @@ def compose(self) -> ComposeResult: rows_per_page = self.rows_per_page_from_resize self.can_focus: bool = False # order, filter, chunk. Here I have to materialize the list because I need to get the length of it. - filtered_tuples = [ - x - for x in sorted( - self.all_rows.root.items(), - key=self._get_list_sorting_key, - reverse=(self.ordering.order_direction == Ordering.OrderDirection.DESC), - ) - if not self.only_unviewed or not x[1].viewed - ] # [(uuid, ApiListWatch), (uuid, ApiListWatch), ...] + filtered_tuples = self._visible_rows() # [(uuid, ApiListWatch), ...] tuples_for_page = batched(filtered_tuples, rows_per_page) batch = next( islice(tuples_for_page, self.current_page, self.current_page + 1), () @@ -209,4 +201,100 @@ def update_all_rows(self, event: buttons.UpdatedWatchEvent) -> None: """Takes care of updating the single watch in the list of rows""" # Invalidate watchlist cache after recheck invalidate_watchlist_cache() + # Capture focus context before data mutation/recompose. Recompose can + # destroy the currently focused button widget. + visible_rows_before = self._visible_rows() + focused_row_uuid, focused_col_index = self._focused_row_uuid_and_col_index() + # Keep row position as fallback when the focused row disappears after + # filtering (e.g. toggling "viewed" while "only_unviewed" is active). + focused_row_index = ( + next( + ( + idx + for idx, (uuid, _) in enumerate(visible_rows_before) + if uuid == focused_row_uuid + ), + None, + ) + if focused_row_uuid + else None + ) self.all_rows.root[event.uuid] = event.watch + self.mutate_reactive(WatchListWidget.all_rows) + + if focused_col_index is None: + return + visible_rows_after = self._visible_rows() + # Choose a stable target row after refresh: prefer same row, otherwise + # preserve user's relative position in the list. + target_uuid = self._target_uuid_after_update( + visible_rows_after=visible_rows_after, + focused_row_uuid=focused_row_uuid, + focused_row_index=focused_row_index, + ) + if not target_uuid: + return + # Restore focus after the DOM has been refreshed. + _ = self.call_after_refresh( + self._restore_focus_on_row, + target_uuid, + focused_col_index, + ) + + def _visible_rows( + self, + ) -> list[tuple[str, ApiListWatch]]: + return [ + x + for x in sorted( + self.all_rows.root.items(), + key=self._get_list_sorting_key, + reverse=(self.ordering.order_direction == Ordering.OrderDirection.DESC), + ) + if not self.only_unviewed or not x[1].viewed + ] + + def _focused_row_uuid_and_col_index(self) -> tuple[str | None, int | None]: + focused = self.screen.focused + if not focused: + return (None, None) + parent = focused + while parent and not isinstance(parent, WatchRow): + parent = parent.parent + if not isinstance(parent, WatchRow): + return (None, None) + # Save the column index (0=Recheck, 1=Set viewed, 2=View Diff) so we + # can restore focus to the same action column on the target row. + focusables = [w for w in parent.query() if w.focusable] + col_index = next( + (i for i, w in enumerate(focusables) if w is focused), + 0, + ) + return (parent.uuid, col_index) + + def _target_uuid_after_update( + self, + visible_rows_after: list[tuple[str, ApiListWatch]], + focused_row_uuid: str | None, + focused_row_index: int | None, + ) -> str | None: + if not visible_rows_after: + return None + if focused_row_uuid and any( + focused_row_uuid == row_uuid for row_uuid, _ in visible_rows_after + ): + return focused_row_uuid + if focused_row_index is None: + return None + if focused_row_index < len(visible_rows_after): + return visible_rows_after[focused_row_index][0] + return visible_rows_after[-1][0] + + def _restore_focus_on_row(self, target_uuid: str, focused_col_index: int) -> None: + target_row = next( + (row for row in self.query(WatchRow) if row.uuid == target_uuid), + None, + ) + if not target_row: + return + target_row.focus_row(at_col_index=focused_col_index) diff --git a/src/changedetection_tui/dashboard/watchrow.py b/src/changedetection_tui/dashboard/watchrow.py index cebcc23..8ad2151 100644 --- a/src/changedetection_tui/dashboard/watchrow.py +++ b/src/changedetection_tui/dashboard/watchrow.py @@ -17,7 +17,6 @@ RecheckButton, SwitchViewedStateButton, DiffButton, - UpdatedWatchEvent, ) from changedetection_tui.utils import format_timestamp @@ -103,20 +102,33 @@ def compose(self) -> ComposeResult: action=f'focused.execute_diff("{self.uuid}")', ) - @on(UpdatedWatchEvent) - def refresh_the_watchrow(self, event: UpdatedWatchEvent) -> None: - self.api_list_watch = event.watch + # NOTE: UpdatedWatchEvent is NOT handled here. It bubbles up to + # WatchListWidget.update_all_rows which mutates the shared data reactive + # and triggers a single recompose of the whole list. Handling it here + # as well would cause a WatchRow-level recompose that destroys the + # focused button *before* the list-level handler can capture focus + # context, resulting in wrong focus restoration. @on(Click) - def focus_row(self, at_virtual_x: int | None = None) -> None: + def focus_row( + self, + at_virtual_x: int | None = None, + at_col_index: int | None = None, + ) -> None: my_focusables = [w for w in list(self.query()) if w.focusable] - idx_in_row = next( - ( - i - for i, w in enumerate(my_focusables) - if w.virtual_region.x == at_virtual_x - ), - 0, - ) - if self.screen.focused not in my_focusables: - self.screen.set_focus(my_focusables[idx_in_row]) + if not my_focusables: + return + if at_col_index is not None: + # Direct index lookup – used after recompose when virtual_region + # coordinates are not yet reliable. + idx_in_row = min(at_col_index, len(my_focusables) - 1) + else: + idx_in_row = next( + ( + i + for i, w in enumerate(my_focusables) + if w.virtual_region.x == at_virtual_x + ), + 0, + ) + self.screen.set_focus(my_focusables[idx_in_row]) diff --git a/src/changedetection_tui/settings/settings.py b/src/changedetection_tui/settings/settings.py index c399cb4..6dff559 100644 --- a/src/changedetection_tui/settings/settings.py +++ b/src/changedetection_tui/settings/settings.py @@ -294,6 +294,12 @@ class Settings(BaseSettings): "help": "Display in compact mode", }, ] = True + skip_diff_dialog: Annotated[ + bool, + { + "help": "Skip the diff dialog and show diff directly using defaults", + }, + ] = True keybindings: KeyBindingSettings = Field(default_factory=KeyBindingSettings) diff: DiffSettings = Field(default_factory=DiffSettings) diff --git a/src/changedetection_tui/settings/settings_screen.py b/src/changedetection_tui/settings/settings_screen.py index 3033b83..31516c0 100644 --- a/src/changedetection_tui/settings/settings_screen.py +++ b/src/changedetection_tui/settings/settings_screen.py @@ -276,6 +276,8 @@ def compose(self) -> ComposeResult: "(In the API Key field you can use the $ENV_VAR syntax to avoid storing the secret value to the config file)", classes="required-field-description", ) + with TabPane("UI", id="tabpane-ui"): + with Grid(id="grid-ui-settings"): yield Label( Settings.model_fields["compact_mode"].metadata[0]["help"] or "No Label" @@ -284,6 +286,16 @@ def compose(self) -> ComposeResult: value=self.settings.compact_mode, id="checkbox-for-compact_mode", ) + yield Label( + Settings.model_fields["skip_diff_dialog"].metadata[0][ + "help" + ] + or "No Label" + ) + yield Checkbox( + value=self.settings.skip_diff_dialog, + id="checkbox-for-skip_diff_dialog", + ) with TabPane("Keybindings", id="tabpane-keybindings"): with Grid(id="keybindings-settings-grid"): for context_name, fieldinfo in type( @@ -481,6 +493,14 @@ def _reconstruct_settings_from_form(self) -> Settings: if not isinstance(checkbox_for_compact_mode, Checkbox): raise ValueError(f"Expected Checkbox, got {type(input_for_url)}") + checkbox_for_skip_diff_dialog = self.screen.query_exactly_one( + "#checkbox-for-skip_diff_dialog" + ) + if not isinstance(checkbox_for_skip_diff_dialog, Checkbox): + raise ValueError( + f"Expected Checkbox, got {type(checkbox_for_skip_diff_dialog)}" + ) + input_for_diff_command_template = self.screen.query_exactly_one( "#input-for-diff-command-template" ) @@ -570,6 +590,7 @@ def _reconstruct_settings_from_form(self) -> Settings: url=form_url, api_key=form_apikey, compact_mode=checkbox_for_compact_mode.value, + skip_diff_dialog=checkbox_for_skip_diff_dialog.value, keybindings=KeyBindingSettings(**kbs_payload), # pyright: ignore[reportArgumentType] diff=diff_settings, ) diff --git a/src/changedetection_tui/tui.scss b/src/changedetection_tui/tui.scss index 886420c..71f98a9 100644 --- a/src/changedetection_tui/tui.scss +++ b/src/changedetection_tui/tui.scss @@ -265,6 +265,25 @@ SettingsScreen { } } /* END #tabpane-main */ + + #tabpane-ui { + #grid-ui-settings { + grid-size: 2; + grid-columns: 1fr 2fr; + grid-rows: auto; + grid-gutter: 1 1; + height: auto; + Label { + width: 100%; + height: 100%; + text-align: right; + content-align-vertical: middle; + } + Checkbox { + } + } + } /* END #tabpane-ui */ + #tabpane-keybindings { #keybindings-settings-grid { grid-size: 2; diff --git a/tests/test_settings.py b/tests/test_settings.py index d37f02d..fbde904 100644 --- a/tests/test_settings.py +++ b/tests/test_settings.py @@ -153,6 +153,7 @@ def test_settings_with_no_yaml(setup_config): "url": FAKE_URL, "api_key": FAKE_APIKEY, "compact_mode": True, + "skip_diff_dialog": True, "keybindings": copy.deepcopy(DEFAULT_KEYBINDINGS), "diff": copy.deepcopy(DEFAULT_DIFF_SETTINGS), }, "With no file it still needs to be able to access defaults"