Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 156 additions & 0 deletions docs/howto/define-schemas.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
Define Schemas
==============

:doc:`Transforms </concepts/transforms>` can define schemas to validate task data at each step of the
pipeline. Taskgraph uses `msgspec`_ under the hood, and provides a
:class:`~taskgraph.util.schema.Schema` base class that integrates with
:meth:`TransformSequence.add_validate() <taskgraph.transforms.base.TransformSequence.add_validate>`.

There are two ways to define a schema: the **class based** approach and the
**dict based** approach. Both produce equivalent results; which one you prefer
is a matter of style.

.. _msgspec: https://jcristharif.com/msgspec/


Class Based Schemas
-------------------

Subclass :class:`~taskgraph.util.schema.Schema` and declare fields as class
attributes with type annotations:

.. code-block:: python

from typing import Optional
from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import Schema

class MySubConfig(Schema):
total_num: int
fields: list[str] = []

class MySchema(Schema, forbid_unknown_fields=False):
config: Optional[MySubConfig] = None

transforms = TransformSequence()
transforms.add_validate(MySchema)

A few things to note:

- Field names use ``snake_case`` in Python but are **automatically renamed to
``kebab-case``** in YAML. So ``total_num`` in Python matches
``total-num`` in YAML.
- ``Optional[T]`` fields default to ``None`` unless you supply an explicit
default.
- Fields without a default are **required**.
- ``forbid_unknown_fields=True`` (the default) causes validation to fail if the
task data contains keys that are not declared in the schema. Set it to
``False`` on outer schemas so that fields belonging to later transforms are
not rejected.


Dict Based Schemas
------------------

Call :meth:`Schema.from_dict() <taskgraph.util.schema.Schema.from_dict>` with a
dictionary mapping field names to ``type`` or ``(type, default)`` tuples:

.. code-block:: python

from typing import Optional, Union
from taskgraph.transforms.base import TransformSequence
from taskgraph.util.schema import Schema

MySchema = Schema.from_dict(
{
"config": Schema.from_dict(
{
"total-num": int,
"fields": list[str] = []
},
optional=True,
),
},
forbid_unknown_fields=False,
)

transforms = TransformSequence()
transforms.add_validate(MySchema)

This example is equivalent to the first example. One advantage with the dict based approach
is that you can write keys in **kebab-case** directly.

Field specifications follow these rules:

- A bare type (e.g. ``str``) means the field is required.
- ``Optional[T]`` means the field is optional and defaults to ``None``.
- A ``(type, default)`` tuple supplies an explicit default, e.g.
``(list[str], [])``.

Keyword arguments to ``from_dict`` are forwarded to ``msgspec.defstruct``.
The most commonly used ones are ``name`` (for better error messages) and
``forbid_unknown_fields``.

.. note::
``Schema.from_dict`` does **not** apply ``rename="kebab"`` automatically,
because you can express the kebab-case names directly in the dict keys.
Underscores in dict keys stay as underscores and dashes become valid
kebab-case field names.


Nesting Schemas
---------------

Both approaches support nesting:

.. code-block:: python

# Class-based nesting
class Inner(Schema):
value: str

class Outer(Schema, forbid_unknown_fields=False, kw_only=True):
inner: Optional[Inner] = None

# Dict-based nesting
Outer = Schema.from_dict(
{
"inner": Schema.from_dict({"value": str}, optional=True),
},
forbid_unknown_fields=False,
)

Pass ``optional=True`` to ``from_dict`` to make the whole nested schema
optional. This is necessary as function calls are not allowed in type
annotations, so ``Optional[Schema.from_dict(...)]`` is not valid Python.


Mutually Exclusive Fields
-------------------------

Use the ``exclusive`` keyword to declare groups of fields where at most one
may be set at a time:

.. code-block:: python

# Class-based
class MySchema(Schema, exclusive=[["field_a", "field_b"]]):
field_a: Optional[str] = None
field_b: Optional[str] = None

# Dict-based
MySchema = Schema.from_dict(
{
"field-a": Optional[str],
"field-b": Optional[str],
},
exclusive=[["field_a", "field_b"]],
)

``exclusive`` takes a list of groups, where each group is a list of field
names (Python ``snake_case``). A validation error is raised if more than one
field in a group is set.

.. note::
When using ``exclusive`` with the dict-based approach, refer to fields by
their Python attribute names (``snake_case``), not their YAML keys.
1 change: 1 addition & 0 deletions docs/howto/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ A collection of how-to guides.
run-locally
debugging
bootstrap-taskgraph
define-schemas
resolve-keyed-by
use-fetches
docker
Expand Down
80 changes: 40 additions & 40 deletions src/taskgraph/transforms/from_deps.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,46 +22,46 @@
from taskgraph.util.schema import Schema, validate_schema
from taskgraph.util.set_name import SET_NAME_MAP


class FromDepsConfig(Schema):
# Limit dependencies to specified kinds (defaults to all kinds in
# `kind-dependencies`).
#
# The first kind in the list is the "primary" kind. The
# dependency of this kind will be used to derive the label
# and copy attributes (if `copy-attributes` is True).
kinds: Optional[list[str]] = None
# Set-name function (dynamic: validated at runtime against SET_NAME_MAP).
set_name: Optional[Union[bool, str, dict[str, object]]] = None
# Limit dependencies to tasks whose attributes match
# using :func:`~taskgraph.util.attributes.attrmatch`.
with_attributes: Optional[dict[str, Union[list, str]]] = None
# Group cross-kind dependencies using the given group-by
# function. One task will be created for each group. If not
# specified, the 'single' function will be used which creates
# a new task for each individual dependency.
group_by: Optional[Union[str, dict[str, object]]] = None
# If True, copy attributes from the dependency matching the
# first kind in the `kinds` list (whether specified explicitly
# or taken from `kind-dependencies`).
copy_attributes: Optional[bool] = None
# If true (the default), there must be only a single unique task
# for each kind in a dependency group. Setting this to false
# disables that requirement.
unique_kinds: Optional[bool] = None
# If present, a `fetches` entry will be added for each task
# dependency. Attributes of the upstream task may be used as
# substitution values in the `artifact` or `dest` values of the
# `fetches` entry.
fetches: Optional[dict[str, list[FetchesEntrySchema]]] = None


#: Schema for from_deps transforms
class FromDepsSchema(Schema, forbid_unknown_fields=False, kw_only=True):
from_deps: FromDepsConfig


FROM_DEPS_SCHEMA = FromDepsSchema
FROM_DEPS_SCHEMA = Schema.from_dict(
{
"from-deps": Schema.from_dict(
{
# Limit dependencies to specified kinds (defaults to all kinds in
# `kind-dependencies`).
#
# The first kind in the list is the "primary" kind. The
# dependency of this kind will be used to derive the label
# and copy attributes (if `copy-attributes` is True).
"kinds": Optional[list[str]],
# Set-name function (dynamic: validated at runtime against SET_NAME_MAP).
"set-name": Optional[Union[bool, str, dict[str, object]]],
# Limit dependencies to tasks whose attributes match
# using :func:`~taskgraph.util.attributes.attrmatch`.
"with-attributes": Optional[dict[str, Union[list, str]]],
# Group cross-kind dependencies using the given group-by
# function. One task will be created for each group. If not
# specified, the 'single' function will be used which creates
# a new task for each individual dependency.
"group-by": Optional[Union[str, dict[str, object]]],
# If True, copy attributes from the dependency matching the
# first kind in the `kinds` list (whether specified explicitly
# or taken from `kind-dependencies`).
"copy-attributes": Optional[bool],
# If true (the default), there must be only a single unique task
# for each kind in a dependency group. Setting this to false
# disables that requirement.
"unique-kinds": Optional[bool],
# If present, a `fetches` entry will be added for each task
# dependency. Attributes of the upstream task may be used as
# substitution values in the `artifact` or `dest` values of the
# `fetches` entry.
"fetches": Optional[dict[str, list[FetchesEntrySchema]]],
},
),
},
name="FromDepsSchema",
forbid_unknown_fields=False,
)

transforms = TransformSequence()
transforms.add_validate(FROM_DEPS_SCHEMA)
Expand Down
81 changes: 81 additions & 0 deletions src/taskgraph/util/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.

import inspect
import pprint
import re
import threading
Expand Down Expand Up @@ -318,6 +319,11 @@ def __getitem__(self, item):
return self.schema[item] # type: ignore


def _caller_module_name(depth=1):
frame = inspect.stack()[depth + 1].frame
return frame.f_globals.get("__name__", "schema")


class Schema(
msgspec.Struct,
kw_only=True,
Expand Down Expand Up @@ -345,6 +351,11 @@ class MySchema(Schema, forbid_unknown_fields=False, kw_only=True):
foo: str
"""

def __init_subclass__(cls, exclusive=None, **kwargs):
super().__init_subclass__(**kwargs)
if exclusive is not None:
cls.exclusive = exclusive

def __post_init__(self):
if taskgraph.fast:
return
Expand All @@ -370,6 +381,76 @@ def __post_init__(self):

keyed_by.validate(obj)

# Validate mutually exclusive field groups.
for group in getattr(self, "exclusive", []):
set_fields = [f for f in group if getattr(self, f) is not None]
if len(set_fields) > 1:
raise ValueError(
f"{' and '.join(repr(f) for f in set_fields)} are mutually exclusive"
)

@classmethod
def from_dict(
cls,
fields_dict: dict[str, Any],
name: Optional[str] = None,
optional: bool = False,
**kwargs,
) -> Union[type[msgspec.Struct], type[Optional[msgspec.Struct]]]:
"""Create a Schema subclass dynamically from a dict of field definitions.

Each key is a field name and each value is either a type annotation or a
``(type, default)`` tuple. Fields typed as ``Optional[...]`` automatically
receive a default of ``None`` when no explicit default is provided.

Usage::

Schema.from_dict("MySchema", {
"required_field": str,
"optional_field": Optional[int], # default None inferred
"explicit_default": (list[str], []), # explicit default
})

Keyword arguments are forwarded to ``msgspec.defstruct`` (e.g.
``forbid_unknown_fields=False``).
"""
# Don't use `rename=kebab` by default as we can define kebab case
# properly in dicts.
kwargs.setdefault("rename", None)

# Ensure name and module are set correctly for error messages.
caller_module = _caller_module_name()
kwargs.setdefault("module", caller_module)
name = name or caller_module.rsplit(".", 1)[-1]

fields = []
for field_name, field_spec in fields_dict.items():
python_name = field_name.replace("-", "_")

if isinstance(field_spec, tuple):
typ, default = field_spec
else:
typ = field_spec
if get_origin(typ) is Union and type(None) in get_args(typ):
default = None
else:
default = msgspec.NODEFAULT

if field_name != python_name:
# Use msgspec.field to preserve the kebab-case encoded name.
# Explicit field names take priority over the struct-level rename.
fields.append(
(python_name, typ, msgspec.field(name=field_name, default=default))
)
else:
fields.append((python_name, typ, default))

exclusive = kwargs.pop("exclusive", None)
result = msgspec.defstruct(name or "Schema", fields, bases=(cls,), **kwargs)
if exclusive:
result.exclusive = exclusive # type: ignore[attr-defined]
return Optional[result] if optional else result # type: ignore[valid-type]

@classmethod
def validate(cls, data):
"""Validate data against this schema."""
Expand Down
Loading
Loading