From bdb9fb5d88a8203a09368c41a063380a9f6f0b3c Mon Sep 17 00:00:00 2001 From: Guilherme Costa Date: Fri, 13 Mar 2026 12:07:29 +0000 Subject: [PATCH] fix(configfile): fix bugs, optimize parser, add support for normal an inline comments and add unit tests loadWidget: fix bug that would result in a crash --- BlocksScreen/configfile.py | 347 ++++----- BlocksScreen/lib/panels/widgets/loadWidget.py | 4 +- tests/util/test_configfile_unit.py | 664 ++++++++++++++++++ 3 files changed, 844 insertions(+), 171 deletions(-) create mode 100644 tests/util/test_configfile_unit.py diff --git a/BlocksScreen/configfile.py b/BlocksScreen/configfile.py index ed5828a0..904519bb 100644 --- a/BlocksScreen/configfile.py +++ b/BlocksScreen/configfile.py @@ -27,31 +27,29 @@ import configparser import enum -import io import logging -import os import pathlib import re import threading -import typing +from typing import Any from helper_methods import check_file_on_path logger = logging.getLogger(__name__) -HOME_DIR = os.path.expanduser("~/") -WORKING_DIR = os.getcwd() -DEFAULT_CONFIGFILE_PATH = pathlib.Path(HOME_DIR, "printer_data", "config") -FALLBACK_CONFIGFILE_PATH = pathlib.Path(WORKING_DIR) +DEFAULT_CONFIGFILE_PATH = pathlib.Path.home() / "printer_data" / "config" +FALLBACK_CONFIGFILE_PATH = pathlib.Path.cwd() -T = typing.TypeVar("T") -indentation_size = 4 +_RE_SECTION = re.compile(r"^\s*\[([^]]+)\]") +_RE_OPTION = re.compile(r"^(\w+)[=:]") +_RE_INLINE_COMMENT = re.compile(r"(?<=\w)\s+[#;]") +_RE_SEP_NORMALIZE = re.compile(r"\s*[:=]\s*") class Sentinel(enum.Enum): """Sentinel value to signify missing condition, absence of value""" - MISSING = object + MISSING = object() class ConfigError(Exception): @@ -71,43 +69,50 @@ class BlocksScreenConfig: changes without losing comments or formatting. """ - config = configparser.ConfigParser( - allow_no_value=True, - ) - update_pending: bool = False - _instance = None - - def __init__( - self, configfile: typing.Union[str, pathlib.Path], section: str - ) -> None: + def __init__(self, configfile: str | pathlib.Path, section: str) -> None: """Initialise with the path to the config file and the default section name.""" self.configfile = pathlib.Path(configfile) self.section = section - self.raw_config: typing.List[str] = [] - self.raw_dict_config: typing.Dict = {} - self.file_lock = threading.Lock() # Thread safety for future work + self.config = configparser.ConfigParser( + allow_no_value=True, + comment_prefixes=("#", ";"), + inline_comment_prefixes=None, # handled manually in _parse_file + ) + self.update_pending: bool = False + self.raw_config: list[str] = [] + # RLock: update_option calls add_section/add_option while holding the lock + self.file_lock = threading.RLock() - def __getitem__(self, key: str) -> BlocksScreenConfig: + def __getitem__(self, key: str) -> BlocksScreenConfig | None: """Return a :class:`BlocksScreenConfig` for *key* section (same as ``get_section``).""" return self.get_section(key) - def __contains__(self, key): + def __contains__(self, key: str) -> bool: """Return True if *key* is a section in the underlying ConfigParser.""" return key in self.config - def sections(self) -> typing.List[str]: + def sections(self) -> list[str]: """Returns list of all sections""" return self.config.sections() def get_section( - self, section: str, fallback: typing.Optional[T] = None - ) -> BlocksScreenConfig: - """Get configfile section""" + self, section: str, fallback: BlocksScreenConfig | None = None + ) -> BlocksScreenConfig | None: + """Return a section-scoped view sharing this instance's parsed ConfigParser. + + The returned view's read methods (get, has_option, getint, …) operate on + the same data as the parent. Write operations (add_option, update_option, + save_configuration) should still be called on the root object. + """ if not self.config.has_section(section): return fallback - return BlocksScreenConfig(self.configfile, section) + view = BlocksScreenConfig(self.configfile, section) + view.config = self.config # share — reads see the actual parsed data + view.raw_config = self.raw_config # share — _find_section_* helpers work + view.file_lock = self.file_lock # share — thread safety consistent + return view - def get_options(self) -> list: + def get_options(self) -> list[str]: """Get section options""" return self.config.options(self.section) @@ -120,7 +125,7 @@ def has_section(self, section: str) -> bool: Returns: bool: true if section exists, false otherwise """ - return bool(self.config.has_section(section)) + return self.config.has_section(section) def has_option(self, option: str) -> bool: """Check if section has a option @@ -131,58 +136,71 @@ def has_option(self, option: str) -> bool: Returns: bool: true if section exists, false otherwise """ - return bool(self.config.has_option(self.section, option)) + return self.config.has_option(self.section, option) def get( self, option: str, parser: type = str, - default: typing.Union[Sentinel, str, T] = Sentinel.MISSING, - ) -> typing.Union[Sentinel, str]: - """Get option value + default: Any = Sentinel.MISSING, + ) -> Any: + """Get option value. Args: option (str): option name - parser (type, optional): bool, float, int. Defaults to str. - default (typing.Union[Sentinel, str, T], optional): Default value for specified option. Defaults to Sentinel.MISSING. + parser (type, optional): int, float, or str. Defaults to str. + Do **not** pass ``bool`` — use :meth:`getboolean` instead. + ``get(parser=bool)`` is intercepted and delegated to + ``getboolean`` to avoid the ``bool("false") == True`` pitfall. + default: Returned as-is when the option is absent. + Defaults to Sentinel.MISSING (raises if option not found). Returns: - typing.Union[Sentinel, str]: Requested option. Defaults to the specified default value + Any: Parsed option value, or *default* if the option is absent. """ - return parser( - self.config.get(section=self.section, option=option, fallback=default) - ) + if parser is bool: + return self.getboolean(option, default=default) + try: + return parser(self.config.get(section=self.section, option=option)) + except (configparser.NoOptionError, configparser.NoSectionError): + if default is Sentinel.MISSING: + raise + return default def getint( self, option: str, - default: typing.Union[Sentinel, int] = Sentinel.MISSING, - ) -> typing.Union[Sentinel, int]: - """Get option value + default: Sentinel | int = Sentinel.MISSING, + ) -> int: + """Get option value as int. Args: option (str): option name - default (typing.Union[Sentinel, int], optional): Default value for specified option. Defaults to Sentinel.MISSING. + default (int, optional): returned as-is when absent. Raises if omitted. Returns: - typing.Union[Sentinel, int]: Requested option. + int: parsed value, or *default* if absent. """ + if default is Sentinel.MISSING: + return self.config.getint(section=self.section, option=option) return self.config.getint(section=self.section, option=option, fallback=default) def getfloat( self, option: str, - default: typing.Union[Sentinel, float] = Sentinel.MISSING, - ) -> typing.Union[Sentinel, float]: - """Get the value for the specified option + default: Sentinel | float = Sentinel.MISSING, + ) -> float: + """Get option value as float. Args: option (str): option name - default (typing.Union[Sentinel, float], optional): Default value for specified option. Defaults to Sentinel.MISSING. + default (float, optional): returned as-is when absent. Raises if omitted. Returns: - typing.Union[Sentinel, float]: _description_ + float: parsed value, or *default* if absent. """ + if default is Sentinel.MISSING: + return self.config.getfloat(section=self.section, option=option) return self.config.getfloat( section=self.section, option=option, fallback=default ) @@ -190,17 +208,19 @@ def getfloat( def getboolean( self, option: str, - default: typing.Union[Sentinel, bool] = Sentinel.MISSING, - ) -> typing.Union[Sentinel, bool]: - """Get option value + default: Sentinel | bool = Sentinel.MISSING, + ) -> bool: + """Get option value as bool. Args: option (str): option name - default (typing.Union[Sentinel, bool], optional): Default value for specified option. Defaults to Sentinel.MISSING. + default (bool, optional): returned as-is when absent. Raises if omitted. Returns: - typing.Union[Sentinel, bool]: _description_ + bool: parsed value, or *default* if absent. """ + if default is Sentinel.MISSING: + return self.config.getboolean(section=self.section, option=option) return self.config.getboolean( section=self.section, option=option, fallback=default ) @@ -208,40 +228,22 @@ def getboolean( def _find_section_index(self, section: str) -> int: """Return the index of the ``[section]`` header line in ``raw_config``.""" try: - return self.raw_config.index("[" + section + "]") + return self.raw_config.index(f"[{section}]") except ValueError as e: raise configparser.Error(f'Section "{section}" does not exist: {e}') - def _find_section_limits(self, section: str) -> typing.Tuple: + def _find_section_limits(self, section: str) -> tuple[int, int]: """Return ``(start_index, end_index)`` of *section* in ``raw_config``.""" try: section_start = self._find_section_index(section) buffer = self.raw_config[section_start:] section_end = buffer.index("") - return (section_start, int(section_end + section_start)) - except configparser.Error as e: + return (section_start, section_end + section_start) + except (configparser.Error, ValueError) as e: raise configparser.Error( f'Error while finding section "{section}" limits on local tracking: {e}' ) - def _find_option_index( - self, section: str, option: str - ) -> typing.Union[Sentinel, int, None]: - """Return the index of the *option* line within *section* in ``raw_config``.""" - try: - start, end = self._find_section_limits(section) - section_buffer = self.raw_config[start:][:end] - for index in range(len(section_buffer)): - if "[" + option + "]" in section_buffer[index]: - return start + index - raise configparser.Error( - f'Cannot find option "{option}" in section "{section}"' - ) - except configparser.Error as e: - raise configparser.Error( - f'Unable to find option "{option}" in section "{section}": {e}' - ) - def add_section(self, section: str) -> None: """Add a section to configuration file @@ -258,58 +260,49 @@ def add_section(self, section: str) -> None: raise configparser.DuplicateSectionError( f'Section "{sec_string}" already exists' ) - config = self.raw_config - if config and config[-1].strip() != "": - config.append("") - config.extend([sec_string, ""]) - updated_config = "\n".join(config) - self.raw_config = updated_config.splitlines() - if self.raw_config[-1] != "": + if self.raw_config and self.raw_config[-1].strip() != "": self.raw_config.append("") + self.raw_config.extend([sec_string, ""]) self.config.add_section(section) self.update_pending = True except configparser.DuplicateSectionError as e: - logger.error(f'Section "{section}" already exists. {e}') + logger.error('Section "%s" already exists. %s', section, e) except configparser.Error as e: - logger.error(f'Unable to add "{section}" section to configuration: {e}') + logger.error('Unable to add "%s" section to configuration: %s', section, e) def add_option( self, section: str, option: str, - value: typing.Union[str, None] = None, + value: str | None = None, ) -> None: - """Add option with a value to a section + """Add option with a value to a section. Args: section (str): section name option (str): option name - value (typing.Union[str, None], optional): value for the specified option. Defaults to None. + value (str | None, optional): value for the option. ``None`` writes + a value-less option (``allow_no_value=True``). Defaults to None. """ try: with self.file_lock: - section_start, section_end = self._find_section_limits(section) - config = self.raw_config.copy() - opt_string = f"{option}: {value}" - config.insert(section_end, opt_string) - updated_config = "\n".join(config) - self.raw_config = updated_config.splitlines() - if self.raw_config[-1] != "": - self.raw_config.append("") + _, section_end = self._find_section_limits(section) + raw_line = f"{option}: {value}" if value is not None else f"{option}:" + self.raw_config.insert(section_end, raw_line) self.config.set(section, option, value) self.update_pending = True except configparser.DuplicateOptionError as e: - logger.error(f"Option {option} already present on {section}: {e}") + logger.error("Option %s already present on %s: %s", option, section, e) except configparser.Error as e: logger.error( - f'Unable to add "{option}" option to section "{section}": {e} ' + 'Unable to add "%s" option to section "%s": %s', option, section, e ) def update_option( self, section: str, option: str, - value: typing.Any, + value: Any, ) -> None: """Update an existing option's value in both raw tracking and configparser.""" try: @@ -327,7 +320,11 @@ def update_option( self.update_pending = True except Exception as e: logger.error( - f'Unable to update option "{option}" in section "{section}": {e}' + 'Unable to update option "%s" in section "%s": %s', + option, + section, + e, + exc_info=True, ) def _find_option_line_index(self, section: str, option: str) -> int: @@ -340,103 +337,113 @@ def _find_option_line_index(self, section: str, option: str) -> int: raise configparser.Error(f'Option "{option}" not found in section "{section}"') def save_configuration(self) -> None: - """Save teh configuration to file""" + """Save the configuration to file. + + ``update_pending`` is only cleared on a successful write so that a + caller can detect a failed save and retry. + """ try: - if not self.update_pending: - return with self.file_lock: + if not self.update_pending: + return self.configfile.write_text("\n".join(self.raw_config), encoding="utf-8") - sio = io.StringIO() - sio.writelines(self.raw_config) - self.config.write(sio) - sio.close() + self.update_pending = False except Exception as e: logger.error( - f"ERROR: Unable to save new configuration, something went wrong while saving updated configuration. {e}" + "Unable to save configuration to %s: %s", + self.configfile, + e, + exc_info=True, ) - finally: - self.update_pending = False - def load_config(self): - """Load configuration file""" + def load_config(self) -> None: + """Load configuration file. + + Updates ``raw_config`` in-place so that existing section-view objects + (which share the same list reference) remain valid after a reload. + """ try: + new_raw = self._parse_file() # can raise without corrupting state + self.config.clear() self.raw_config.clear() - self.config.clear() # Reset configparser - self.raw_config, self.raw_dict_config = self._parse_file() + self.raw_config.extend(new_raw) if self.raw_config: self.config.read_file(self.raw_config) except Exception as e: - raise configparser.Error(f"Error loading configuration file: {e}") + raise configparser.Error(f"Error loading configuration file: {e}") from e - def _parse_file(self) -> typing.Tuple[typing.List[str], typing.Dict]: - """Read and normalise the config file into a raw line list and a nested dict. + def _parse_file(self) -> list[str]: + """Read and normalise the config file into a raw line list. - Strips comments, normalises ``=`` to ``:`` separators, deduplicates - sections/options, and ensures the buffer ends with an empty line. + Strips comments, normalises only the **first** ``=``/``:`` separator + to ``: `` (preserving values that contain colons, e.g. URLs or hex + colours), deduplicates sections/options, and ensures the buffer ends + with an empty line. Returns: - A tuple of (raw_lines, dict_representation). + Normalised list of config lines. """ - buffer = [] - dict_buff: typing.Dict = {} - curr_sec: typing.Union[Sentinel, str] = Sentinel.MISSING + buffer: list[str] = [] + seen: dict[str, set[str]] = {} # section → set of seen option names + curr_sec: str | None = None try: - self.file_lock.acquire(blocking=False) - f = self.configfile.read_text(encoding="utf-8") - for line in f.splitlines(): - line = line.strip() - if not line: + with self.file_lock: + text = self.configfile.read_text(encoding="utf-8") + for raw in text.splitlines(): + line = raw.strip() + if not line or line.startswith(("#", ";")): continue - line.expandtabs(indentation_size) - re_match = re.search(r"\s*#\s*(.*)(\s*)", line) - if re_match: - line = line[: re_match.start()] + # Strip inline comments (delimiter must be preceded by a word char, + # so values like `color: #ff0000` are not affected). + m = _RE_INLINE_COMMENT.search(line) + if m: + line = line[: m.start()].rstrip() if not line: continue - # remove leading and trailing white spaces - line = re.sub(r"\s*([:=])\s*", r"\1 ", line) - line = re.sub(r"=", r":", line) - # find the beginning of sections - section_match = re.compile(r"[^\s]*\[([^]]+)\]") - match_sec = re.match(section_match, line) # - if match_sec: - sec_name = re.sub(r"[\[*\]]", r"", line) - if sec_name not in dict_buff.keys(): - if buffer: - buffer.extend( - [""] - ) # REFACTOR: Just add some line separation between sections - dict_buff.update({sec_name: {}}) - curr_sec = sec_name - else: - continue - option_match = re.compile(r"^(?:\w+)([:*])(?:.*)") - match_opt = re.match(option_match, line) - if match_opt: - option_name, value = line.split(":", maxsplit=1) - if option_name not in dict_buff.get(curr_sec, {}).keys(): - if curr_sec in dict_buff.keys(): - section: dict = dict_buff.get(curr_sec, {}) - section.update({option_name: value}) - dict_buff.update({curr_sec: section}) - else: - continue - buffer.append(line) - if buffer[-1] != "": + + # --- Section header --- + # Checked before separator normalisation so that a colon inside + # a section name (e.g. `[a:b]`) is never mangled. + m_sec = _RE_SECTION.match(line) + if m_sec: + sec_name = m_sec.group(1) + if sec_name in seen: + continue # duplicate section — skip + if buffer: + buffer.append("") # blank separator before each new section + seen[sec_name] = set() + curr_sec = sec_name + buffer.append(line) + continue # header fully handled — do not try to parse as option + + # --- Option line --- + # Normalise only the *first* separator so that values containing + # additional ':' or '=' characters (URLs, tokens, regex …) are + # preserved verbatim. + line = _RE_SEP_NORMALIZE.sub(": ", line, count=1) + m_opt = _RE_OPTION.match(line) + if m_opt and curr_sec is not None: + opt_name = m_opt.group(1) + if opt_name in seen[curr_sec]: + continue # duplicate option — skip + seen[curr_sec].add(opt_name) + buffer.append(line) + # Lines matching neither pattern are silently dropped + # (they cannot be round-tripped through configparser anyway). + + if not buffer or buffer[-1] != "": buffer.append("") - return buffer, dict_buff + return buffer except Exception as e: raise configparser.Error( f"Unexpected error while parsing configuration file: {e}" - ) - finally: - self.file_lock.release() + ) from e def get_configparser() -> BlocksScreenConfig: """Loads configuration from file and returns that configuration""" - wanted_target = os.path.join(DEFAULT_CONFIGFILE_PATH, "BlocksScreen.cfg") - fallback = os.path.join(WORKING_DIR, "BlocksScreen.cfg") + wanted_target = DEFAULT_CONFIGFILE_PATH / "BlocksScreen.cfg" + fallback = FALLBACK_CONFIGFILE_PATH / "BlocksScreen.cfg" configfile = ( wanted_target if check_file_on_path(DEFAULT_CONFIGFILE_PATH, "BlocksScreen.cfg") diff --git a/BlocksScreen/lib/panels/widgets/loadWidget.py b/BlocksScreen/lib/panels/widgets/loadWidget.py index 1119bd14..5a54dd89 100644 --- a/BlocksScreen/lib/panels/widgets/loadWidget.py +++ b/BlocksScreen/lib/panels/widgets/loadWidget.py @@ -42,7 +42,9 @@ def __init__( else: try: - loading_config = config.loading + loading_config = config["loading"] + if loading_config is None: + raise KeyError("missing [loading] section") animation_path = loading_config.get( str(initial_anim_type.name), ) diff --git a/tests/util/test_configfile_unit.py b/tests/util/test_configfile_unit.py new file mode 100644 index 00000000..408c5139 --- /dev/null +++ b/tests/util/test_configfile_unit.py @@ -0,0 +1,664 @@ +"""Unit tests for BlocksScreen/configfile.py + +Covers: parsing, read methods, get_section views, add/update/save, get_configparser factory, +comment handling, and thread-safety (RLock reentrancy). +""" + +from __future__ import annotations + +import configparser +import logging +import pathlib +import sys +import textwrap +import threading + +import pytest + +# configfile.py lives in BlocksScreen/ and imports helper_methods from the same dir; +# both resolve when BlocksScreen/ is on sys.path. +_BLOCKSSCREEN = pathlib.Path(__file__).parent.parent.parent / "BlocksScreen" +if str(_BLOCKSSCREEN) not in sys.path: + sys.path.insert(0, str(_BLOCKSSCREEN)) + +# tests/network/conftest.py installs a mock at sys.modules["configfile"] for network +# tests. Popping it here is safe — network modules already bound the mock's symbols +# at their own import time and won't re-import. +sys.modules.pop("configfile", None) + +import configfile as cfmod # noqa: E402 +from configfile import BlocksScreenConfig, ConfigError, Sentinel, get_configparser # noqa: E402 + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_cfg(tmp_path: pathlib.Path, content: str, section: str = "server") -> BlocksScreenConfig: + """Write *content* to a temp file, load it, and return the config object.""" + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text(textwrap.dedent(content).strip(), encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, section) + cfg.load_config() + return cfg + + +# --------------------------------------------------------------------------- +# Sentinel +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestSentinel: + def test_missing_value_is_an_instance_not_the_class(self): + assert Sentinel.MISSING.value is not object + assert isinstance(Sentinel.MISSING.value, object) + + def test_missing_is_an_enum_member(self): + assert isinstance(Sentinel.MISSING, Sentinel) + + +# --------------------------------------------------------------------------- +# Parsing (_parse_file via load_config) +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestParsing: + def test_basic_section_and_option(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost + """) + assert cfg.has_section("server") + assert cfg.get("host") == "localhost" + + def test_equals_separator_normalised_to_colon(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host = localhost + """) + assert cfg.get("host") == "localhost" + + def test_full_line_hash_comment_ignored(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + # this is a comment + [server] + host: localhost + """) + assert cfg.has_section("server") + assert not cfg.has_section("# this is a comment") + + def test_full_line_semicolon_comment_ignored(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + ; this is also a comment + [server] + host: localhost + """) + assert cfg.has_section("server") + assert not cfg.has_section("; this is also a comment") + + def test_inline_hash_comment_stripped(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost # inline comment + """) + assert cfg.get("host") == "localhost" + + def test_inline_semicolon_comment_stripped(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost ; inline comment + """) + assert cfg.get("host") == "localhost" + + def test_hash_in_value_without_leading_space_preserved(self, tmp_path): + """#ff0000 has no space before # so must not be stripped as a comment.""" + cfg = _make_cfg(tmp_path, """ + [server] + color: #ff0000 + """) + assert cfg.get("color") == "#ff0000" + + def test_duplicate_section_uses_first_occurrence(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: first + [server] + host: second + """) + assert cfg.get("host") == "first" + + def test_duplicate_option_uses_first_occurrence(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: first + host: second + """) + assert cfg.get("host") == "first" + + def test_multiple_sections_all_loaded(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost + [display] + width: 800 + """) + assert cfg.has_section("server") + assert cfg.has_section("display") + + def test_raw_config_always_ends_with_empty_line(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost + """) + assert cfg.raw_config[-1] == "" + + def test_load_config_resets_previous_state(self, tmp_path): + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + assert cfg.get("host") == "localhost" + + cfg_file.write_text("[server]\nhost: remotehost\n", encoding="utf-8") + cfg.load_config() + assert cfg.get("host") == "remotehost" + + def test_value_with_embedded_colon_preserved(self, tmp_path): + """Values containing ':' after the key separator must not be mangled (e.g. URLs).""" + cfg = _make_cfg(tmp_path, """ + [server] + url: http://localhost:7125 + """) + assert cfg.get("url") == "http://localhost:7125" + + def test_value_with_embedded_equals_preserved(self, tmp_path): + """Values containing '=' after the key separator must not be mangled (e.g. tokens).""" + cfg = _make_cfg(tmp_path, """ + [server] + token: abc=def==ghi + """) + assert cfg.get("token") == "abc=def==ghi" + + def test_section_name_with_colon_preserved(self, tmp_path): + """Section names containing ':' must not be mangled by separator normalisation.""" + cfg = _make_cfg(tmp_path, """ + [fan:heater_fan] + pin: PA8 + """, section="fan:heater_fan") + assert cfg.has_section("fan:heater_fan") + assert cfg.get("pin") == "PA8" + + +# --------------------------------------------------------------------------- +# Read methods (get, getint, getfloat, getboolean, sections, get_options) +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestReadMethods: + @pytest.fixture + def cfg(self, tmp_path): + return _make_cfg(tmp_path, """ + [server] + host: localhost + port: 7125 + ratio: 1.5 + enabled: true + """) + + def test_has_section_present(self, cfg): + assert cfg.has_section("server") is True + + def test_has_section_absent(self, cfg): + assert cfg.has_section("missing") is False + + def test_has_option_present(self, cfg): + assert cfg.has_option("host") is True + + def test_has_option_absent(self, cfg): + assert cfg.has_option("missing") is False + + def test_get_returns_string(self, cfg): + assert cfg.get("host") == "localhost" + + def test_get_returns_default_when_absent(self, cfg): + assert cfg.get("missing", default="fallback") == "fallback" + + def test_get_with_int_parser(self, cfg): + assert cfg.get("port", parser=int) == 7125 + + def test_get_with_bool_parser_false_value(self, tmp_path): + """get(parser=bool) must not treat 'false' as True (bool('false') pitfall).""" + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost + flag: false + """) + assert cfg.get("flag", parser=bool) is False + + def test_get_with_bool_parser_true_value(self, tmp_path): + cfg = _make_cfg(tmp_path, """ + [server] + host: localhost + flag: true + """) + assert cfg.get("flag", parser=bool) is True + + def test_getint(self, cfg): + assert cfg.getint("port") == 7125 + + def test_getint_default_when_absent(self, cfg): + assert cfg.getint("missing", default=42) == 42 + + def test_getfloat(self, cfg): + assert cfg.getfloat("ratio") == pytest.approx(1.5) + + def test_getfloat_default_when_absent(self, cfg): + assert cfg.getfloat("missing", default=0.5) == pytest.approx(0.5) + + def test_getboolean_true(self, cfg): + assert cfg.getboolean("enabled") is True + + def test_getboolean_default_when_absent(self, cfg): + assert cfg.getboolean("missing", default=False) is False + + def test_sections_returns_list_of_names(self, cfg): + assert "server" in cfg.sections() + + def test_get_options_returns_option_names(self, cfg): + opts = cfg.get_options() + assert "host" in opts + assert "port" in opts + + def test_get_default_returned_as_is_not_parsed(self, cfg): + """When the option is absent the default is returned without passing through parser. + Previously parser(default) was called — int('N/A') would raise ValueError.""" + result = cfg.get("missing", parser=int, default="N/A") + assert result == "N/A" + + def test_getint_raises_when_absent_no_default(self, cfg): + """getint() with no default must raise, not silently return Sentinel.MISSING.""" + with pytest.raises((configparser.NoOptionError, configparser.NoSectionError)): + cfg.getint("missing") + + def test_getfloat_raises_when_absent_no_default(self, cfg): + """getfloat() with no default must raise, not silently return Sentinel.MISSING.""" + with pytest.raises((configparser.NoOptionError, configparser.NoSectionError)): + cfg.getfloat("missing") + + def test_getboolean_raises_when_absent_no_default(self, cfg): + """getboolean() with no default must raise, not silently return Sentinel.MISSING.""" + with pytest.raises((configparser.NoOptionError, configparser.NoSectionError)): + cfg.getboolean("missing") + + +# --------------------------------------------------------------------------- +# get_section / __getitem__ / __contains__ +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestGetSection: + @pytest.fixture + def cfg(self, tmp_path): + return _make_cfg(tmp_path, """ + [server] + host: localhost + [display] + width: 800 + """) + + def test_view_reads_actual_values(self, cfg): + view = cfg.get_section("server") + assert view is not None + assert view.get("host") == "localhost" + + def test_view_shares_the_configparser_instance(self, cfg): + view = cfg.get_section("display") + assert view.config is cfg.config + + def test_view_uses_correct_section(self, cfg): + view = cfg.get_section("display") + assert view.section == "display" + assert view.getint("width") == 800 + + def test_returns_none_when_section_missing(self, cfg): + assert cfg.get_section("nonexistent") is None + + def test_returns_custom_fallback_when_section_missing(self, cfg): + sentinel = object() + assert cfg.get_section("nonexistent", fallback=sentinel) is sentinel + + def test_getitem_returns_working_view(self, cfg): + view = cfg["server"] + assert view is not None + assert view.get("host") == "localhost" + + def test_getitem_returns_none_for_missing_section(self, cfg): + assert cfg["nonexistent"] is None + + def test_contains_returns_true_for_existing_section(self, cfg): + assert "server" in cfg + + def test_contains_returns_false_for_missing_section(self, cfg): + assert "nonexistent" not in cfg + + def test_view_raw_config_stays_valid_after_reload(self, tmp_path): + """load_config must update raw_config in-place so existing views remain valid.""" + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: first\n[display]\nwidth: 800\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + view = cfg.get_section("display") + assert view is not None + assert view.raw_config is cfg.raw_config # same list object + + # Reload with new content — view must reflect the updated list + cfg_file.write_text("[server]\nhost: second\n[display]\nwidth: 1024\n", encoding="utf-8") + cfg.load_config() + assert view.raw_config is cfg.raw_config # still the same list object + assert any("1024" in line for line in view.raw_config) + + +# --------------------------------------------------------------------------- +# add_section +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestAddSection: + @pytest.fixture + def cfg(self, tmp_path): + return _make_cfg(tmp_path, """ + [server] + host: localhost + """) + + def test_section_becomes_visible_after_add(self, cfg): + cfg.add_section("display") + assert cfg.has_section("display") + + def test_section_header_appears_in_raw_config(self, cfg): + cfg.add_section("display") + assert "[display]" in cfg.raw_config + + def test_sets_update_pending(self, cfg): + cfg.add_section("display") + assert cfg.update_pending is True + + def test_raw_config_ends_with_empty_line_after_add(self, cfg): + cfg.add_section("display") + assert cfg.raw_config[-1] == "" + + def test_duplicate_section_logs_error_not_raises(self, cfg, caplog): + with caplog.at_level(logging.ERROR): + cfg.add_section("server") # already exists + assert "already exists" in caplog.text.lower() + + def test_duplicate_section_does_not_raise(self, cfg): + cfg.add_section("server") # must not propagate an exception + + +# --------------------------------------------------------------------------- +# add_option +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestAddOption: + @pytest.fixture + def cfg(self, tmp_path): + return _make_cfg(tmp_path, """ + [server] + host: localhost + """) + + def test_option_becomes_visible_after_add(self, cfg): + cfg.add_option("server", "port", "7125") + assert cfg.has_option("port") + + def test_added_option_is_readable(self, cfg): + cfg.add_option("server", "port", "7125") + assert cfg.getint("port") == 7125 + + def test_option_line_appears_in_raw_config(self, cfg): + cfg.add_option("server", "port", "7125") + assert any("port: 7125" in line for line in cfg.raw_config) + + def test_sets_update_pending(self, cfg): + cfg.add_option("server", "port", "7125") + assert cfg.update_pending is True + + def test_option_inserted_before_section_separator(self, cfg): + cfg.add_option("server", "port", "7125") + port_idx = next(i for i, l in enumerate(cfg.raw_config) if "port" in l) + assert cfg.raw_config[port_idx + 1] == "" + + def test_none_value_writes_valueless_option_line(self, cfg): + """add_option(value=None) must write 'flag:' not 'flag: None'.""" + cfg.add_option("server", "flag", None) + assert not any("None" in line for line in cfg.raw_config) + assert any(line.strip() == "flag:" for line in cfg.raw_config) + + def test_none_value_option_is_readable(self, cfg): + """Value-less options must be detectable via has_option after add.""" + cfg.add_option("server", "flag", None) + assert cfg.has_option("flag") + + +# --------------------------------------------------------------------------- +# update_option +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestUpdateOption: + @pytest.fixture + def cfg(self, tmp_path): + return _make_cfg(tmp_path, """ + [server] + host: localhost + port: 7125 + """) + + def test_updates_existing_option_value(self, cfg): + cfg.update_option("server", "host", "remotehost") + assert cfg.get("host") == "remotehost" + + def test_updated_value_reflected_in_raw_config(self, cfg): + cfg.update_option("server", "host", "remotehost") + assert any("host: remotehost" in line for line in cfg.raw_config) + + def test_old_value_absent_from_raw_config(self, cfg): + cfg.update_option("server", "host", "remotehost") + assert not any("host: localhost" in line for line in cfg.raw_config) + + def test_sets_update_pending(self, cfg): + cfg.update_option("server", "host", "remotehost") + assert cfg.update_pending is True + + def test_creates_missing_option(self, cfg): + cfg.update_option("server", "timeout", "30") + assert cfg.get("timeout") == "30" + + def test_creates_missing_section_and_option(self, cfg): + cfg.update_option("display", "width", "800") + assert cfg.has_section("display") + view = cfg.get_section("display") + assert view.get("width") == "800" + + def test_reentrancy_does_not_deadlock(self, tmp_path): + """update_option calls add_section/add_option while holding the RLock — must not deadlock.""" + cfg = _make_cfg(tmp_path, "[server]\nhost: localhost\n") + cfg.update_option("brandnew", "key", "value") + assert cfg.has_section("brandnew") + assert cfg.get_section("brandnew").get("key") == "value" + + +# --------------------------------------------------------------------------- +# save_configuration +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestSaveConfiguration: + def test_writes_updated_value_to_file(self, tmp_path): + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + cfg.update_option("server", "host", "newhost") + cfg.save_configuration() + assert "host: newhost" in cfg_file.read_text(encoding="utf-8") + + def test_clears_update_pending_after_save(self, tmp_path): + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + cfg.update_option("server", "host", "newhost") + cfg.save_configuration() + assert cfg.update_pending is False + + def test_skips_write_when_not_pending(self, tmp_path): + original = "[server]\nhost: localhost\n" + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text(original, encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + assert cfg.update_pending is False + cfg.save_configuration() + assert cfg_file.read_text(encoding="utf-8") == original + + def test_pending_preserved_when_write_fails(self, tmp_path): + """A failed write must leave update_pending True so the caller can retry.""" + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + cfg.update_pending = True + cfg.configfile = pathlib.Path("/nonexistent_dir/file.cfg") + cfg.save_configuration() + assert cfg.update_pending is True + + +# --------------------------------------------------------------------------- +# get_configparser factory +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestGetConfigparser: + def test_loads_from_fallback_path(self, tmp_path, monkeypatch): + cfg_file = tmp_path / "BlocksScreen.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + + monkeypatch.setattr(cfmod, "DEFAULT_CONFIGFILE_PATH", tmp_path / "nonexistent") + monkeypatch.setattr(cfmod, "FALLBACK_CONFIGFILE_PATH", tmp_path) + + cfg = get_configparser() + assert cfg.has_section("server") + + def test_raises_config_error_when_server_section_missing(self, tmp_path, monkeypatch): + cfg_file = tmp_path / "BlocksScreen.cfg" + cfg_file.write_text("[display]\nwidth: 800\n", encoding="utf-8") + + monkeypatch.setattr(cfmod, "DEFAULT_CONFIGFILE_PATH", tmp_path / "nonexistent") + monkeypatch.setattr(cfmod, "FALLBACK_CONFIGFILE_PATH", tmp_path) + + with pytest.raises(ConfigError, match=r"\[server\]"): + get_configparser() + + +# --------------------------------------------------------------------------- +# Thread safety +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestThreadSafety: + def test_concurrent_updates_do_not_corrupt_state(self, tmp_path): + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + + errors: list[Exception] = [] + + def updater(value: str) -> None: + try: + cfg.update_option("server", "host", value) + except Exception as exc: + errors.append(exc) + + threads = [threading.Thread(target=updater, args=(f"host{i}",)) for i in range(20)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert cfg.has_option("host") + + +# --------------------------------------------------------------------------- +# Bug fixes (regression tests) +# --------------------------------------------------------------------------- + + +@pytest.mark.unit +class TestEdgeCases: + def test_parse_empty_file_does_not_crash(self, tmp_path): + """Empty (or comment-only) file must not raise IndexError.""" + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("# only a comment\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() # must not raise + assert cfg.raw_config == [""] + + def test_parse_completely_empty_file_does_not_crash(self, tmp_path): + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + assert cfg.raw_config == [""] + + def test_get_missing_option_without_default_raises(self, tmp_path): + """get() with no default must raise NoOptionError, not silently return Sentinel.MISSING.""" + cfg = _make_cfg(tmp_path, "[server]\nhost: localhost\n") + with pytest.raises(configparser.NoOptionError): + cfg.get("nonexistent") + + def test_load_config_missing_file_raises(self, tmp_path): + """load_config on a non-existent file must raise configparser.Error, not crash silently.""" + cfg = BlocksScreenConfig(tmp_path / "nonexistent.cfg", "server") + with pytest.raises(configparser.Error): + cfg.load_config() + + def test_save_configuration_toctou_safe(self, tmp_path): + """update_pending check and file write happen atomically under the lock.""" + cfg_file = tmp_path / "test.cfg" + cfg_file.write_text("[server]\nhost: localhost\n", encoding="utf-8") + cfg = BlocksScreenConfig(cfg_file, "server") + cfg.load_config() + cfg.update_option("server", "host", "newhost") + + errors: list[Exception] = [] + + def saver() -> None: + try: + cfg.save_configuration() + except Exception as exc: + errors.append(exc) + + threads = [threading.Thread(target=saver) for _ in range(10)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert not errors + assert "host: newhost" in cfg_file.read_text(encoding="utf-8")