diff --git a/docs/concepts/transforms.rst b/docs/concepts/transforms.rst index 6cba8886c..e2871d40e 100644 --- a/docs/concepts/transforms.rst +++ b/docs/concepts/transforms.rst @@ -105,17 +105,16 @@ about the state of the tasks at given points. Here is an example: .. code-block:: python - from voluptuous import Optional, Required - + from typing import Optional from taskgraph.transforms.base import TransformSequence - from taskgraph.util.schema import LegacySchema + from taskgraph.util.schema import Schema - my_schema = LegacySchema({ - Required("foo"): str, - Optional("bar"): bool, - }) + class MySchema(Schema): + foo: str # Required field + bar: Optional[bool] = None # Optional field - transforms.add_validate(my_schema) + transforms = TransformSequence() + transforms.add_validate(MySchema) In the above example, we can be sure that every task dict has a string field called ``foo``, and may or may not have a boolean field called ``bar``. diff --git a/docs/tutorials/creating-a-task-graph.rst b/docs/tutorials/creating-a-task-graph.rst index 35be52bc6..d7c5c0453 100644 --- a/docs/tutorials/creating-a-task-graph.rst +++ b/docs/tutorials/creating-a-task-graph.rst @@ -136,23 +136,23 @@ comments for explanations): .. code-block:: python - from voluptuous import Optional, Required - + from typing import Optional + from taskgraph.util.schema import Schema from taskgraph.transforms.base import TransformSequence - from taskgraph.util.schema import LegacySchema - # Define the schema. We use the `voluptuous` package to handle validation. - hello_description_schema = LegacySchema({ - Required("text"): str, - Optional("description"): str, - }) + # Define the schema using Schema base class. + class HelloDescriptionSchema(Schema): + text: str # Required field + description: Optional[str] = None # Optional field + + hello_description_struct = HelloDescriptionSchema # Create a 'TransformSequence' instance. This class collects transform # functions to run later. transforms = TransformSequence() # First let's validate tasks against the schema. - transforms.add_validate(hello_description_schema) + transforms.add_validate(hello_description_struct) # Register our first transform functions via decorator. @transforms.add diff --git a/src/taskgraph/config.py b/src/taskgraph/config.py index b9a2b57d2..e02299863 100644 --- a/src/taskgraph/config.py +++ b/src/taskgraph/config.py @@ -2,112 +2,121 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. - import logging import os import sys from dataclasses import dataclass from pathlib import Path - -from voluptuous import ALLOW_EXTRA, All, Any, Extra, Length, Optional, Required +from typing import Literal, Optional, Union from .util.caches import CACHES from .util.python_path import find_object -from .util.schema import LegacySchema, optionally_keyed_by, validate_schema +from .util.schema import ( + Schema, + TaskPriority, + optionally_keyed_by, + validate_schema, +) from .util.vcs import get_repository from .util.yaml import load_yaml logger = logging.getLogger(__name__) +# CacheName type for valid cache names +CacheName = Literal[tuple(CACHES.keys())] -#: Schema for the graph config -graph_config_schema = LegacySchema( - { - # The trust-domain for this graph. 
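
Editor's note: a minimal sketch of the validation flow the documentation changes above describe, assuming the project's ``Schema`` base is a thin wrapper around ``msgspec.Struct`` (plain msgspec is used here so the snippet stands alone):

.. code-block:: python

    from typing import Optional

    import msgspec

    class MySchema(msgspec.Struct):
        foo: str                    # required: no default
        bar: Optional[bool] = None  # optional: has a default

    # msgspec.convert raises msgspec.ValidationError on a bad task dict
    validated = msgspec.convert({"foo": "hello"}, MySchema)
    print(validated.foo, validated.bar)  # hello None
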
- # (See https://firefox-source-docs.mozilla.org/taskcluster/taskcluster/taskgraph.html#taskgraph-trust-domain) # noqa - Required("trust-domain"): str, - Optional( - "docker-image-kind", - description="Name of the docker image kind (default: docker-image)", - ): str, - Required("task-priority"): optionally_keyed_by( - "project", - "level", - Any( - "highest", - "very-high", - "high", - "medium", - "low", - "very-low", - "lowest", - ), - ), - Optional( - "task-deadline-after", - description="Default 'deadline' for tasks, in relative date format. " - "Eg: '1 week'", - ): optionally_keyed_by("project", str), - Optional( - "task-expires-after", - description="Default 'expires-after' for level 1 tasks, in relative date format. " - "Eg: '90 days'", - ): str, - Required("workers"): { - Required("aliases"): { - str: { - Required("provisioner"): optionally_keyed_by("level", str), - Required("implementation"): str, - Required("os"): str, - Required("worker-type"): optionally_keyed_by("level", str), - } - }, - }, - Required("taskgraph"): { - Optional( - "register", - description="Python function to call to register extensions.", - ): str, - Optional("decision-parameters"): str, - Optional( - "cached-task-prefix", - description="The taskcluster index prefix to use for caching tasks. " - "Defaults to `trust-domain`.", - ): str, - Optional( - "cache-pull-requests", - description="Should tasks from pull requests populate the cache", - ): bool, - Optional( - "index-path-regexes", - description="Regular expressions matching index paths to be summarized.", - ): [str], - Optional( - "run", - description="Configuration related to the 'run' transforms.", - ): { - Optional( - "use-caches", - description="List of caches to enable, or a boolean to " - "enable/disable all of them.", - ): Any(bool, list(CACHES.keys())), - }, - Required("repositories"): All( - { - str: { - Required("name"): str, - Optional("project-regex"): str, - Optional("ssh-secret-name"): str, - # FIXME - Extra: str, - } - }, - Length(min=1), - ), - }, - }, - extra=ALLOW_EXTRA, -) + +class WorkerAliasSchema(Schema): + """Worker alias configuration.""" + + provisioner: optionally_keyed_by("level", str, use_msgspec=True) # type: ignore + implementation: str + os: str + worker_type: optionally_keyed_by("level", str, use_msgspec=True) # type: ignore + + +class WorkersSchema(Schema, rename=None): + """Workers configuration.""" + + aliases: dict[str, WorkerAliasSchema] + + +class Repository(Schema, forbid_unknown_fields=False): + """Repository configuration. + + This schema allows extra fields for repository-specific configuration. + """ + + # Required fields first + name: str + + # Optional fields + project_regex: Optional[str] = None # Maps from "project-regex" + ssh_secret_name: Optional[str] = None # Maps from "ssh-secret-name" + + +class RunConfig(Schema): + """Run transforms configuration.""" + + # List of caches to enable, or a boolean to enable/disable all of them. + use_caches: Optional[Union[bool, list[str]]] = None # Maps from "use-caches" + + def __post_init__(self): + """Validate that cache names are valid.""" + if isinstance(self.use_caches, list): + invalid = set(self.use_caches) - set(CACHES.keys()) + if invalid: + raise ValueError( + f"Invalid cache names: {invalid}. " + f"Valid names are: {list(CACHES.keys())}" + ) + + +class TaskGraphSchema(Schema): + """Taskgraph specific configuration.""" + + # Required fields first + repositories: dict[str, Repository] + + # Optional fields + # Python function to call to register extensions. 
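
Editor's note: the ``RunConfig.__post_init__`` hook above works because msgspec re-raises a ``ValueError`` from ``__post_init__`` as a ``ValidationError`` during conversion. A self-contained sketch; the cache names are illustrative stand-ins for ``util.caches.CACHES``:

.. code-block:: python

    from typing import Optional, Union

    import msgspec

    CACHES = {"checkout": None, "uv": None}  # stand-in for util.caches.CACHES

    class RunConfig(msgspec.Struct):
        use_caches: Optional[Union[bool, list[str]]] = None

        def __post_init__(self):
            if isinstance(self.use_caches, list):
                invalid = set(self.use_caches) - set(CACHES)
                if invalid:
                    raise ValueError(f"Invalid cache names: {invalid}")

    msgspec.convert({"use_caches": ["checkout"]}, RunConfig)  # ok
    try:
        msgspec.convert({"use_caches": ["bogus"]}, RunConfig)
    except msgspec.ValidationError as e:
        print(e)  # reports the invalid cache name
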
+ register: Optional[str] = None + decision_parameters: Optional[str] = None # Maps from "decision-parameters" + # The taskcluster index prefix to use for caching tasks. Defaults to `trust-domain`. + cached_task_prefix: Optional[str] = None # Maps from "cached-task-prefix" + # Should tasks from pull requests populate the cache + cache_pull_requests: Optional[bool] = None # Maps from "cache-pull-requests" + # Regular expressions matching index paths to be summarized. + index_path_regexes: Optional[list[str]] = None # Maps from "index-path-regexes" + # Configuration related to the 'run' transforms. + run: Optional[RunConfig] = None + + +class GraphConfigSchema(Schema, forbid_unknown_fields=False): + """Main graph configuration schema. + + This schema allows extra fields for flexibility in graph configuration. + """ + + # Required fields first + # The trust-domain for this graph. + # (See https://firefox-source-docs.mozilla.org/taskcluster/taskcluster/taskgraph.html#taskgraph-trust-domain) + trust_domain: str # Maps from "trust-domain" + task_priority: optionally_keyed_by( + "project", "level", TaskPriority, use_msgspec=True + ) # type: ignore + workers: WorkersSchema + taskgraph: TaskGraphSchema + + # Optional fields + # Name of the docker image kind (default: docker-image) + docker_image_kind: Optional[str] = None # Maps from "docker-image-kind" + # Default 'deadline' for tasks, in relative date format. Eg: '1 week' + task_deadline_after: Optional[ + optionally_keyed_by("project", str, use_msgspec=True) # pyright: ignore[reportInvalidTypeForm] + ] = None + # Default 'expires-after' for level 1 tasks, in relative date format. Eg: '90 days' + task_expires_after: Optional[str] = None # Maps from "task-expires-after" @dataclass(frozen=True, eq=False) @@ -178,7 +187,8 @@ def kinds_dir(self): def validate_graph_config(config): - validate_schema(graph_config_schema, config, "Invalid graph configuration:") + """Validate graph configuration using msgspec.""" + validate_schema(GraphConfigSchema, config, "Invalid graph configuration:") def load_graph_config(root_dir): diff --git a/src/taskgraph/decision.py b/src/taskgraph/decision.py index 888ad12fe..d636a6f71 100644 --- a/src/taskgraph/decision.py +++ b/src/taskgraph/decision.py @@ -9,9 +9,9 @@ import shutil import time from pathlib import Path +from typing import Any, Optional import yaml -from voluptuous import Optional from taskgraph.actions import render_actions_json from taskgraph.create import create_tasks @@ -20,7 +20,7 @@ from taskgraph.taskgraph import TaskGraph from taskgraph.util import json from taskgraph.util.python_path import find_object -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.vcs import get_repository from taskgraph.util.yaml import load_yaml @@ -40,11 +40,9 @@ #: Schema for try_task_config.json version 2 -try_task_config_schema_v2 = LegacySchema( - { - Optional("parameters"): {str: object}, - } -) +class TryTaskConfigSchemaV2(Schema): + # All fields are optional + parameters: Optional[dict[str, Any]] = None def full_task_graph_to_runnable_tasks(full_task_json): @@ -277,7 +275,7 @@ def set_try_config(parameters, task_config_file): task_config_version = task_config.pop("version") if task_config_version == 2: validate_schema( - try_task_config_schema_v2, + TryTaskConfigSchemaV2, task_config, "Invalid v2 `try_task_config.json`.", ) diff --git a/src/taskgraph/parameters.py b/src/taskgraph/parameters.py index 12f470f15..709429428 100644 
--- a/src/taskgraph/parameters.py +++ b/src/taskgraph/parameters.py @@ -10,16 +10,18 @@ from io import BytesIO from pprint import pformat from subprocess import CalledProcessError +from typing import Optional, Union from unittest.mock import Mock from urllib.parse import urlparse from urllib.request import urlopen import mozilla_repo_urls -from voluptuous import ALLOW_EXTRA, Any, Optional, Required, Schema +import msgspec +import voluptuous from taskgraph.util import json, yaml from taskgraph.util.readonlydict import ReadOnlyDict -from taskgraph.util.schema import validate_schema +from taskgraph.util.schema import Schema from taskgraph.util.taskcluster import find_task_id, get_artifact_url from taskgraph.util.vcs import get_repository @@ -28,44 +30,54 @@ class ParameterMismatch(Exception): """Raised when a parameters.yml has extra or missing parameters.""" +class CodeReviewSchema(Schema): + """Code review configuration.""" + + # Required field + phabricator_build_target: str + + #: Schema for base parameters. #: Please keep this list sorted and in sync with docs/reference/parameters.rst -base_schema = Schema( - { - Required("base_repository"): str, - Optional("base_ref"): str, - Required("base_rev"): str, - Required("build_date"): int, - Required("build_number"): int, - Required("do_not_optimize"): [str], - Required("enable_always_target"): Any(bool, [str]), - Required("existing_tasks"): {str: str}, - Required("files_changed"): [str], - Required("filters"): [str], - Required("head_ref"): str, - Required("head_repository"): str, - Required("head_rev"): str, - Required("head_tag"): str, - Required("level"): str, - Required("moz_build_date"): str, - Required("next_version"): Any(str, None), - Required("optimize_strategies"): Any(str, None), - Required("optimize_target_tasks"): bool, - Required("owner"): str, - Required("project"): str, - Required("pushdate"): int, - Required("pushlog_id"): str, - Required("repository_type"): str, - # target-kinds is not included, since it should never be - # used at run-time - Required("target_tasks_method"): str, - Required("tasks_for"): str, - Required("version"): Any(str, None), - Optional("code-review"): { - Required("phabricator-build-target"): str, - }, - } -) +class BaseSchema(Schema): + """Base parameters schema. + + This defines the core parameters that all taskgraph runs require. + """ + + # Required fields (most are required) + base_repository: str + base_rev: str + build_date: int + build_number: int + do_not_optimize: list[str] + enable_always_target: Union[bool, list[str]] + existing_tasks: dict[str, str] + files_changed: list[str] + filters: list[str] + head_ref: str + head_repository: str + head_rev: str + head_tag: str + level: str + moz_build_date: str + optimize_target_tasks: bool + owner: str + project: str + pushdate: int + pushlog_id: str + repository_type: str + # target-kinds is not included, since it should never be + # used at run-time + target_tasks_method: str + tasks_for: str + + # Optional fields + next_version: Optional[str] = None + optimize_strategies: Optional[str] = None + version: Optional[str] = None + code_review: Optional[CodeReviewSchema] = None + base_ref: Optional[str] = None def get_contents(path): @@ -135,6 +147,10 @@ def _get_defaults(repo_root=None): defaults_functions = [_get_defaults] +# Keep track of schema extensions separately +_schema_extensions = [] + + def extend_parameters_schema(schema, defaults_fn=None): """ Extend the schema for parameters to include per-project configuration. 
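
Editor's note: for context, a hedged sketch of how a downstream project might call the extension API under the new model; ``MyProjectParams`` and ``defaults`` are hypothetical names, not part of this patch:

.. code-block:: python

    from typing import Optional

    from taskgraph.parameters import extend_parameters_schema
    from taskgraph.util.schema import Schema

    class MyProjectParams(Schema):
        release_type: Optional[str] = None  # hypothetical extra parameter

    def defaults():
        return {"release_type": "nightly"}

    extend_parameters_schema(MyProjectParams, defaults_fn=defaults)
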
@@ -143,15 +159,21 @@ def extend_parameters_schema(schema, defaults_fn=None): graph-configuration. Args: - schema (Schema): The voluptuous.Schema object used to describe extended + schema: The schema object (msgspec) used to describe extended parameters. defaults_fn (function): A function which takes no arguments and returns a dict mapping parameter name to default value in the event strict=False (optional). """ - global base_schema + global BaseSchema global defaults_functions - base_schema = base_schema.extend(schema) + + # Store the extension schema for use during validation + _schema_extensions.append(schema) + + # With msgspec, schema extensions are tracked in the _schema_extensions list + # for validation purposes rather than being merged into a single schema + if defaults_fn: defaults_functions.append(defaults_fn) @@ -214,12 +236,56 @@ def _fill_defaults(repo_root=None, **kwargs): return kwargs def check(self): - schema = ( - base_schema if self.strict else base_schema.extend({}, extra=ALLOW_EXTRA) - ) + # Validate parameters using msgspec schema + # In strict mode: validate required fields and reject extra fields + # In non-strict mode: only validate required fields and allow extra fields try: - validate_schema(schema, self.copy(), "Invalid parameters:") - except Exception as e: + # Convert underscore keys to kebab-case since BaseSchema uses rename="kebab" + kebab_params = {k.replace("_", "-"): v for k, v in self.items()} + + # Collect all valid fields from BaseSchema and schema extensions + required_fields = set() + valid_fields = set() + + # Add BaseSchema fields + for f in msgspec.structs.fields(BaseSchema): + valid_fields.add(f.encode_name) + if f.required: + required_fields.add(f.encode_name) + + # Add fields from schema extensions + for schema in _schema_extensions: + if isinstance(schema, type) and issubclass(schema, Schema): + for f in msgspec.structs.fields(schema): + valid_fields.add(f.encode_name) + else: + schema_dict = schema if isinstance(schema, dict) else schema.schema + for key in schema_dict: + field_name = ( + key.schema + if isinstance( + key, (voluptuous.Required, voluptuous.Optional) + ) + else key + ) + # Convert to kebab-case to match how parameters are validated + kebab_field_name = field_name.replace("_", "-") + valid_fields.add(kebab_field_name) + + missing_fields = required_fields - set(kebab_params.keys()) + if missing_fields: + raise ParameterMismatch( + f"Invalid parameters: Missing required fields: {missing_fields}" + ) + + # In strict mode, reject extra fields + if self.strict: + extra_fields = set(kebab_params.keys()) - valid_fields + if extra_fields: + raise ParameterMismatch( + f"Invalid parameters: Extra fields not allowed: {extra_fields}" + ) + except (msgspec.ValidationError, msgspec.DecodeError) as e: raise ParameterMismatch(str(e)) def __getitem__(self, k): diff --git a/src/taskgraph/transforms/chunking.py b/src/taskgraph/transforms/chunking.py index 59818337b..83f740549 100644 --- a/src/taskgraph/transforms/chunking.py +++ b/src/taskgraph/transforms/chunking.py @@ -2,52 +2,35 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
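
Editor's note: returning to the rewritten ``Parameters.check()`` for a moment — it leans on ``msgspec.structs.fields`` plus kebab-case renaming. A standalone sketch of that introspection:

.. code-block:: python

    from typing import Optional

    import msgspec

    class BaseSchema(msgspec.Struct, rename="kebab"):
        head_ref: str
        base_ref: Optional[str] = None

    for f in msgspec.structs.fields(BaseSchema):
        print(f.name, f.encode_name, f.required)
    # head_ref head-ref True
    # base_ref base-ref False
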
import copy -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Optional, Required +from typing import Optional from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute + +class ChunkSchema(Schema): + """ + `chunk` can be used to split one task into `total-chunks` + tasks, substituting `this_chunk` and `total_chunks` into any + fields in `substitution-fields`. + """ + + # The total number of chunks to split the task into. + total_chunks: int + # A list of fields that need to have `{this_chunk}` and/or + # `{total_chunks}` replaced in them. + substitution_fields: Optional[list[str]] = None + + #: Schema for chunking transforms -CHUNK_SCHEMA = LegacySchema( - { - # Optional, so it can be used for a subset of tasks in a kind - Optional( - "chunk", - description=dedent( - """ - `chunk` can be used to split one task into `total-chunks` - tasks, substituting `this_chunk` and `total_chunks` into any - fields in `substitution-fields`. - """.lstrip() - ), - ): { - Required( - "total-chunks", - description=dedent( - """ - The total number of chunks to split the task into. - """.lstrip() - ), - ): int, - Optional( - "substitution-fields", - description=dedent( - """ - A list of fields that need to have `{this_chunk}` and/or - `{total_chunks}` replaced in them. - """.lstrip() - ), - ): [str], - } - }, - extra=ALLOW_EXTRA, -) +class ChunksSchema(Schema, forbid_unknown_fields=False): + # Optional, so it can be used for a subset of tasks in a kind + chunk: Optional[ChunkSchema] = None + transforms = TransformSequence() -transforms.add_validate(CHUNK_SCHEMA) +transforms.add_validate(ChunksSchema) @transforms.add diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py index a9e76abfc..fb64e1aa4 100644 --- a/src/taskgraph/transforms/docker_image.py +++ b/src/taskgraph/transforms/docker_image.py @@ -5,17 +5,14 @@ import logging import os import re -from textwrap import dedent - -from voluptuous import Optional, Required +from typing import Optional import taskgraph from taskgraph.transforms.base import TransformSequence +from taskgraph.transforms.task import TaskDescriptionIndexSchema from taskgraph.util import json from taskgraph.util.docker import create_context_tar, generate_context_hash -from taskgraph.util.schema import LegacySchema - -from .task import task_description_schema +from taskgraph.util.schema import Schema logger = logging.getLogger(__name__) @@ -31,87 +28,34 @@ transforms = TransformSequence() -#: Schema for docker_image transforms -docker_image_schema = LegacySchema( - { - Required( - "name", - description=dedent( - """ - Name of the docker image. - """ - ).lstrip(), - ): str, - Optional( - "parent", - description=dedent( - """ - Name of the parent docker image. - """ - ).lstrip(), - ): str, - Optional( - "symbol", - description=dedent( - """ - Treeherder symbol. - """ - ).lstrip(), - ): str, - Optional( - "task-from", - description=dedent( - """ - Relative path (from config.path) to the file the docker image was defined in. - """ - ).lstrip(), - ): str, - Optional( - "args", - description=dedent( - """ - Arguments to use for the Dockerfile. - """ - ).lstrip(), - ): {str: str}, - Optional( - "definition", - description=dedent( - """ - Name of the docker image definition under taskcluster/docker, when - different from the docker image name. 
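
Editor's note: the chunking transform above consumes a validated ``chunk`` stanza roughly as follows; this is an illustrative expansion, not the transform's exact code (which uses ``util.templates.substitute``):

.. code-block:: python

    import copy

    task = {
        "name": "test",
        "chunk": {"total-chunks": 2, "substitution-fields": ["description"]},
        "description": "chunk {this_chunk} of {total_chunks}",
    }

    chunk = task.pop("chunk")
    for this_chunk in range(1, chunk["total-chunks"] + 1):
        new_task = copy.deepcopy(task)
        for field in chunk.get("substitution-fields", []):
            new_task[field] = new_task[field].format(
                this_chunk=this_chunk, total_chunks=chunk["total-chunks"]
            )
        print(new_task["description"])
    # chunk 1 of 2
    # chunk 2 of 2
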
-                """
-            ).lstrip(),
-        ): str,
-        Optional(
-            "packages",
-            description=dedent(
-                """
-                List of package tasks this docker image depends on.
-                """
-            ).lstrip(),
-        ): [str],
-        Optional(
-            "index",
-            description=dedent(
-                """
-                Information for indexing this build so its artifacts can be discovered.
-                """
-            ).lstrip(),
-        ): task_description_schema["index"],
-        Optional(
-            "cache",
-            description=dedent(
-                """
-                Whether this image should be cached based on inputs.
-                """
-            ).lstrip(),
-        ): bool,
-    }
-)
-
-transforms.add_validate(docker_image_schema)
+#: Schema for docker_image transforms
+class DockerImageSchema(Schema):
+    # Required field first
+    # Name of the docker image.
+    name: str
+
+    # Optional fields
+    # Name of the parent docker image.
+    parent: Optional[str] = None
+    # Treeherder symbol.
+    symbol: Optional[str] = None
+    # Relative path (from config.path) to the file the docker image was defined in.
+    task_from: Optional[str] = None
+    # Arguments to use for the Dockerfile.
+    args: Optional[dict[str, str]] = None
+    # Name of the docker image definition under taskcluster/docker, when
+    # different from the docker image name.
+    definition: Optional[str] = None
+    # List of package tasks this docker image depends on.
+    packages: Optional[list[str]] = None
+    # Information for indexing this build so its artifacts can be discovered.
+    index: Optional[TaskDescriptionIndexSchema] = None
+    # Whether this image should be cached based on inputs.
+    cache: Optional[bool] = None
+
+
+transforms.add_validate(DockerImageSchema)
 
 
 @transforms.add
@@ -126,7 +70,8 @@ def fill_template(config, tasks):
     context_hashes = {}
 
     if not taskgraph.fast and config.write_artifacts:
-        os.makedirs(CONTEXTS_DIR, exist_ok=True)
+        if not os.path.isdir(CONTEXTS_DIR):
+            os.makedirs(CONTEXTS_DIR)
 
     for task in tasks:
         image_name = task.pop("name")
diff --git a/src/taskgraph/transforms/fetch.py b/src/taskgraph/transforms/fetch.py
index e165eec31..ff67475eb 100644
--- a/src/taskgraph/transforms/fetch.py
+++ b/src/taskgraph/transforms/fetch.py
@@ -9,77 +9,59 @@
 import os
 import re
 from dataclasses import dataclass
-from textwrap import dedent
-from typing import Callable
+from typing import Any, Callable, Literal, Optional, Union
 
-from voluptuous import Extra, Optional, Required
+import msgspec
 
 import taskgraph
 
 from ..util import path
 from ..util.cached_tasks import add_optimization
-from ..util.schema import LegacySchema, validate_schema
+from ..util.schema import Schema
 from ..util.treeherder import join_symbol
 from .base import TransformSequence
 
 CACHE_TYPE = "content.v1"
 
-#: Schema for fetch transforms
-FETCH_SCHEMA = LegacySchema(
-    {
-        Required(
-            "name",
-            description=dedent(
-                """
-                Name of the task.
-                """.lstrip()
-            ),
-        ): str,
-        Optional(
-            "task-from",
-            description=dedent(
-                """
-                Relative path (from config.path) to the file the task was defined
-                in.
-                """.lstrip()
-            ),
-        ): str,
-        Required(
-            "description",
-            description=dedent(
-                """
-                Description of the task.
-                """.lstrip()
-            ),
-        ): str,
-        Optional("expires-after"): str,
-        Optional("docker-image"): object,
-        Optional(
-            "fetch-alias",
-            description=dedent(
-                """
-                An alias that can be used instead of the real fetch task name in
-                fetch stanzas for tasks.
-                """.lstrip()
-            ),
-        ): str,
-        Optional(
-            "artifact-prefix",
-            description=dedent(
-                """
-                The prefix of the taskcluster artifact being uploaded.
-                Defaults to `public/`; if it starts with something other than
-                `public/` the artifact will require scopes to access.
- """.lstrip() - ), - ): str, - Optional("attributes"): {str: object}, - Required("fetch"): { - Required("type"): str, - Extra: object, - }, - } -) + +# Base class for fetch configurations - ensures type field exists +class BaseFetchSchema(Schema, forbid_unknown_fields=False): + """Base schema for fetch configurations. + + This allows any additional fields beyond 'type' to support + different fetch types (static-url, git, etc). + """ + + type: str + + +class FetchSchema(Schema): + # Required fields + # Name of the task. + name: str + # Description of the task. + description: str + # Fetch configuration - validated as BaseFetchSchema which ensures 'type' exists + # Additional type-specific validation is done by the fetch_builder decorator + fetch: BaseFetchSchema + + # Optional fields + # Relative path (from config.path) to the file the task was defined in. + task_from: Optional[str] = None + # When the task expires. + expires_after: Optional[str] = None + # Docker image configuration. + docker_image: Optional[Any] = None + # An alias that can be used instead of the real fetch task name in + # fetch stanzas for tasks. + fetch_alias: Optional[str] = None + # The prefix of the taskcluster artifact being uploaded. + # Defaults to `public/`; if it starts with something other than + # `public/` the artifact will require scopes to access. + artifact_prefix: Optional[str] = None + # Task attributes. + attributes: Optional[dict[str, Any]] = None + # define a collection of payload builders, depending on the worker implementation fetch_builders = {} @@ -87,13 +69,12 @@ @dataclass(frozen=True) class FetchBuilder: - schema: LegacySchema + schema: Union[Schema, Callable] builder: Callable def fetch_builder(name, schema): - schema = LegacySchema({Required("type"): name}).extend(schema) - + # schema should be a msgspec.Schema type def wrap(func): fetch_builders[name] = FetchBuilder(schema, func) # type: ignore return func @@ -102,7 +83,7 @@ def wrap(func): transforms = TransformSequence() -transforms.add_validate(FETCH_SCHEMA) +transforms.add_validate(FetchSchema) @transforms.add @@ -115,7 +96,11 @@ def process_fetch_task(config, tasks): if typ not in fetch_builders: raise Exception(f"Unknown fetch type {typ} in fetch {name}") - validate_schema(fetch_builders[typ].schema, fetch, f"In task.fetch {name!r}:") + # Validate fetch config using msgspec + try: + msgspec.convert(fetch, fetch_builders[typ].schema) + except msgspec.ValidationError as e: + raise Exception(f"In task.fetch {name!r}: {e}") task.update(configure_fetch(config, typ, name, fetch)) @@ -125,7 +110,11 @@ def process_fetch_task(config, tasks): def configure_fetch(config, typ, name, fetch): if typ not in fetch_builders: raise Exception(f"No fetch type {typ} in fetch {name}") - validate_schema(fetch_builders[typ].schema, fetch, f"In task.fetch {name!r}:") + # Validate fetch config using msgspec + try: + msgspec.convert(fetch, fetch_builders[typ].schema) + except msgspec.ValidationError as e: + raise Exception(f"In task.fetch {name!r}: {e}") return fetch_builders[typ].builder(config, name, fetch) @@ -204,45 +193,46 @@ def make_task(config, tasks): yield task_desc -@fetch_builder( - "static-url", - schema={ - # The URL to download. - Required("url"): str, - # The SHA-256 of the downloaded content. - Required("sha256"): str, - # Size of the downloaded entity, in bytes. - Required("size"): int, - # GPG signature verification. - Optional("gpg-signature"): { - # URL where GPG signature document can be obtained. 
Can contain the - # value ``{url}``, which will be substituted with the value from - # ``url``. - Required("sig-url"): str, - # Path to file containing GPG public key(s) used to validate - # download. - Required("key-path"): str, - }, - # The name to give to the generated artifact. Defaults to the file - # portion of the URL. Using a different extension converts the - # archive to the given type. Only conversion to .tar.zst is - # supported. - Optional("artifact-name"): str, - # Strip the given number of path components at the beginning of - # each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - Optional("strip-components"): int, - # Add the given prefix to each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - Optional("add-prefix"): str, - # Headers to pass alongside the request. - Optional("headers"): { - str: str, - }, - # IMPORTANT: when adding anything that changes the behavior of the task, - # it is important to update the digest data used to compute cache hits. - }, -) +class GPGSignatureSchema(Schema): + """GPG signature verification configuration.""" + + # URL where GPG signature document can be obtained. Can contain the + # value ``{url}``, which will be substituted with the value from ``url``. + sig_url: str + # Path to file containing GPG public key(s) used to validate download. + key_path: str + + +class StaticUrlFetchSchema(Schema): + """Configuration for static-url fetch type.""" + + type: Literal["static-url"] + # The URL to download. + url: str + # The SHA-256 of the downloaded content. + sha256: str + # Size of the downloaded entity, in bytes. + size: int + # GPG signature verification. + gpg_signature: Optional[GPGSignatureSchema] = None + # The name to give to the generated artifact. Defaults to the file + # portion of the URL. Using a different extension converts the + # archive to the given type. Only conversion to .tar.zst is supported. + artifact_name: Optional[str] = None + # Strip the given number of path components at the beginning of + # each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + strip_components: Optional[int] = None + # Add the given prefix to each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + add_prefix: Optional[str] = None + # Headers to pass alongside the request. + headers: Optional[dict[str, str]] = None + # IMPORTANT: when adding anything that changes the behavior of the task, + # it is important to update the digest data used to compute cache hits. + + +@fetch_builder("static-url", StaticUrlFetchSchema) def create_fetch_url_task(config, name, fetch): artifact_name = fetch.get("artifact-name") if not artifact_name: @@ -305,21 +295,23 @@ def create_fetch_url_task(config, name, fetch): } -@fetch_builder( - "git", - schema={ - Required("repo"): str, - Required("revision"): str, - Optional("include-dot-git"): bool, - Optional("artifact-name"): str, - Optional("path-prefix"): str, - # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) - # In the secret dictionary, the key should be specified as - # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." - # n.b. The OpenSSH private key file format requires a newline at the end of the file. 
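
Editor's note: a sketch of the type-specific validation ``process_fetch_task`` now performs with ``msgspec.convert``; the schema here is a simplified stand-in for ``StaticUrlFetchSchema``:

.. code-block:: python

    from typing import Literal, Optional

    import msgspec

    class StaticUrlFetch(msgspec.Struct, rename="kebab"):
        type: Literal["static-url"]
        url: str
        sha256: str
        size: int
        artifact_name: Optional[str] = None  # accepted as "artifact-name"

    fetch = {
        "type": "static-url",
        "url": "https://example.com/f.tar.zst",  # illustrative values
        "sha256": "0" * 64,
        "size": 1234,
    }
    msgspec.convert(fetch, StaticUrlFetch)  # ValidationError on wrong type/fields
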
- Optional("ssh-key"): str, - }, -) +class GitFetchSchema(Schema): + """Configuration for git fetch type.""" + + type: Literal["git"] + repo: str + revision: str + include_dot_git: Optional[bool] = None + artifact_name: Optional[str] = None + path_prefix: Optional[str] = None + # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) + # In the secret dictionary, the key should be specified as + # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." + # n.b. The OpenSSH private key file format requires a newline at the end of the file. + ssh_key: Optional[str] = None + + +@fetch_builder("git", GitFetchSchema) def create_git_fetch_task(config, name, fetch): path_prefix = fetch.get("path-prefix") if not path_prefix: diff --git a/src/taskgraph/transforms/from_deps.py b/src/taskgraph/transforms/from_deps.py index c03148c99..94dbe7ce1 100644 --- a/src/taskgraph/transforms/from_deps.py +++ b/src/taskgraph/transforms/from_deps.py @@ -13,108 +13,75 @@ from copy import deepcopy from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Any, Optional, Required +from typing import Any, Literal, Optional, Union from taskgraph.transforms.base import TransformSequence -from taskgraph.transforms.run import fetches_schema +from taskgraph.transforms.run import FetchesSchema from taskgraph.util.attributes import attrmatch from taskgraph.util.dependencies import GROUP_BY_MAP, get_dependencies -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.set_name import SET_NAME_MAP -#: Schema for from_deps transforms -FROM_DEPS_SCHEMA = LegacySchema( - { - Required("from-deps"): { - Optional( - "kinds", - description=dedent( - """ - Limit dependencies to specified kinds (defaults to all kinds in - `kind-dependencies`). +SetNameType = Literal["strip-kind", "retain-kind"] +GroupByType = Literal[ + "single", + "all", + "attribute", + "single-with-filters", + "platform", + "single-locale", + "chunk-locales", + "partner-repack-ids", + "component", + "build-type", +] + + +class FromDepsChildSchema(Schema): + # Optional fields + # Limit dependencies to specified kinds (defaults to all kinds in + # `kind-dependencies`). + # + # The first kind in the list is the "primary" kind. The + # dependency of this kind will be used to derive the label + # and copy attributes (if `copy-attributes` is True). + kinds: Optional[list[str]] = None + # Set the task name using the specified function. Can be False to + # disable name setting, or a string/dict specifying the function to use. + set_name: Optional[Union[SetNameType, bool, dict[SetNameType, Any]]] = None + # Limit dependencies to tasks whose attributes match + # using :func:`~taskgraph.util.attributes.attrmatch`. + with_attributes: Optional[dict[str, Union[list[Any], str]]] = None + # Group cross-kind dependencies using the given group-by + # function. One task will be created for each group. If not + # specified, the 'single' function will be used which creates + # a new task for each individual dependency. + group_by: Optional[Union[GroupByType, dict[GroupByType, Any]]] = None + # If True, copy attributes from the dependency matching the + # first kind in the `kinds` list (whether specified explicitly + # or taken from `kind-dependencies`). + copy_attributes: Optional[bool] = None + # If true (the default), there must be only a single unique task + # for each kind in a dependency group. Setting this to false + # disables that requirement. 
+ unique_kinds: Optional[bool] = None + # If present, a `fetches` entry will be added for each task + # dependency. Attributes of the upstream task may be used as + # substitution values in the `artifact` or `dest` values of the + # `fetches` entry. + # Keys are task kind names, values are lists of FetchesSchema objects. + fetches: Optional[dict[str, list[FetchesSchema]]] = None + + +# Schema for from_deps transforms +class FromDepsSchema(Schema, forbid_unknown_fields=False): + """Schema for from_deps transforms.""" + + from_deps: FromDepsChildSchema - The first kind in the list is the "primary" kind. The - dependency of this kind will be used to derive the label - and copy attributes (if `copy-attributes` is True). - """.lstrip() - ), - ): [str], - Optional( - "set-name", - description=dedent( - """ - UPDATE ME AND DOCS - """.lstrip() - ), - ): Any( - None, - False, - *SET_NAME_MAP, - {Any(*SET_NAME_MAP): object}, - ), - Optional( - "with-attributes", - description=dedent( - """ - Limit dependencies to tasks whose attributes match - using :func:`~taskgraph.util.attributes.attrmatch`. - """.lstrip() - ), - ): {str: Any(list, str)}, - Optional( - "group-by", - description=dedent( - """ - Group cross-kind dependencies using the given group-by - function. One task will be created for each group. If not - specified, the 'single' function will be used which creates - a new task for each individual dependency. - """.lstrip() - ), - ): Any( - None, - *GROUP_BY_MAP, - {Any(*GROUP_BY_MAP): object}, - ), - Optional( - "copy-attributes", - description=dedent( - """ - If True, copy attributes from the dependency matching the - first kind in the `kinds` list (whether specified explicitly - or taken from `kind-dependencies`). - """.lstrip() - ), - ): bool, - Optional( - "unique-kinds", - description=dedent( - """ - If true (the default), there must be only a single unique task - for each kind in a dependency group. Setting this to false - disables that requirement. - """.lstrip() - ), - ): bool, - Optional( - "fetches", - description=dedent( - """ - If present, a `fetches` entry will be added for each task - dependency. Attributes of the upstream task may be used as - substitution values in the `artifact` or `dest` values of the - `fetches` entry. - """.lstrip() - ), - ): {str: [fetches_schema]}, - }, - }, - extra=ALLOW_EXTRA, -) transforms = TransformSequence() -transforms.add_validate(FROM_DEPS_SCHEMA) +transforms.add_validate(FromDepsSchema) @transforms.add diff --git a/src/taskgraph/transforms/matrix.py b/src/taskgraph/transforms/matrix.py index 855bffa41..d928045d4 100644 --- a/src/taskgraph/transforms/matrix.py +++ b/src/taskgraph/transforms/matrix.py @@ -8,62 +8,50 @@ """ from copy import deepcopy -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Extra, Optional, Required +from typing import Optional from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute_task_fields + +class MatrixChildSchema(Schema, forbid_unknown_fields=False): + """ + Matrix configuration for generating multiple tasks. + """ + + # Exclude the specified combination(s) of matrix values from the + # final list of tasks. + # + # If only a subset of the possible rows are present in the + # exclusion rule, then *all* combinations including that subset + # subset will be excluded. 
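
Editor's note: stepping back to ``from_deps`` — ``group-by`` accepts either a bare name or a single-key mapping carrying the group-by function's argument. A sketch of both shapes under the new ``Literal``-based typing (trimmed to three variants):

.. code-block:: python

    from typing import Any, Literal, Optional, Union

    import msgspec

    GroupByType = Literal["single", "all", "attribute"]

    class FromDeps(msgspec.Struct, rename="kebab"):
        group_by: Optional[Union[GroupByType, dict[GroupByType, Any]]] = None

    msgspec.convert({"group-by": "all"}, FromDeps)                        # bare form
    msgspec.convert({"group-by": {"attribute": "build-type"}}, FromDeps)  # parameterized form
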
+ exclude: Optional[list[dict[str, str]]] = None + # Sets the task name to the specified format string. + # + # Useful for cases where the default of joining matrix values by + # a dash is not desired. + set_name: Optional[str] = None + # List of fields in the task definition to substitute matrix values into. + # + # If not specified, all fields in the task definition will be + # substituted. + substitution_fields: Optional[list[str]] = None + + #: Schema for matrix transforms -MATRIX_SCHEMA = LegacySchema( - { - Required("name"): str, - Optional("matrix"): { - Optional( - "exclude", - description=dedent( - """ - Exclude the specified combination(s) of matrix values from the - final list of tasks. - - If only a subset of the possible rows are present in the - exclusion rule, then *all* combinations including that subset - subset will be excluded. - """.lstrip() - ), - ): [{str: str}], - Optional( - "set-name", - description=dedent( - """ - Sets the task name to the specified format string. - - Useful for cases where the default of joining matrix values by - a dash is not desired. - """.lstrip() - ), - ): str, - Optional( - "substitution-fields", - description=dedent( - """ - List of fields in the task definition to substitute matrix values into. - - If not specified, all fields in the task definition will be - substituted. - """ - ), - ): [str], - Extra: [str], - }, - }, - extra=ALLOW_EXTRA, -) +class MatrixSchema(Schema, forbid_unknown_fields=False): + """Schema for matrix transforms. + + This schema allows extra fields to be passed through to the task. + """ + + name: str + matrix: Optional[MatrixChildSchema] = None + transforms = TransformSequence() -transforms.add_validate(MATRIX_SCHEMA) +transforms.add_validate(MatrixSchema) def _resolve_matrix(tasks, key, values, exclude): diff --git a/src/taskgraph/transforms/notify.py b/src/taskgraph/transforms/notify.py index a7d118f10..3b486996e 100644 --- a/src/taskgraph/transforms/notify.py +++ b/src/taskgraph/transforms/notify.py @@ -8,12 +8,14 @@ more information. 
""" -from voluptuous import ALLOW_EXTRA, Any, Exclusive, Optional, Required +from typing import Any, Literal, Optional, Union + +import msgspec from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema, optionally_keyed_by, resolve_keyed_by +from taskgraph.util.schema import Schema, optionally_keyed_by, resolve_keyed_by -_status_type = Any( +StatusType = Literal[ "on-completed", "on-defined", "on-exception", @@ -21,29 +23,42 @@ "on-pending", "on-resolved", "on-running", -) - -_recipients = [ - { - Required("type"): "email", - Required("address"): optionally_keyed_by("project", "level", str), - Optional("status-type"): _status_type, - }, - { - Required("type"): "matrix-room", - Required("room-id"): str, - Optional("status-type"): _status_type, - }, - { - Required("type"): "pulse", - Required("routing-key"): str, - Optional("status-type"): _status_type, - }, - { - Required("type"): "slack-channel", - Required("channel-id"): str, - Optional("status-type"): _status_type, - }, +] + + +class EmailRecipientSchema(Schema, tag_field="type", tag="email"): + """Email notification recipient.""" + + address: optionally_keyed_by("project", "level", str, use_msgspec=True) # type: ignore + status_type: Optional[StatusType] = None + + +class MatrixRoomRecipientSchema(Schema, tag_field="type", tag="matrix-room"): + """Matrix room notification recipient.""" + + room_id: str + status_type: Optional[StatusType] = None + + +class PulseRecipientSchema(Schema, tag_field="type", tag="pulse"): + """Pulse notification recipient.""" + + routing_key: str + status_type: Optional[StatusType] = None + + +class SlackChannelRecipientSchema(Schema, tag_field="type", tag="slack-channel"): + """Slack channel notification recipient.""" + + channel_id: str + status_type: Optional[StatusType] = None + + +Recipient = Union[ + EmailRecipientSchema, + MatrixRoomRecipientSchema, + PulseRecipientSchema, + SlackChannelRecipientSchema, ] _route_keys = { @@ -54,46 +69,92 @@ } """Map each type to its primary key that will be used in the route.""" + +class EmailLinkSchema(Schema, rename=None, omit_defaults=False): + """Email link configuration.""" + + text: str + href: str + + +class EmailContentSchema(Schema, rename=None): + """Email notification content.""" + + subject: Optional[str] = None + content: Optional[str] = None + link: Optional[EmailLinkSchema] = None + + +class MatrixContentSchema(Schema): + """Matrix notification content.""" + + body: Optional[str] = None + formatted_body: Optional[str] = None + format: Optional[str] = None + msg_type: Optional[str] = None + + +class SlackContentSchema(Schema, rename=None): + """Slack notification content.""" + + text: Optional[str] = None + blocks: Optional[list[Any]] = None + attachments: Optional[list[Any]] = None + + +class NotifyContentSchema(Schema, rename=None): + """Notification content configuration.""" + + email: Optional[EmailContentSchema] = None + matrix: Optional[MatrixContentSchema] = None + slack: Optional[SlackContentSchema] = None + + +RecipientSchema = Union[ + EmailRecipientSchema, + MatrixRoomRecipientSchema, + PulseRecipientSchema, + SlackChannelRecipientSchema, +] + + +class NotifyConfigSchema(Schema, rename=None): + """Modern notification configuration.""" + + recipients: list[RecipientSchema] + content: Optional[NotifyContentSchema] = None + + +class LegacyNotificationsConfigSchema(Schema, rename="kebab"): + """Legacy notification configuration for backwards compatibility.""" + + emails: Union[list[str], dict[str, Any]] 
# Can be keyed-by + subject: str + message: Optional[str] = None + status_types: Optional[list[StatusType]] = None + + #: Schema for notify transforms -NOTIFY_SCHEMA = LegacySchema( - { - Exclusive("notify", "config"): { - Required("recipients"): [Any(*_recipients)], - Optional("content"): { - Optional("email"): { - Optional("subject"): str, - Optional("content"): str, - Optional("link"): { - Required("text"): str, - Required("href"): str, - }, - }, - Optional("matrix"): { - Optional("body"): str, - Optional("formatted-body"): str, - Optional("format"): str, - Optional("msg-type"): str, - }, - Optional("slack"): { - Optional("text"): str, - Optional("blocks"): list, - Optional("attachments"): list, - }, - }, - }, - # Continue supporting the legacy schema for backwards compat. - Exclusive("notifications", "config"): { - Required("emails"): optionally_keyed_by("project", "level", [str]), - Required("subject"): str, - Optional("message"): str, - Optional("status-types"): [_status_type], - }, - }, - extra=ALLOW_EXTRA, -) +class NotifySchema(Schema, tag_field="notify_type", forbid_unknown_fields=False): + """Schema for notify transforms. + + Note: This schema allows either 'notify' or 'notifications' field, + but not both. The validation will be done in __post_init__. + """ + + notify: Optional[NotifyConfigSchema] = None + notifications: Optional[LegacyNotificationsConfigSchema] = None + + def __post_init__(self): + # Ensure only one of notify or notifications is present + if self.notify and self.notifications: + raise msgspec.ValidationError( + "Cannot specify both 'notify' and 'notifications'" + ) + transforms = TransformSequence() -transforms.add_validate(NOTIFY_SCHEMA) +transforms.add_validate(NotifySchema) def _convert_legacy(config, legacy, label): diff --git a/src/taskgraph/transforms/run/__init__.py b/src/taskgraph/transforms/run/__init__.py index ed3d7bf02..7afbf06f1 100644 --- a/src/taskgraph/transforms/run/__init__.py +++ b/src/taskgraph/transforms/run/__init__.py @@ -11,162 +11,134 @@ import copy import logging -from textwrap import dedent +from typing import Literal, Optional, Union -from voluptuous import Exclusive, Extra, Optional, Required +import msgspec from taskgraph.transforms.base import TransformSequence from taskgraph.transforms.cached_tasks import order_tasks -from taskgraph.transforms.task import task_description_schema +from taskgraph.transforms.task import TaskDescriptionSchema, TaskDescriptionWorkerSchema from taskgraph.util import json from taskgraph.util import path as mozpath from taskgraph.util.python_path import import_sibling_modules -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.taskcluster import get_artifact_prefix from taskgraph.util.workertypes import worker_type_implementation logger = logging.getLogger(__name__) -# Fetches may be accepted in other transforms and eventually passed along -# to a `task` (eg: from_deps). Defining this here allows them to reuse -# the schema and avoid duplication. -fetches_schema = { - Required("artifact"): str, - Optional("dest"): str, - Optional("extract"): bool, - Optional("verify-hash"): bool, -} - -#: Schema for a run transforms -run_description_schema = LegacySchema( - { - Optional( - "name", - description=dedent( - """ - The name of the task. At least one of 'name' or 'label' must be - specified. If 'label' is not provided, it will be generated from - the 'name' by prepending the kind. 
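
Editor's note: looking back at the notify changes, the recipient list is now a msgspec tagged union dispatched on the ``type`` field. A trimmed, self-contained sketch of the mechanism:

.. code-block:: python

    from typing import Optional, Union

    import msgspec

    class Email(msgspec.Struct, tag_field="type", tag="email", rename="kebab"):
        address: str
        status_type: Optional[str] = None

    class MatrixRoom(msgspec.Struct, tag_field="type", tag="matrix-room", rename="kebab"):
        room_id: str
        status_type: Optional[str] = None

    Recipient = Union[Email, MatrixRoom]

    recipients = msgspec.convert(
        [
            {"type": "email", "address": "a@example.com"},
            {"type": "matrix-room", "room-id": "!abc:example.com"},
        ],
        list[Recipient],
    )
    print([type(r).__name__ for r in recipients])  # ['Email', 'MatrixRoom']
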
- """ - ), - ): str, - Optional( - "label", - description=dedent( - """ - The label of the task. At least one of 'name' or 'label' must be - specified. If 'label' is not provided, it will be generated from - the 'name' by prepending the kind. - """ - ), - ): str, - # the following fields are passed directly through to the task description, - # possibly modified by the run implementation. See - # taskcluster/taskgraph/transforms/task.py for the schema details. - Required("description"): task_description_schema["description"], - Optional("priority"): task_description_schema["priority"], - Optional("attributes"): task_description_schema["attributes"], - Optional("task-from"): task_description_schema["task-from"], - Optional("dependencies"): task_description_schema["dependencies"], - Optional("soft-dependencies"): task_description_schema["soft-dependencies"], - Optional("if-dependencies"): task_description_schema["if-dependencies"], - Optional("requires"): task_description_schema["requires"], - Optional("deadline-after"): task_description_schema["deadline-after"], - Optional("expires-after"): task_description_schema["expires-after"], - Optional("routes"): task_description_schema["routes"], - Optional("scopes"): task_description_schema["scopes"], - Optional("tags"): task_description_schema["tags"], - Optional("extra"): task_description_schema["extra"], - Optional("treeherder"): task_description_schema["treeherder"], - Optional("index"): task_description_schema["index"], - Optional("run-on-projects"): task_description_schema["run-on-projects"], - Optional("run-on-tasks-for"): task_description_schema["run-on-tasks-for"], - Optional("run-on-git-branches"): task_description_schema["run-on-git-branches"], - Optional("shipping-phase"): task_description_schema["shipping-phase"], - Optional("always-target"): task_description_schema["always-target"], - Exclusive("optimization", "optimization"): task_description_schema[ - "optimization" - ], - Optional("needs-sccache"): task_description_schema["needs-sccache"], - Exclusive( - "when", - "optimization", - description=dedent( - """ - The "when" section contains descriptions of the circumstances under - which this task should be included in the task graph. This will be - converted into an optimization, so it cannot be specified in a run - description that also gives 'optimization'. - """ - ), - ): { - Optional( - "files-changed", - description=dedent( - """ - This task only needs to be run if a file matching one of the given - patterns has changed in the push. The patterns use the mozpack - match function (python/mozbuild/mozpack/path.py). - """ - ), - ): [str], - }, - Optional( - "fetches", - description=dedent( - """ - A list of artifacts to install from 'fetch' tasks. - """ - ), - ): { - str: [ - str, - fetches_schema, - ], - }, - Required( - "run", - description=dedent( - """ - A description of how to run this task. - """ - ), - ): { - Required( - "using", - description=dedent( - """ - The key to a run implementation in a peer module to this one. - """ - ), - ): str, - Optional( - "workdir", - description=dedent( - """ - Base work directory used to set up the task. - """ - ), - ): str, - # Any remaining content is verified against that run implementation's - # own schema. - Extra: object, - }, - Required("worker-type"): task_description_schema["worker-type"], - Optional( - "worker", - description=dedent( - """ - This object will be passed through to the task description, with additions - provided by the task's run-using function. 
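
Editor's note: the ``fetches`` mapping (reworked as ``FetchesSchema`` just below) accepts either a bare artifact name or a full entry per artifact. A sketch of both accepted shapes, with ``FetchEntry`` as a stand-in:

.. code-block:: python

    from typing import Optional, Union

    import msgspec

    class FetchEntry(msgspec.Struct, rename="kebab"):  # stand-in for FetchesSchema
        artifact: str
        dest: Optional[str] = None
        extract: bool = True
        verify_hash: bool = False

    Fetches = dict[str, list[Union[str, FetchEntry]]]

    msgspec.convert(
        {"toolchain": ["target.tar.zst", {"artifact": "docs.zip", "extract": False}]},
        Fetches,
    )
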
- """ - ), - ): dict, - } -) + +# Fetches schema using msgspec +class FetchesSchema(Schema): + """Schema for fetch configuration.""" + + artifact: str + dest: Optional[str] = None + extract: bool = True + verify_hash: bool = False + + +# When configuration using msgspec +class WhenSchema(Schema): + """Configuration for when a task should be included.""" + + # This task only needs to be run if a file matching one of the given + # patterns has changed in the push. The patterns use the mozpack + # match function (python/mozbuild/mozpack/path.py). + files_changed: list[str] = msgspec.field(default_factory=list) + + +# Run configuration using msgspec +class RunSchema(Schema, rename=None, forbid_unknown_fields=False): + """Configuration for how to run a task. + + This schema allows extra fields for run implementation-specific configuration. + """ + + # The key to a run implementation in a peer module to this one. + using: str + # Base work directory used to set up the task. + workdir: Optional[str] = None + + +# Run description schema using msgspec +class RunDescriptionSchema(Schema): + """Schema for run transforms.""" + + # Required fields first + description: TaskDescriptionSchema.__annotations__["description"] # type: ignore # noqa: F821 + run: RunSchema + worker_type: TaskDescriptionSchema.__annotations__["worker_type"] # type: ignore # noqa: F821 + + # Optional fields + # The name of the task. At least one of 'name' or 'label' must be + # specified. If 'label' is not provided, it will be generated from + # the 'name' by prepending the kind. + name: Optional[str] = None + # The label of the task. At least one of 'name' or 'label' must be + # specified. If 'label' is not provided, it will be generated from + # the 'name' by prepending the kind. + label: Optional[str] = None + + # Optional fields from task description + priority: TaskDescriptionSchema.__annotations__["priority"] = None # type: ignore[misc,assignment] + attributes: TaskDescriptionSchema.__annotations__["attributes"] = msgspec.field( # type: ignore[misc,assignment] + default_factory=dict + ) + task_from: TaskDescriptionSchema.__annotations__["task_from"] = None # type: ignore[misc,assignment] + dependencies: TaskDescriptionSchema.__annotations__["dependencies"] = msgspec.field( # type: ignore[misc,assignment] + default_factory=dict + ) + soft_dependencies: TaskDescriptionSchema.__annotations__["soft_dependencies"] = ( # type: ignore[misc,assignment] + msgspec.field(default_factory=list) + ) + if_dependencies: TaskDescriptionSchema.__annotations__["if_dependencies"] = ( # type: ignore[misc,assignment] + msgspec.field(default_factory=list) + ) + requires: TaskDescriptionSchema.__annotations__["requires"] = "all-completed" # type: ignore[misc,assignment] + deadline_after: TaskDescriptionSchema.__annotations__["deadline_after"] = None # type: ignore[misc,assignment] + expires_after: TaskDescriptionSchema.__annotations__["expires_after"] = None # type: ignore[misc,assignment] + routes: TaskDescriptionSchema.__annotations__["routes"] = msgspec.field( # type: ignore[misc,assignment] + default_factory=list + ) + scopes: TaskDescriptionSchema.__annotations__["scopes"] = msgspec.field( # type: ignore[misc,assignment] + default_factory=list + ) + tags: TaskDescriptionSchema.__annotations__["tags"] = msgspec.field( # type: ignore[misc,assignment] + default_factory=dict + ) + extra: TaskDescriptionSchema.__annotations__["extra"] = msgspec.field( # type: ignore[misc,assignment] + default_factory=dict + ) + treeherder: 
TaskDescriptionSchema.__annotations__["treeherder"] = None # type: ignore[misc,assignment] + index: TaskDescriptionSchema.__annotations__["index"] = None # type: ignore[misc,assignment] + run_on_projects: TaskDescriptionSchema.__annotations__["run_on_projects"] = None # type: ignore[misc,assignment] + run_on_tasks_for: TaskDescriptionSchema.__annotations__["run_on_tasks_for"] = ( # type: ignore[misc,assignment] + msgspec.field(default_factory=list) + ) + run_on_git_branches: TaskDescriptionSchema.__annotations__[ # type: ignore[misc,assignment] + "run_on_git_branches" # type: ignore[misc,assignment] + ] = msgspec.field(default_factory=list) + shipping_phase: TaskDescriptionSchema.__annotations__["shipping_phase"] = None # type: ignore[misc,assignment] + always_target: TaskDescriptionSchema.__annotations__["always_target"] = False # type: ignore[misc,assignment] + optimization: TaskDescriptionSchema.__annotations__["optimization"] = None # type: ignore[misc,assignment] + needs_sccache: TaskDescriptionSchema.__annotations__["needs_sccache"] = False # type: ignore[misc,assignment] + # The "when" section contains descriptions of the circumstances under + # which this task should be included in the task graph. This will be + # converted into an optimization, so it cannot be specified in a run + # description that also gives 'optimization'. + when: Optional[WhenSchema] = None + # A list of artifacts to install from 'fetch' tasks. + fetches: dict[str, list[Union[str, FetchesSchema]]] = msgspec.field( + default_factory=dict + ) + # This object will be passed through to the task description, with additions + # provided by the task's run-using function. + worker: Optional[TaskDescriptionWorkerSchema] = None + transforms = TransformSequence() -transforms.add_validate(run_description_schema) +transforms.add_validate(RunDescriptionSchema) @transforms.add @@ -456,9 +428,14 @@ def wrap(func): return wrap -@run_task_using( - "always-optimized", "always-optimized", LegacySchema({"using": "always-optimized"}) -) +# Simple schema for always-optimized +class AlwaysOptimizedRunSchema(Schema, omit_defaults=False): + """Schema for always-optimized run tasks.""" + + using: Literal["always-optimized"] + + +@run_task_using("always-optimized", "always-optimized", AlwaysOptimizedRunSchema) def always_optimized(config, task, taskdesc): pass diff --git a/src/taskgraph/transforms/run/index_search.py b/src/taskgraph/transforms/run/index_search.py index d5c0c6109..b224ceb5d 100644 --- a/src/taskgraph/transforms/run/index_search.py +++ b/src/taskgraph/transforms/run/index_search.py @@ -8,29 +8,24 @@ phase will replace the task with the task from the other graph. """ -from voluptuous import Required +from typing import Literal from taskgraph.transforms.base import TransformSequence from taskgraph.transforms.run import run_task_using -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema transforms = TransformSequence() #: Schema for run.using index-search -run_task_schema = LegacySchema( - { - Required("using"): "index-search", - Required( - "index-search", - "A list of indexes in decreasing order of priority at which to lookup for this " - "task. This is interpolated with the graph parameters.", - ): [str], - } -) +class RunTaskSchema(Schema): + using: Literal["index-search"] + # A list of indexes in decreasing order of priority at which to lookup for this + # task. This is interpolated with the graph parameters. 
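
Editor's note: the ``index-search`` run schema being defined here reduces to a ``Literal`` discriminator plus a list of index paths; a sketch with an illustrative index path:

.. code-block:: python

    from typing import Literal

    import msgspec

    class IndexSearchRun(msgspec.Struct, rename="kebab"):
        using: Literal["index-search"]
        index_search: list[str]  # accepted as "index-search"

    msgspec.convert(
        {"using": "index-search", "index-search": ["example.v2.level-1.latest"]},
        IndexSearchRun,
    )
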
+ index_search: list[str] -@run_task_using("always-optimized", "index-search", schema=run_task_schema) +@run_task_using("always-optimized", "index-search", schema=RunTaskSchema) def fill_template(config, task, taskdesc): run = task["run"] taskdesc["optimization"] = { diff --git a/src/taskgraph/transforms/run/run_task.py b/src/taskgraph/transforms/run/run_task.py index ce81d7d23..172ba997b 100644 --- a/src/taskgraph/transforms/run/run_task.py +++ b/src/taskgraph/transforms/run/run_task.py @@ -7,19 +7,16 @@ import dataclasses import os -from textwrap import dedent - -from voluptuous import Any, Optional, Required +from typing import Literal, Optional, Union from taskgraph.transforms.run import run_task_using from taskgraph.transforms.run.common import ( support_caches, support_vcs_checkout, ) -from taskgraph.transforms.task import taskref_or_string from taskgraph.util import path, taskcluster from taskgraph.util.caches import CACHES -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema, taskref_or_string_msgspec EXEC_COMMANDS = { "bash": ["bash", "-cx"], @@ -28,90 +25,65 @@ #: Schema for run.using run_task -run_task_schema = LegacySchema( - { - Required( - "using", - description=dedent( - """ - Specifies the task type. Must be 'run-task'. - """.lstrip() - ), - ): "run-task", - Optional( - "use-caches", - description=dedent( - """ - Specifies which caches to use. May take a boolean in which case either all - (True) or no (False) caches will be used. Alternatively, it can accept a - list of caches to enable. Defaults to only the checkout cache enabled. - """.lstrip() - ), - ): Any(bool, list(CACHES.keys())), - Required( - "checkout", - description=dedent( - """ - If true (the default), perform a checkout on the worker. Can also be a - dictionary specifying explicit checkouts. - """.lstrip() - ), - ): Any(bool, {str: dict}), - Optional( - "cwd", - description=dedent( - """ - Path to run command in. If a checkout is present, the path to the checkout - will be interpolated with the key `checkout`. - """.lstrip() - ), - ): str, - Required( - "command", - description=dedent( - """ - The command arguments to pass to the `run-task` script, after the checkout - arguments. If a list, it will be passed directly; otherwise it will be - included in a single argument to the command specified by `exec-with`. - """.lstrip() - ), - ): Any([taskref_or_string], taskref_or_string), - Optional( - "exec-with", - description=dedent( - """ - Specifies what to execute the command with in the event the command is a - string. - """.lstrip() - ), - ): Any(*list(EXEC_COMMANDS)), - Optional( - "run-task-command", - description=dedent( - """ - Command used to invoke the `run-task` script. Can be used if the script - or Python installation is in a non-standard location on the workers. - """.lstrip() - ), - ): list, - Required( - "workdir", - description=dedent( - """ - Base work directory used to set up the task. - """.lstrip() - ), - ): str, - Optional( - "run-as-root", - description=dedent( - """ - Whether to run as root. Defaults to False. - """.lstrip() - ), - ): bool, - } -) +class RunTaskSchema(Schema): + """ + Schema for run.using run_task. + """ + + # Required fields first + # Specifies the task type. Must be 'run-task'. + using: Literal["run-task"] + + # The command arguments to pass to the `run-task` script, after the checkout + # arguments. If a list, it will be passed directly; otherwise it will be + # included in a single argument to the command specified by `exec-with`. 
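+ # A hypothetical example of each accepted shape (values are illustrative
+ # only, not taken from a real kind):
+ #   command: ["run-task", "--", "bash", "-cx", "make build"]
+ #   command: "make build"  # a bare string is wrapped via `exec-with`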
+ command: Union[list[taskref_or_string_msgspec], taskref_or_string_msgspec] + + # Base work directory used to set up the task. + workdir: str + + # Optional fields + # Specifies which caches to use. May take a boolean in which case either all + # (True) or no (False) caches will be used. Alternatively, it can accept a + # list of caches to enable. Defaults to only the checkout cache enabled. + use_caches: Optional[Union[bool, list[str]]] = None + + # If true (the default), perform a checkout on the worker. Can also be a + # dictionary specifying explicit checkouts. + checkout: Union[bool, dict[str, dict]] = True + + # Path to run command in. If a checkout is present, the path to the checkout + # will be interpolated with the key `checkout`. + cwd: Optional[str] = None + + # Specifies what to execute the command with in the event the command is a + # string. + exec_with: Optional[str] = None + + # Command used to invoke the `run-task` script. Can be used if the script + # or Python installation is in a non-standard location on the workers. + run_task_command: Optional[list[str]] = None + + # Whether to run as root. Defaults to False. + run_as_root: bool = False + + def __post_init__(self): + """Validate cache names and exec_with values.""" + # Validate cache names + if isinstance(self.use_caches, list): + invalid = set(self.use_caches) - set(CACHES.keys()) + if invalid: + raise ValueError( + f"Invalid cache names: {invalid}. " + f"Valid names are: {list(CACHES.keys())}" + ) + + # Validate exec_with + if self.exec_with is not None and self.exec_with not in EXEC_COMMANDS: + raise ValueError( + f"Invalid exec_with value: {self.exec_with}. " + f"Valid values are: {list(EXEC_COMMANDS.keys())}" + ) def common_setup(config, task, taskdesc, command): @@ -165,13 +137,13 @@ def script_url(config, script): raise Exception("TASK_ID must be defined to use run-task on generic-worker") task_id = os.environ.get("TASK_ID", "") # Assumes the cluster allows anonymous downloads of public artifacts - tc_url = taskcluster.get_root_url(block_proxy=True) + tc_url = taskcluster.get_root_url() # TODO: Use util/taskcluster.py:get_artifact_url once hack for Bug 1405889 is removed return f"{tc_url}/api/queue/v1/task/{task_id}/artifacts/public/{script}" @run_task_using( - "docker-worker", "run-task", schema=run_task_schema, defaults=worker_defaults + "docker-worker", "run-task", schema=RunTaskSchema, defaults=worker_defaults ) def docker_worker_run_task(config, task, taskdesc): run = task["run"] @@ -193,7 +165,7 @@ def docker_worker_run_task(config, task, taskdesc): @run_task_using( - "generic-worker", "run-task", schema=run_task_schema, defaults=worker_defaults + "generic-worker", "run-task", schema=RunTaskSchema, defaults=worker_defaults ) def generic_worker_run_task(config, task, taskdesc): run = task["run"] diff --git a/src/taskgraph/transforms/run/toolchain.py b/src/taskgraph/transforms/run/toolchain.py index 77406ad61..07fbcc844 100644 --- a/src/taskgraph/transforms/run/toolchain.py +++ b/src/taskgraph/transforms/run/toolchain.py @@ -5,9 +5,7 @@ Support for running toolchain-building tasks via dedicated scripts """ -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Any, Optional, Required +from typing import Any, Literal, Optional, Union import taskgraph from taskgraph.transforms.run import configure_taskdesc_for_run, run_task_using @@ -18,84 +16,43 @@ ) from taskgraph.util import path as mozpath from taskgraph.util.hash import hash_paths -from taskgraph.util.schema import LegacySchema +from 
taskgraph.util.schema import Schema from taskgraph.util.shell import quote as shell_quote CACHE_TYPE = "toolchains.v3" + #: Schema for run.using toolchain -toolchain_run_schema = LegacySchema( - { - Required( - "using", - description=dedent( - """ - Specifies the run type. Must be "toolchain-script". - """ - ), - ): "toolchain-script", - Required( - "script", - description=dedent( - """ - The script (in taskcluster/scripts/misc) to run. - """ - ), - ): str, - Optional( - "arguments", - description=dedent( - """ - Arguments to pass to the script. - """ - ), - ): [str], - Optional( - "resources", - description=dedent( - """ - Paths/patterns pointing to files that influence the outcome of - a toolchain build. - """ - ), - ): [str], - Required( - "toolchain-artifact", - description=dedent( - """ - Path to the artifact produced by the toolchain task. - """ - ), - ): str, - Optional( - "toolchain-alias", - description=dedent( - """ - An alias that can be used instead of the real toolchain task name in - fetch stanzas for tasks. - """ - ), - ): Any(str, [str]), - Optional( - "toolchain-env", - description=dedent( - """ - Additional env variables to add to the worker when using this - toolchain. - """ - ), - ): {str: object}, - Required( - "workdir", - description=dedent( - """ - Base work directory used to set up the task. - """ - ), - ): str, - }, - extra=ALLOW_EXTRA, -) +class ToolchainRunSchema(Schema, forbid_unknown_fields=False): + # Required fields first + + # Specifies the run type. Must be "toolchain-script". + using: Literal["toolchain-script"] + # The script (in taskcluster/scripts/misc) to run. + script: str + + # Path to the artifact produced by the toolchain task. + toolchain_artifact: str + + # Base work directory used to set up the task. + workdir: str + + # Optional fields + + # Arguments to pass to the script. + arguments: Optional[list[str]] = None + + # Paths/patterns pointing to files that influence the outcome of + # a toolchain build. + resources: Optional[list[str]] = None + + # An alias that can be used instead of the real toolchain task name in + # fetch stanzas for tasks. + toolchain_alias: Optional[Union[str, list[str]]] = None + + # Additional env variables to add to the worker when using this + # toolchain. + toolchain_env: Optional[dict[str, Any]] = None def get_digest_data(config, run, taskdesc): @@ -199,7 +156,7 @@ def common_toolchain(config, task, taskdesc, is_docker): @run_task_using( "docker-worker", "toolchain-script", - schema=toolchain_run_schema, + schema=ToolchainRunSchema, defaults=toolchain_defaults, ) def docker_worker_toolchain(config, task, taskdesc): @@ -209,7 +166,7 @@ def docker_worker_toolchain(config, task, taskdesc): @run_task_using( "generic-worker", "toolchain-script", - schema=toolchain_run_schema, + schema=ToolchainRunSchema, defaults=toolchain_defaults, ) def generic_worker_toolchain(config, task, taskdesc): diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py index 59efbd8f3..ad42a20ae 100644 --- a/src/taskgraph/transforms/task.py +++ b/src/taskgraph/transforms/task.py @@ -1,6 +1,7 @@ # This Source Code Form is subject to the terms of the Mozilla Public # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. + """ These transformations take a task description and turn it into a TaskCluster task definition (along with attributes, label, etc.). 
The input to these @@ -11,23 +12,25 @@ import functools import hashlib import os +import re import time from copy import deepcopy from dataclasses import dataclass -from textwrap import dedent -from typing import Callable +from typing import Any, Callable, Literal, Optional, Union -from voluptuous import All, Any, Extra, NotIn, Optional, Required +import msgspec +from taskgraph import MAX_DEPENDENCIES from taskgraph.transforms.base import TransformSequence from taskgraph.util.hash import hash_path from taskgraph.util.keyed_by import evaluate_keyed_by from taskgraph.util.schema import ( - LegacySchema, - OptimizationSchema, + OptimizationType, + Schema, + TaskPriority, optionally_keyed_by, resolve_keyed_by, - taskref_or_string, + taskref_or_string_msgspec, validate_schema, ) from taskgraph.util.treeherder import split_symbol, treeherder_defaults @@ -47,343 +50,145 @@ def run_task_suffix(): return hash_path(RUN_TASK)[0:20] -#: Schema for the task transforms -task_description_schema = LegacySchema( - { - Required( - "label", - description=dedent( - """ - The label for this task. - """.lstrip() - ), - ): str, - Required( - "description", - description=dedent( - """ - Description of the task (for metadata). - """.lstrip() - ), - ): str, - Optional( - "attributes", - description=dedent( - """ - Attributes for this task. - """.lstrip() - ), - ): {str: object}, - Optional( - "task-from", - description=dedent( - """ - Relative path (from config.path) to the file task was defined - in. - """.lstrip() - ), - ): str, - Optional( - "dependencies", - description=dedent( - """ - Dependencies of this task, keyed by name; these are passed - through verbatim and subject to the interpretation of the - Task's get_dependencies method. - """.lstrip() - ), - ): { - All( - str, - NotIn( - ["self", "decision"], - "Can't use 'self` or 'decision' as dependency names.", - ), - ): object, - }, - Optional( - "priority", - description=dedent( - """ - Priority of the task. - """.lstrip() - ), - ): Any( - "highest", - "very-high", - "high", - "medium", - "low", - "very-low", - "lowest", - ), - Optional( - "soft-dependencies", - description=dedent( - """ - Soft dependencies of this task, as a list of task labels. - """.lstrip() - ), - ): [str], - Optional( - "if-dependencies", - description=dedent( - """ - Dependencies that must be scheduled in order for this task to run. - """.lstrip() - ), - ): [str], - Optional( - "requires", - description=dedent( - """ - Specifies the condition for task execution. - """.lstrip() - ), - ): Any("all-completed", "all-resolved"), - Optional( - "expires-after", - description=dedent( - """ - Expiration time relative to task creation, with units (e.g., - '14 days'). Defaults are set based on the project. - """.lstrip() - ), - ): str, - Optional( - "deadline-after", - description=dedent( - """ - Deadline time relative to task creation, with units (e.g., - '14 days'). Defaults are set based on the project. - """.lstrip() - ), - ): str, - Optional( - "routes", - description=dedent( - """ - Custom routes for this task; the default treeherder routes will - be added automatically. - """.lstrip() - ), - ): [str], - Optional( - "scopes", - description=dedent( - """ - Custom scopes for this task; any scopes required for the worker - will be added automatically. The following parameters will be - substituted in each scope: - - {level} -- the scm level of this push - {project} -- the project of this push. 
- """.lstrip() - ), - ): [str], - Optional( - "tags", - description=dedent( - """ - Tags for this task. - """.lstrip() - ), - ): {str: str}, - Optional( - "extra", - description=dedent( - """ - Custom 'task.extra' content. - """.lstrip() - ), - ): {str: object}, - Optional( - "treeherder", - description=dedent( - """ - Treeherder-related information. Can be a simple `true` to - auto-generate information or a dictionary with specific keys. - """.lstrip() - ), - ): Any( - True, - { - "symbol": Optional( - str, - description=dedent( - """ - Either a bare symbol, or 'grp(sym)'. Defaults to the - uppercased first letter of each section of the kind - (delimited by '-') all smooshed together. - """.lstrip() - ), - ), - "kind": Optional( - Any("build", "test", "other"), - description=dedent( - """ - The task kind. Defaults to 'build', 'test', or 'other' - based on the kind name. - """.lstrip() - ), - ), - "tier": Optional( - int, - description=dedent( - """ - Tier for this task. Defaults to 1. - """.lstrip() - ), - ), - "platform": Optional( - str, - description=dedent( - """ - Task platform in the form platform/collection, used to - set treeherder.machine.platform and - treeherder.collection or treeherder.labels Defaults to - 'default/opt'. - """.lstrip() - ), - ), - }, - ), - Optional( - "index", - description=dedent( - """ - Information for indexing this build so its artifacts can be - discovered. If omitted, the build will not be indexed. - """.lstrip() - ), - ): { - # the name of the product this build produces - "product": str, - # the names to use for this task in the TaskCluster index - "job-name": str, - # Type of gecko v2 index to use - "type": str, - # The rank that the task will receive in the TaskCluster - # index. A newly completed task supersedes the currently - # indexed task iff it has a higher rank. If unspecified, - # 'by-tier' behavior will be used. - "rank": Any( - # Rank is equal the timestamp of the build_date for tier-1 - # tasks, and zero for non-tier-1. This sorts tier-{2,3} - # builds below tier-1 in the index. - "by-tier", - # Rank is given as an integer constant (e.g. zero to make - # sure a task is last in the index). - int, - # Rank is equal to the timestamp of the build_date. This - # option can be used to override the 'by-tier' behavior - # for non-tier-1 tasks. - "build_date", - ), - }, - Optional( - "run-on-projects", - description=dedent( - """ - The `run_on_projects` attribute, defaulting to 'all'. Dictates - the projects on which this task should be included in the - target task set. See the attributes documentation for details. - """.lstrip() - ), - ): optionally_keyed_by("build-platform", [str]), - Optional( - "run-on-tasks-for", - description=dedent( - """ - Specifies tasks for which this task should run. - """.lstrip() - ), - ): [str], - Optional( - "run-on-git-branches", - description=dedent( - """ - Specifies git branches for which this task should run. - """.lstrip() - ), - ): [str], - Optional( - "shipping-phase", - description=dedent( - """ - The `shipping_phase` attribute, defaulting to None. Specifies - the release promotion phase that this task belongs to. - """.lstrip() - ), - ): Any( - None, - "build", - "promote", - "push", - "ship", - ), - Required( - "always-target", - description=dedent( - """ - The `always-target` attribute will cause the task to be - included in the target_task_graph regardless of filtering. 
- - Tasks included in this manner will be candidates for - optimization even when `optimize_target_tasks` is False, unless - the task was also explicitly chosen by the target_tasks method. - """.lstrip() - ), - ): bool, - Required( - "optimization", - description=dedent( - """ - Optimization to perform on this task during the optimization - phase. Defined in taskcluster/taskgraph/optimize.py. - """.lstrip() - ), - ): OptimizationSchema, - Required( - "worker-type", - description=dedent( - """ - The provisioner-id/worker-type for the task. The following - parameters will be substituted in this string: - - {level} -- the scm level of this push. - """.lstrip() - ), - ): str, - Required( - "needs-sccache", - description=dedent( - """ - Whether the task should use sccache compiler caching. - """.lstrip() - ), - ): bool, - Optional( - "worker", - description=dedent( - """ - Information specific to the worker implementation that will run - this task. - """.lstrip() - ), - ): { - Required( - "implementation", - description=dedent( - """ - The worker implementation type. - """.lstrip() - ), - ): str, - Extra: object, - }, - } -) +# Task Description schema using msgspec +class TaskDescriptionTreeherderSchema(Schema, rename=None): + """Treeherder-related information for a task.""" + + # Either a bare symbol, or 'grp(sym)'. Defaults to the + # uppercased first letter of each section of the kind + # (delimited by '-') all smooshed together. + symbol: Optional[str] = None + # The task kind. Defaults to 'build', 'test', or 'other' + # based on the kind name. + kind: Optional[Literal["build", "test", "other"]] = None + # Tier for this task. Defaults to 1. + tier: Optional[int] = None + # Task platform in the form platform/collection, used to + # set treeherder.machine.platform and + # treeherder.collection or treeherder.labels. Defaults to + # 'default/opt'. + platform: Optional[str] = None + + +class TaskDescriptionIndexSchema(Schema, rename="kebab"): + """Index information for a task.""" + + # the name of the product this build produces + product: str + # the names to use for this task in the TaskCluster index + job_name: str + # Type of gecko v2 index to use + type: str = "generic" # Default to generic as that's what's commonly used + # The rank that the task will receive in the TaskCluster + # index. A newly completed task supersedes the currently + # indexed task iff it has a higher rank. If unspecified, + # 'by-tier' behavior will be used. + # Rank is equal the timestamp of the build_date for tier-1 + # tasks, and zero for non-tier-1. This sorts tier-{2,3} + # builds below tier-1 in the index. + # Can also be given as an integer constant (e.g. zero to make + # sure a task is last in the index) or 'build_date' to equal + # the timestamp of the build_date. + rank: Union[Literal["by-tier", "build_date"], int] = "by-tier" + + +class TaskDescriptionWorkerSchema(Schema, rename=None, forbid_unknown_fields=False): + """Worker configuration for a task. + + This schema allows extra fields for worker-specific configuration. + """ + + implementation: Optional[str] = None + + +class TaskDescriptionSchema(Schema, forbid_unknown_fields=False): + """Schema for task descriptions.""" + + # The label for this task. + label: str + # Description of the task (for metadata). + description: str + # The provisioner-id/worker-type for the task. The following + # parameters will be substituted in this string: + # {level} -- the scm level of this push. + worker_type: str + # Attributes for this task. 
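+ # Attributes are free-form metadata attached to the generated Task;
+ # they are typically read by later transforms and target-task filtering
+ # rather than ending up in the final Taskcluster task definition.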
+ attributes: dict[str, Any] = msgspec.field(default_factory=dict) + # Relative path (from config.path) to the file task was defined in. + task_from: Optional[str] = None + # Dependencies of this task, keyed by name; these are passed + # through verbatim and subject to the interpretation of the + # Task's get_dependencies method. + dependencies: dict[str, Any] = msgspec.field(default_factory=dict) + # Priority of the task. + priority: Optional[TaskPriority] = None + # Soft dependencies of this task, as a list of task labels. + soft_dependencies: list[str] = msgspec.field(default_factory=list) + # Dependencies that must be scheduled in order for this task to run. + if_dependencies: list[str] = msgspec.field(default_factory=list) + # Specifies the condition for task execution. + requires: Literal["all-completed", "all-resolved"] = "all-completed" + # Expiration time relative to task creation, with units (e.g., + # '14 days'). Defaults are set based on the project. + expires_after: Optional[str] = None + # Deadline time relative to task creation, with units (e.g., + # '14 days'). Defaults are set based on the project. + deadline_after: Optional[str] = None + # Custom routes for this task; the default treeherder routes will + # be added automatically. + routes: list[str] = msgspec.field(default_factory=list) + # Custom scopes for this task; any scopes required for the worker + # will be added automatically. The following parameters will be + # substituted in each scope: + # {level} -- the scm level of this push + # {project} -- the project of this push. + scopes: list[str] = msgspec.field(default_factory=list) + # Tags for this task. + tags: dict[str, str] = msgspec.field(default_factory=dict) + # Custom 'task.extra' content. + extra: dict[str, Any] = msgspec.field(default_factory=dict) + # Treeherder-related information. Can be a simple `true` to + # auto-generate information or a dictionary with specific keys. + treeherder: Optional[Union[bool, TaskDescriptionTreeherderSchema]] = None + # Information for indexing this build so its artifacts can be + # discovered. If omitted, the build will not be indexed. + index: Optional[TaskDescriptionIndexSchema] = None + # The `run_on_projects` attribute, defaulting to 'all'. Dictates + # the projects on which this task should be included in the + # target task set. See the attributes documentation for details. + run_on_projects: optionally_keyed_by( + "build-platform", list[str], use_msgspec=True + ) = None # type: ignore + # Specifies tasks for which this task should run. + run_on_tasks_for: list[str] = msgspec.field(default_factory=list) + # Specifies git branches for which this task should run. + run_on_git_branches: list[str] = msgspec.field(default_factory=list) + # The `shipping_phase` attribute, defaulting to None. Specifies + # the release promotion phase that this task belongs to. + shipping_phase: Optional[Literal["build", "promote", "push", "ship"]] = None + # The `always-target` attribute will cause the task to be + # included in the target_task_graph regardless of filtering. + # Tasks included in this manner will be candidates for + # optimization even when `optimize_target_tasks` is False, unless + # the task was also explicitly chosen by the target_tasks method. + always_target: bool = False + # Optimization to perform on this task during the optimization + # phase. Defined in taskcluster/taskgraph/optimize.py. + optimization: OptimizationType = None + # Whether the task should use sccache compiler caching. 
+ needs_sccache: bool = False + # Information specific to the worker implementation + worker: Optional[TaskDescriptionWorkerSchema] = None + + def __post_init__(self): + """Validate dependency names.""" + if self.dependencies: + invalid_names = {"self", "decision"} & set(self.dependencies.keys()) + if invalid_names: + raise ValueError( + f"Can't use {', '.join(repr(n) for n in sorted(invalid_names))} as dependency names." + ) + TC_TREEHERDER_SCHEMA_URL = ( "https://github.com/taskcluster/taskcluster-treeherder/" @@ -430,14 +235,18 @@ def get_default_deadline(graph_config, project): @dataclass(frozen=True) class PayloadBuilder: - schema: LegacySchema + schema: Schema builder: Callable def payload_builder(name, schema): - schema = LegacySchema( - {Required("implementation"): name, Optional("os"): str} - ).extend(schema) + """ + Decorator for registering payload builders. + """ + # Verify the schema has required fields using __annotations__ + # (more reliable than msgspec.structs.fields() across Python versions) + if "implementation" not in schema.__annotations__: + raise ValueError(f"Schema for {name} must include 'implementation' field") def wrap(func): assert name not in payload_builders, f"duplicate payload builder name {name}" @@ -472,86 +281,107 @@ def verify_index(config, index): raise Exception(UNSUPPORTED_INDEX_PRODUCT_ERROR.format(product=product)) -@payload_builder( - "docker-worker", - schema={ - Required("os"): "linux", - # For tasks that will run in docker-worker, this is the name of the docker - # image or in-tree docker image to run the task in. If in-tree, then a - # dependency will be created automatically. This is generally - # `desktop-test`, or an image that acts an awful lot like it. - Required("docker-image"): Any( - # a raw Docker image path (repo/image:tag) - str, - # an in-tree generated docker image (from `taskcluster/docker/`) - {"in-tree": str}, - # an indexed docker image - {"indexed": str}, - ), - # worker features that should be enabled - Required("relengapi-proxy"): bool, - Required("chain-of-trust"): bool, - Required("taskcluster-proxy"): bool, - Required("allow-ptrace"): bool, - Required("loopback-video"): bool, - Required("loopback-audio"): bool, - Required("docker-in-docker"): bool, # (aka 'dind') - Required("privileged"): bool, - # Paths to Docker volumes. - # - # For in-tree Docker images, volumes can be parsed from Dockerfile. - # This only works for the Dockerfile itself: if a volume is defined in - # a base image, it will need to be declared here. Out-of-tree Docker - # images will also require explicit volume annotation. - # - # Caches are often mounted to the same path as Docker volumes. In this - # case, they take precedence over a Docker volume. But a volume still - # needs to be declared for the path. - Optional("volumes"): [str], - # caches to set up for the task - Optional("caches"): [ - { - # only one type is supported by any of the workers right now - "type": "persistent", - # name of the cache, allowing reuse by subsequent tasks naming the - # same cache - "name": str, - # location in the task image where the cache will be mounted - "mount-point": str, - # Whether the cache is not used in untrusted environments - # (like the Try repo). - Optional("skip-untrusted"): bool, - } - ], - # artifacts to extract from the task image after completion - Optional("artifacts"): [ - { - # type of artifact -- simple file, or recursive directory, - # or a volume mounted directory. 
- "type": Any("file", "directory", "volume"), - # task image path from which to read artifact - "path": str, - # name of the produced artifact (root of the names for - # type=directory) - "name": str, - } - ], - # environment variables - Required("env"): {str: taskref_or_string}, - # the command to run; if not given, docker-worker will default to the - # command in the docker image - Optional("command"): [taskref_or_string], - # the maximum time to run, in seconds - Required("max-run-time"): int, - # the exit status code(s) that indicates the task should be retried - Optional("retry-exit-status"): [int], - # the exit status code(s) that indicates the caches used by the task - # should be purged - Optional("purge-caches-exit-status"): [int], - # Whether any artifacts are assigned to this worker - Optional("skip-artifacts"): bool, - }, -) +# Docker Worker schema using msgspec +class DockerWorkerCacheSchema(Schema, rename="kebab"): + """Cache configuration for docker-worker.""" + + # name of the cache, allowing reuse by subsequent tasks naming the same cache + name: str + # location in the task image where the cache will be mounted + mount_point: str + # only one type is supported by any of the workers right now + type: Literal["persistent"] = "persistent" + # Whether the cache is not used in untrusted environments (like the Try repo). + skip_untrusted: bool = False + + +class DockerImageInTreeSchema(Schema, rename="kebab"): + """In-tree generated docker image.""" + + in_tree: str + + +class DockerImageIndexedSchema(Schema): + """Indexed docker image.""" + + indexed: str + + +# Create a class for docker image types to avoid dict union issues +class DockerImageTypeSchema(Schema, forbid_unknown_fields=False): + """Schema that accepts either in-tree or indexed docker images.""" + + in_tree: Optional[str] = None + indexed: Optional[str] = None + + def __post_init__(self): + """Ensure exactly one image type is provided.""" + if self.in_tree and self.indexed: + raise ValueError("Cannot have both in-tree and indexed") + if not self.in_tree and not self.indexed: + raise ValueError("Must have either in-tree or indexed") + + +# Type for docker-image field +DockerImageType = Union[ + str, # a raw Docker image path (repo/image:tag) + DockerImageTypeSchema, # docker image configs +] + + +class DockerWorkerArtifactSchema(Schema, rename=None): + """Artifact configuration for docker-worker.""" + + # type of artifact -- simple file, or recursive directory, or a volume mounted directory. + type: Literal["file", "directory", "volume"] + # task image path from which to read artifact + path: str + # name of the produced artifact (root of the names for type=directory) + name: str + + +class DockerWorkerPayloadSchema(Schema): + """Schema for docker-worker payload.""" + + # Required fields first + implementation: Literal["docker-worker"] + # For tasks that will run in docker-worker, this is the name of the docker + # image or in-tree docker image to run the task in. + docker_image: DockerImageType + # the maximum time to run, in seconds + max_run_time: int + + # Optional fields + os: Literal["linux"] = "linux" + # worker features that should be enabled + relengapi_proxy: bool = False + chain_of_trust: bool = False + taskcluster_proxy: bool = False + allow_ptrace: bool = False + loopback_video: bool = False + loopback_audio: bool = False + docker_in_docker: bool = False # (aka 'dind') + privileged: bool = False + # Paths to Docker volumes. 
+ volumes: list[str] = msgspec.field(default_factory=list) + # caches to set up for the task + caches: Optional[list[DockerWorkerCacheSchema]] = None + # artifacts to extract from the task image after completion + artifacts: Optional[list[DockerWorkerArtifactSchema]] = None + # environment variables + env: dict[str, taskref_or_string_msgspec] = msgspec.field(default_factory=dict) + # the command to run; if not given, docker-worker will default to the + # command in the docker image + command: Optional[list[taskref_or_string_msgspec]] = None + # the exit status code(s) that indicates the task should be retried + retry_exit_status: Optional[list[int]] = None + # the exit status code(s) that indicates the caches used by the task should be purged + purge_caches_exit_status: Optional[list[int]] = None + # Whether any artifacts are assigned to this worker + skip_artifacts: bool = False + + +@payload_builder("docker-worker", DockerWorkerPayloadSchema) def build_docker_worker_payload(config, task, task_def): worker = task["worker"] level = int(config.params["level"]) @@ -649,7 +479,7 @@ def build_docker_worker_payload(config, task, task_def): if "max-run-time" in worker: payload["maxRunTime"] = worker["max-run-time"] - run_task = os.path.basename(payload.get("command", [""])[0]).startswith("run-task") + run_task = payload.get("command", [""])[0].endswith("run-task") # run-task exits EXIT_PURGE_CACHES if there is a problem with caches. # Automatically retry the tasks and purge caches if we see this exit @@ -761,90 +591,86 @@ def build_docker_worker_payload(config, task, task_def): if capabilities: payload["capabilities"] = capabilities - -@payload_builder( - "generic-worker", - schema={ - Required("os"): Any("windows", "macosx", "linux", "linux-bitbar"), - # see http://schemas.taskcluster.net/generic-worker/v1/payload.json - # and https://docs.taskcluster.net/reference/workers/generic-worker/payload - # command is a list of commands to run, sequentially - # on Windows, each command is a string, on OS X and Linux, each command is - # a string array - Required("command"): Any( - [taskref_or_string], - [[taskref_or_string]], # Windows # Linux / OS X - ), - # artifacts to extract from the task image after completion; note that artifacts - # for the generic worker cannot have names - Optional("artifacts"): [ - { - # type of artifact -- simple file, or recursive directory - "type": Any("file", "directory"), - # filesystem path from which to read artifact - "path": str, - # if not specified, path is used for artifact name - Optional("name"): str, - } - ], - # Directories and/or files to be mounted. - # The actual allowed combinations are stricter than the model below, - # but this provides a simple starting point. - # See https://docs.taskcluster.net/reference/workers/generic-worker/payload - Optional("mounts"): [ - { - # A unique name for the cache volume, implies writable cache directory - # (otherwise mount is a read-only file or directory). - Optional("cache-name"): str, - # Optional content for pre-loading cache, or mandatory content for - # read-only file or directory. Pre-loaded content can come from either - # a task artifact or from a URL. - Optional("content"): { - # *** Either (artifact and task-id) or url must be specified. *** - # Artifact name that contains the content. - Optional("artifact"): str, - # Task ID that has the artifact that contains the content. - Optional("task-id"): taskref_or_string, - # URL that supplies the content in response to an unauthenticated - # GET request. 
- Optional("url"): str, - }, - # *** Either file or directory must be specified. *** - # If mounting a cache or read-only directory, the filesystem location of - # the directory should be specified as a relative path to the task - # directory here. - Optional("directory"): str, - # If mounting a file, specify the relative path within the task - # directory to mount the file (the file will be read only). - Optional("file"): str, - # Required if and only if `content` is specified and mounting a - # directory (not a file). This should be the archive format of the - # content (either pre-loaded cache or read-only directory). - Optional("format"): Any("rar", "tar.bz2", "tar.gz", "zip"), - } - ], - # environment variables - Required("env"): {str: taskref_or_string}, - # the maximum time to run, in seconds - Required("max-run-time"): int, - # the exit status code(s) that indicates the task should be retried - Optional("retry-exit-status"): [int], - # the exit status code(s) that indicates the caches used by the task - # should be purged - Optional("purge-caches-exit-status"): [int], - # os user groups for test task workers - Optional("os-groups"): [str], - # feature for test task to run as administarotr - Optional("run-as-administrator"): bool, - # feature for task to run as current OS user - Optional("run-task-as-current-user"): bool, - # optional features - Required("chain-of-trust"): bool, - Optional("taskcluster-proxy"): bool, - # Whether any artifacts are assigned to this worker - Optional("skip-artifacts"): bool, - }, -) + check_caches_are_volumes(task) + + +# Generic Worker schema using msgspec +class GenericWorkerArtifactSchema(Schema, rename=None): + """Artifact configuration for generic-worker.""" + + # type of artifact -- simple file, or recursive directory + type: Literal["file", "directory"] + # filesystem path from which to read artifact + path: str + # if not specified, path is used for artifact name + name: Optional[str] = None + + +class GenericWorkerMountContentSchema(Schema, rename="kebab"): + """Mount content configuration for generic-worker.""" + + # Artifact name that contains the content. + artifact: Optional[str] = None + # Task ID that has the artifact that contains the content. + task_id: Optional[taskref_or_string_msgspec] = None + # URL that supplies the content in response to an unauthenticated GET request. 
+ url: Optional[str] = None + + +class GenericWorkerMountSchema(Schema, rename="kebab"): + """Mount configuration for generic-worker.""" + + # A unique name for the cache volume, implies writable cache directory + cache_name: Optional[str] = None + # Optional content for pre-loading cache, or mandatory content for read-only file or directory + content: Optional[GenericWorkerMountContentSchema] = None + # If mounting a cache or read-only directory, the filesystem location + directory: Optional[str] = None + # If mounting a file, specify the relative path within the task directory + file: Optional[str] = None + # Archive format of the content + format: Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]] = None + + +class GenericWorkerPayloadSchema(Schema): + """Schema for generic-worker payload.""" + + # Required fields first + implementation: Literal["generic-worker"] + os: Literal["windows", "macosx", "linux", "linux-bitbar"] + # command is a list of commands to run, sequentially + # on Windows, each command is a string, on OS X and Linux, each command is a string array + command: Union[ + list[taskref_or_string_msgspec], list[list[taskref_or_string_msgspec]] + ] + # the maximum time to run, in seconds + max_run_time: int + + # Optional fields + # artifacts to extract from the task image after completion + artifacts: Optional[list[GenericWorkerArtifactSchema]] = None + # Directories and/or files to be mounted + mounts: Optional[list[GenericWorkerMountSchema]] = None + # environment variables + env: dict[str, taskref_or_string_msgspec] = msgspec.field(default_factory=dict) + # the exit status code(s) that indicates the task should be retried + retry_exit_status: Optional[list[int]] = None + # the exit status code(s) that indicates the caches used by the task should be purged + purge_caches_exit_status: Optional[list[int]] = None + # os user groups for test task workers + os_groups: list[str] = msgspec.field(default_factory=list) + # feature for test task to run as administrator + run_as_administrator: bool = False + # feature for task to run as current OS user + run_task_as_current_user: bool = False + # optional features + chain_of_trust: bool = False + taskcluster_proxy: bool = False + # Whether any artifacts are assigned to this worker + skip_artifacts: bool = False + + +@payload_builder("generic-worker", GenericWorkerPayloadSchema) def build_generic_worker_payload(config, task, task_def): worker = task["worker"] @@ -956,38 +782,52 @@ def build_generic_worker_payload(config, task, task_def): task_def["payload"]["features"] = features -@payload_builder( - "beetmover", - schema={ - # the maximum time to run, in seconds - Required("max-run-time"): int, - # locale key, if this is a locale beetmover task - Optional("locale"): str, - Optional("partner-public"): bool, - Required("release-properties"): { - "app-name": str, - "app-version": str, - "branch": str, - "build-id": str, - "hash-type": str, - "platform": str, - }, - # list of artifact URLs for the artifacts that should be beetmoved - Required("upstream-artifacts"): [ - { - # taskId of the task with the artifact - Required("taskId"): taskref_or_string, - # type of signing task (for CoT) - Required("taskType"): str, - # Paths to the artifacts to sign - Required("paths"): [str], - # locale is used to map upload path and allow for duplicate simple names - Required("locale"): str, - } - ], - Optional("artifact-map"): object, - }, -) +# Beetmover schema using msgspec +class BeetmoverReleasePropertiesSchema(Schema): + """Release properties 
for beetmover tasks.""" + + app_name: str + app_version: str + branch: str + build_id: str + hash_type: str + platform: str + + +class BeetmoverUpstreamArtifactSchema(Schema, rename=None, omit_defaults=False): + """Upstream artifact definition for beetmover.""" + + # taskId of the task with the artifact + taskId: taskref_or_string_msgspec # Can be string or task-reference dict + # type of signing task (for CoT) + taskType: str + # Paths to the artifacts to sign + paths: list[str] + # locale is used to map upload path and allow for duplicate simple names + locale: str + + +class BeetmoverPayloadSchema(Schema): + """Schema for beetmover worker payload.""" + + # Required fields first + implementation: Literal["beetmover"] + # the maximum time to run, in seconds + max_run_time: int + release_properties: BeetmoverReleasePropertiesSchema + # list of artifact URLs for the artifacts that should be beetmoved + upstream_artifacts: list[BeetmoverUpstreamArtifactSchema] + + # Optional fields + os: str = "" + # locale key, if this is a locale beetmover task + locale: Optional[str] = None + partner_public: Optional[bool] = None + # Artifact map can be any object + artifact_map: Optional[dict] = None + + +@payload_builder("beetmover", BeetmoverPayloadSchema) def build_beetmover_payload(config, task, task_def): worker = task["worker"] release_properties = worker["release-properties"] @@ -1013,25 +853,41 @@ def build_beetmover_payload(config, task, task_def): task_def["payload"]["is_partner_repack_public"] = worker["partner-public"] -@payload_builder( - "invalid", - schema={ - # an invalid task is one which should never actually be created; this is used in - # release automation on branches where the task just doesn't make sense - Extra: object, - }, -) +# Simple payload schemas using msgspec +class InvalidPayloadSchema( + Schema, rename=None, omit_defaults=False, forbid_unknown_fields=False +): + """Schema for invalid tasks - allows any fields.""" + + implementation: Literal["invalid"] + os: str = "" + + +class AlwaysOptimizedPayloadSchema( + Schema, rename=None, omit_defaults=False, forbid_unknown_fields=False +): + """Schema for always-optimized tasks - allows any fields.""" + + implementation: Literal["always-optimized"] + os: str = "" + + +class SucceedPayloadSchema(Schema, rename=None, omit_defaults=False): + """Schema for succeed tasks - minimal schema.""" + + # Required field first + implementation: Literal["succeed"] + # Optional field + os: str = "" + + +@payload_builder("invalid", InvalidPayloadSchema) def build_invalid_payload(config, task, task_def): task_def["payload"] = "invalid task - should never be created" -@payload_builder( - "always-optimized", - schema={ - Extra: object, - }, -) -@payload_builder("succeed", schema={}) +@payload_builder("always-optimized", AlwaysOptimizedPayloadSchema) +@payload_builder("succeed", SucceedPayloadSchema) def build_dummy_payload(config, task, task_def): task_def["payload"] = {} @@ -1118,7 +974,7 @@ def task_name_from_label(config, tasks): def validate(config, tasks): for task in tasks: validate_schema( - task_description_schema, + TaskDescriptionSchema, task, "In task {!r}:".format(task.get("label", "?no-label?")), ) @@ -1453,3 +1309,135 @@ def chain_of_trust(config, tasks): "task-reference": "" } yield task + + +@transforms.add +def check_task_identifiers(config, tasks): + """Ensures that all tasks have well defined identifiers: + ``^[a-zA-Z0-9_-]{1,38}$`` + """ + e = re.compile("^[a-zA-Z0-9_-]{1,38}$") + for task in tasks: + for attrib in ("workerType", 
"provisionerId"): + if not e.match(task["task"][attrib]): + raise Exception( + "task {}.{} is not a valid identifier: {}".format( + task["label"], attrib, task["task"][attrib] + ) + ) + yield task + + +@transforms.add +def check_task_dependencies(config, tasks): + """Ensures that tasks don't have more than 100 dependencies.""" + for task in tasks: + number_of_dependencies = ( + len(task["dependencies"]) + + len(task["if-dependencies"]) + + len(task["soft-dependencies"]) + ) + if number_of_dependencies > MAX_DEPENDENCIES: + raise Exception( + "task {}/{} has too many dependencies ({} > {})".format( + config.kind, + task["label"], + number_of_dependencies, + MAX_DEPENDENCIES, + ) + ) + yield task + + +def check_caches_are_volumes(task): + """Ensures that all cache paths are defined as volumes. + + Caches and volumes are the only filesystem locations whose content + isn't defined by the Docker image itself. Some caches are optional + depending on the task environment. We want paths that are potentially + caches to have as similar behavior regardless of whether a cache is + used. To help enforce this, we require that all paths used as caches + to be declared as Docker volumes. This check won't catch all offenders. + But it is better than nothing. + """ + volumes = set(task["worker"]["volumes"]) + paths = {c["mount-point"] for c in task["worker"].get("caches", [])} + missing = paths - volumes + + if not missing: + return + + raise Exception( + "task {} (image {}) has caches that are not declared as " + "Docker volumes: {} " + "(have you added them as VOLUMEs in the Dockerfile?)".format( + task["label"], task["worker"]["docker-image"], ", ".join(sorted(missing)) + ) + ) + + +@transforms.add +def check_run_task_caches(config, tasks): + """Audit for caches requiring run-task. + + run-task manages caches in certain ways. If a cache managed by run-task + is used by a non run-task task, it could cause problems. So we audit for + that and make sure certain cache names are exclusive to run-task. + + IF YOU ARE TEMPTED TO MAKE EXCLUSIONS TO THIS POLICY, YOU ARE LIKELY + CONTRIBUTING TECHNICAL DEBT AND WILL HAVE TO SOLVE MANY OF THE PROBLEMS + THAT RUN-TASK ALREADY SOLVES. THINK LONG AND HARD BEFORE DOING THAT. + """ + re_reserved_caches = re.compile( + """^ + (checkouts|tooltool-cache) + """, + re.VERBOSE, + ) + + cache_prefix = "{trust_domain}-level-{level}-".format( + trust_domain=config.graph_config["trust-domain"], + level=config.params["level"], + ) + + suffix = run_task_suffix() + + for task in tasks: + payload = task["task"].get("payload", {}) + command = payload.get("command") or [""] + + main_command = command[0] if isinstance(command[0], str) else "" + run_task = main_command.endswith("run-task") + + for cache in payload.get("cache", {}).get( + "task-reference", payload.get("cache", {}) + ): + if not cache.startswith(cache_prefix): + raise Exception( + "{} is using a cache ({}) which is not appropriate " + "for its trust-domain and level. 
It should start with {}.".format( + task["label"], cache, cache_prefix + ) + ) + + cache = cache[len(cache_prefix) :] + + if not re_reserved_caches.match(cache): + continue + + if not run_task: + raise Exception( + f"{task['label']} is using a cache ({cache}) reserved for run-task " + "change the task to use run-task or use a different " + "cache name" + ) + + if suffix not in cache: + raise Exception( + f"{task['label']} is using a cache ({cache}) reserved for run-task " + "but the cache name is not dependent on the contents " + "of run-task; change the cache name to conform to the " + "naming requirements" + ) + + yield task diff --git a/src/taskgraph/transforms/task_context.py b/src/taskgraph/transforms/task_context.py index e38648cd3..06cf9ef01 100644 --- a/src/taskgraph/transforms/task_context.py +++ b/src/taskgraph/transforms/task_context.py @@ -1,85 +1,69 @@ -from textwrap import dedent - -from voluptuous import ALLOW_EXTRA, Any, Optional, Required +from typing import Any, Optional, Union from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema from taskgraph.util.templates import deep_get, substitute_task_fields from taskgraph.util.yaml import load_yaml + +class TaskContextChildSchema(Schema): + """ + `task-context` can be used to substitute values into any field in a + task with data that is not known until `taskgraph` runs. + + This data can be provided via `from-parameters` or `from-file`, + which can pull in values from parameters and a defined yml file + respectively. + + Data may also be provided directly in the `from-object` section of + `task-context`. This can be useful in `kinds` that define most of + their contents in `task-defaults`, but have some values that may + differ for various concrete `tasks` in the `kind`. + + If the same key is found in multiple places the order of precedence + is as follows: + + - Parameters + - `from-object` keys + - File + + That is to say: parameters will always override anything else. + """ + + # Required field first + # A list of fields in the task to substitute the provided values + # into. + substitution_fields: list[str] + + # Optional fields + # Retrieve task context values from parameters. A single + # parameter may be provided or a list of parameters in + # priority order. The latter can be useful in implementing a + # "default" value if some other parameter is not provided. + from_parameters: Optional[dict[str, Union[list[str], str]]] = None + # Retrieve task context values from a yaml file. The provided + # file should usually only contain top level keys and values + # (eg: nested objects will not be interpolated - they will be + # substituted as text representations of the object). + from_file: Optional[str] = None + # Key/value pairs to be used as task context + from_object: Optional[Any] = None + + #: Schema for the task_context transforms -SCHEMA = LegacySchema( - { - Optional("name"): str, - Optional( - "task-context", - description=dedent( - """ - `task-context` can be used to substitute values into any field in a - task with data that is not known until `taskgraph` runs. - - This data can be provided via `from-parameters` or `from-file`, - which can pull in values from parameters and a defined yml file - respectively. - - Data may also be provided directly in the `from-object` section of - `task-context`. 
This can be useful in `kinds` that define most of - their contents in `task-defaults`, but have some values that may - differ for various concrete `tasks` in the `kind`. - - If the same key is found in multiple places the order of precedence - is as follows: - - Parameters - - `from-object` keys - - File - - That is to say: parameters will always override anything else. - - """.lstrip(), - ), - ): { - Optional( - "from-parameters", - description=dedent( - """ - Retrieve task context values from parameters. A single - parameter may be provided or a list of parameters in - priority order. The latter can be useful in implementing a - "default" value if some other parameter is not provided. - """.lstrip() - ), - ): {str: Any([str], str)}, - Optional( - "from-file", - description=dedent( - """ - Retrieve task context values from a yaml file. The provided - file should usually only contain top level keys and values - (eg: nested objects will not be interpolated - they will be - substituted as text representations of the object). - """.lstrip() - ), - ): str, - Optional( - "from-object", - description="Key/value pairs to be used as task context", - ): object, - Required( - "substitution-fields", - description=dedent( - """ - A list of fields in the task to substitute the provided values - into. - """.lstrip() - ), - ): [str], - }, - }, - extra=ALLOW_EXTRA, -) +class TaskContextSchema(Schema, forbid_unknown_fields=False): + """Schema for task context transforms. + + This schema allows extra fields to be passed through to the task. + """ + + # Optional fields + task_context: Optional[TaskContextChildSchema] = None + name: Optional[str] = None + transforms = TransformSequence() -transforms.add_validate(SCHEMA) +transforms.add_validate(TaskContextSchema) @transforms.add diff --git a/template/{{cookiecutter.project_name}}/taskcluster/{{cookiecutter.project_slug}}_taskgraph/transforms/hello.py b/template/{{cookiecutter.project_name}}/taskcluster/{{cookiecutter.project_slug}}_taskgraph/transforms/hello.py index f63b5feab..22100d1e5 100644 --- a/template/{{cookiecutter.project_name}}/taskcluster/{{cookiecutter.project_slug}}_taskgraph/transforms/hello.py +++ b/template/{{cookiecutter.project_name}}/taskcluster/{{cookiecutter.project_slug}}_taskgraph/transforms/hello.py @@ -1,17 +1,13 @@ -from voluptuous import ALLOW_EXTRA, Required - from taskgraph.transforms.base import TransformSequence -from taskgraph.util.schema import LegacySchema +from taskgraph.util.schema import Schema + + +class HelloSchema(Schema): + noun: str # Required field -HELLO_SCHEMA = LegacySchema( - { - Required("noun"): str, - }, - extra=ALLOW_EXTRA, -) transforms = TransformSequence() -transforms.add_validate(HELLO_SCHEMA) +transforms.add_validate(HelloSchema) @transforms.add diff --git a/test/test_parameters.py b/test/test_parameters.py index d1dcfb992..d833fbeca 100644 --- a/test/test_parameters.py +++ b/test/test_parameters.py @@ -11,7 +11,6 @@ import mozilla_repo_urls import pytest -from voluptuous import Optional, Required, Schema import taskgraph # noqa: F401 from taskgraph import parameters @@ -21,6 +20,7 @@ extend_parameters_schema, load_parameters_file, ) +from taskgraph.util.schema import Schema from .mockedopen import MockedOpen @@ -274,46 +274,74 @@ def test_parameters_format_spec(spec, expected): def test_extend_parameters_schema(monkeypatch): - monkeypatch.setattr( - parameters, - "base_schema", - Schema( - { - Required("foo"): str, - } - ), - ) + """Test parameter extension with msgspec schemas.""" + + # Define a test 
extension schema that adds new fields + class ExtensionSchema(Schema): + custom_field: str + optional_field: bool = False # Optional with default + + # Reset global _schema_extensions + monkeypatch.setattr(parameters, "_schema_extensions", []) + + # Keep the default functions monkeypatch.setattr( parameters, "defaults_functions", list(parameters.defaults_functions), ) - with pytest.raises(ParameterMismatch): - Parameters(strict=False).check() - - with pytest.raises(ParameterMismatch): - Parameters(foo="1", bar=True).check() - + # Extend the parameters schema with our custom schema extend_parameters_schema( - { - Optional("bar"): bool, + ExtensionSchema, + defaults_fn=lambda root: { + "custom_field": "default_value", + "optional_field": True, }, - defaults_fn=lambda root: {"foo": "1", "bar": False}, ) - params = Parameters(foo="1", bar=True) - params.check() - assert params["bar"] is True - - params = Parameters(foo="1") + # Verify the extension was added + assert ExtensionSchema in parameters._schema_extensions + + # Test with extended fields in strict mode + # Need to include all required base fields too + params = Parameters( + base_repository="https://example.com/repo", + base_ref="main", + base_rev="abc123", + build_date=1234567890, + build_number=1, + do_not_optimize=[], + enable_always_target=True, + existing_tasks={}, + files_changed=[], + filters=["target_tasks_method"], + head_ref="main", + head_repository="https://example.com/repo", + head_rev="abc123", + head_tag="", + level="3", + moz_build_date="20240101120000", + optimize_target_tasks=True, + owner="test@example.com", + project="test", + pushdate=1234567890, + pushlog_id="0", + repository_type="git", + target_tasks_method="default", + tasks_for="testing", + custom_field="my_value", # Extension field + optional_field=False, # Extension field + ) params.check() - assert "bar" not in params + assert params["custom_field"] == "my_value" + assert params["optional_field"] is False + # Test with defaults in non-strict mode params = Parameters(strict=False) params.check() - assert params["foo"] == "1" - assert params["bar"] is False + assert params["custom_field"] == "default_value" + assert params["optional_field"] is True @pytest.mark.parametrize( diff --git a/test/test_transforms_run_run_task.py b/test/test_transforms_run_run_task.py index 2ef25b9f2..f29876677 100644 --- a/test/test_transforms_run_run_task.py +++ b/test/test_transforms_run_run_task.py @@ -10,7 +10,7 @@ from taskgraph.transforms.run import make_task_description from taskgraph.transforms.task import payload_builders, set_defaults from taskgraph.util.caches import CACHES -from taskgraph.util.schema import LegacySchema, validate_schema +from taskgraph.util.schema import validate_schema from taskgraph.util.taskcluster import get_root_url from taskgraph.util.templates import merge @@ -257,10 +257,6 @@ def inner(task, **kwargs): print("Dumping for copy/paste:") pprint(caches, indent=2) - # Create a new schema object with just the part relevant to caches. 
+
+    # Test with extended fields in strict mode
+    # Need to include all required base fields too
+    params = Parameters(
+        base_repository="https://example.com/repo",
+        base_ref="main",
+        base_rev="abc123",
+        build_date=1234567890,
+        build_number=1,
+        do_not_optimize=[],
+        enable_always_target=True,
+        existing_tasks={},
+        files_changed=[],
+        filters=["target_tasks_method"],
+        head_ref="main",
+        head_repository="https://example.com/repo",
+        head_rev="abc123",
+        head_tag="",
+        level="3",
+        moz_build_date="20240101120000",
+        optimize_target_tasks=True,
+        owner="test@example.com",
+        project="test",
+        pushdate=1234567890,
+        pushlog_id="0",
+        repository_type="git",
+        target_tasks_method="default",
+        tasks_for="testing",
+        custom_field="my_value",  # Extension field
+        optional_field=False,  # Extension field
+    )
     params.check()
-    assert "bar" not in params
+    assert params["custom_field"] == "my_value"
+    assert params["optional_field"] is False
 
+    # Test with defaults in non-strict mode
     params = Parameters(strict=False)
     params.check()
-    assert params["foo"] == "1"
-    assert params["bar"] is False
+    assert params["custom_field"] == "default_value"
+    assert params["optional_field"] is True
 
 
 @pytest.mark.parametrize(
diff --git a/test/test_transforms_run_run_task.py b/test/test_transforms_run_run_task.py
index 2ef25b9f2..f29876677 100644
--- a/test/test_transforms_run_run_task.py
+++ b/test/test_transforms_run_run_task.py
@@ -10,7 +10,7 @@
 from taskgraph.transforms.run import make_task_description
 from taskgraph.transforms.task import payload_builders, set_defaults
 from taskgraph.util.caches import CACHES
-from taskgraph.util.schema import LegacySchema, validate_schema
+from taskgraph.util.schema import validate_schema
 from taskgraph.util.taskcluster import get_root_url
 from taskgraph.util.templates import merge
 
@@ -257,10 +257,6 @@ def inner(task, **kwargs):
         print("Dumping for copy/paste:")
         pprint(caches, indent=2)
 
-        # Create a new schema object with just the part relevant to caches.
-        partial_schema = LegacySchema(payload_builders[impl].schema.schema[key])
-        validate_schema(partial_schema, caches, "validation error")
-
         return caches
 
     return inner
diff --git a/test/test_util_schema.py b/test/test_util_schema.py
index 3364b2813..4c1135b56 100644
--- a/test/test_util_schema.py
+++ b/test/test_util_schema.py
@@ -4,61 +4,82 @@
 
 import unittest
 
+import msgspec
 import pytest
-from voluptuous import Invalid, MultipleInvalid
 
 import taskgraph
 from taskgraph.util.schema import (
-    LegacySchema,
+    Schema,
     optionally_keyed_by,
     resolve_keyed_by,
     validate_schema,
 )
 
-schema = LegacySchema(
-    {
-        "x": int,
-        "y": str,
-    }
-)
+
+class SimpleTestSchema(Schema, rename=None, omit_defaults=False):
+    x: int
+    y: str
+
+
+struct = SimpleTestSchema
 
 
 class TestValidateSchema(unittest.TestCase):
     def test_valid(self):
-        validate_schema(schema, {"x": 10, "y": "foo"}, "pfx")
+        validate_schema(struct, {"x": 10, "y": "foo"}, "pfx")
 
     def test_invalid(self):
         try:
-            validate_schema(schema, {"x": "not-int"}, "pfx")
+            validate_schema(struct, {"x": "not-int"}, "pfx")
             self.fail("no exception raised")
         except Exception as e:
-            self.assertTrue(str(e).startswith("pfx\n"))
+            # Our new implementation includes pfx in the error message
+            self.assertTrue("pfx" in str(e))
 
 
 class TestCheckSchema(unittest.TestCase):
     def test_schema(self):
-        "Creating a schema applies taskgraph checks."
-        with self.assertRaises(Exception):
-            LegacySchema({"camelCase": int})
+        "Creating a msgspec schema works correctly."
+
+        class CamelCaseSchema(
+            Schema, rename=None, omit_defaults=False, forbid_unknown_fields=False
+        ):
+            camelCase: int
 
-    def test_extend_schema(self):
-        "Extending a schema applies taskgraph checks."
-        with self.assertRaises(Exception):
-            LegacySchema({"kebab-case": int}).extend({"camelCase": int})
+        struct = CamelCaseSchema
+        # Test that it validates correctly
+        result = struct.validate({"camelCase": 42})
+        assert result.camelCase == 42
 
-    def test_extend_schema_twice(self):
-        "Extending a schema twice applies taskgraph checks."
-        with self.assertRaises(Exception):
-            LegacySchema({"kebab-case": int}).extend({"more-kebab": int}).extend(
-                {"camelCase": int}
-            )
+        with self.assertRaises(msgspec.ValidationError):
+            struct.validate({"camelCase": "not-an-int"})
+
+    def test_extend_not_supported(self):
+        "Extension is not supported for msgspec schemas."
+
+        class SimpleSchema(
+            Schema, rename=None, omit_defaults=False, forbid_unknown_fields=False
+        ):
+            kebab_case: int
+
+        struct = SimpleSchema
+        # Schema classes no longer have extend method
+        self.assertFalse(hasattr(struct, "extend"))
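+
+        # Hedged note: the msgspec-era substitute for `.extend()` is ordinary
+        # subclassing, e.g. `class Extended(SimpleSchema): more: str`; that
+        # pattern is illustrative only and not exercised by this test.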
 
 
 def test_check_skipped(monkeypatch):
-    """Schema not validated if 'check=False' or taskgraph.fast is unset."""
-    LegacySchema({"camelCase": int}, check=False)  # assert no exception
+    """Schema not validated if taskgraph.fast is set."""
+
+    class SimpleSchema(
+        Schema, rename=None, omit_defaults=False, forbid_unknown_fields=False
+    ):
+        value: int
+
     monkeypatch.setattr(taskgraph, "fast", True)
-    LegacySchema({"camelCase": int})  # assert no exception
+    struct = SimpleSchema
+    # When fast mode is on, validation is skipped
+    result = struct.validate({"value": "not-an-int"})  # Should not raise
+    assert result == {"value": "not-an-int"}
 
 
 class TestResolveKeyedBy(unittest.TestCase):
@@ -238,29 +259,67 @@ def test_no_key(self):
 
 
 def test_optionally_keyed_by():
+    # Test voluptuous behavior (default)
     validator = optionally_keyed_by("foo", str)
-    assert validator("baz") == "baz"
-    assert validator({"by-foo": {"a": "b", "c": "d"}}) == {"a": "b", "c": "d"}
+    # It returns a validator function
+    assert callable(validator)
 
-    with pytest.raises(Invalid):
-        validator({"by-foo": {"a": 1, "c": "d"}})
+    # Test msgspec behavior
+    type_annotation = optionally_keyed_by("foo", str, use_msgspec=True)
 
-    with pytest.raises(MultipleInvalid):
-        validator({"by-bar": {"a": "b"}})
+    # Create a struct with this type annotation to test validation
+    class TestSchema(Schema, forbid_unknown_fields=False):
+        value: type_annotation
+
+    # Test that a simple string is accepted
+    result = msgspec.convert({"value": "baz"}, TestSchema)
+    assert result.value == "baz"
+
+    # Test that keyed-by structure is accepted and works
+    result = msgspec.convert({"value": {"by-foo": {"a": "b", "c": "d"}}}, TestSchema)
+    assert result.value == {"by-foo": {"a": "b", "c": "d"}}
+
+    # Test that invalid value types are rejected
+    with pytest.raises(msgspec.ValidationError):
+        msgspec.convert({"value": {"by-foo": {"a": 1, "c": "d"}}}, TestSchema)
+
+    # Test that unknown by-keys are rejected due to Literal constraint
+    with pytest.raises(msgspec.ValidationError):
+        msgspec.convert({"value": {"by-bar": {"a": "b"}}}, TestSchema)
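+
+
+# Hedged sketch, inferred solely from the assertions above: with
+# use_msgspec=True, `optionally_keyed_by("foo", str)` plausibly produces a
+# Union of `str` with a keyed-by mapping whose key is constrained to
+# Literal["by-foo"], which is why the `by-bar` variant fails validation.
+# The exact representation is an internal detail and may differ.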
{"a": 123, "c": "d"}}) + # Test that invalid value types in by-foo are rejected + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"value": {"by-foo": {"a": 123, "c": "d"}}}, TestSchema) - with pytest.raises(MultipleInvalid): - validator({"by-bar": {"a": 1}}) + # Test that invalid value types in by-bar are rejected + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"value": {"by-bar": {"a": 1}}}, TestSchema) - with pytest.raises(MultipleInvalid): - validator({"by-unknown": {"a": "b"}}) + # Test that unknown by-keys are rejected due to Literal constraint + with pytest.raises(msgspec.ValidationError): + msgspec.convert({"value": {"by-unknown": {"a": "b"}}}, TestSchema)