Jelajahi Sumber

feat: initial pass of features, some outstanding TODO remain

Sam Jaffe 1 bulan lalu
induk
melakukan
a4544bc8c8
6 mengubah file dengan 443 tambahan dan 0 penghapusan
  1. 49 0
      src/cipy/__init__.py
  2. 129 0
      src/cipy/action.py
  3. 131 0
      src/cipy/common.py
  4. 0 0
      src/cipy/py.typed
  5. 77 0
      src/cipy/runner.py
  6. 57 0
      src/cipy/workflow.py

+ 49 - 0
src/cipy/__init__.py

@@ -0,0 +1,49 @@
+"""
+Entry point for cipy library, re-exporting all of the default items
+"""
+import typing
+
+import pydantic
+
+from cipy.action import Composite, NodeScript, Script
+from cipy.common import Action, Context, Factory, Inputs, Outputs, Ref, Status
+from cipy.workflow import Job, Workflow
+
+__all__ = [
+    "Action",
+    "Composite",
+    "Context",
+    "Inputs",
+    "Job",
+    "NodeScript",
+    "Outputs",
+    "Script",
+    "Status",
+    "Workflow",
+    "compute",
+    "context",
+    "required",
+]
+
+
+def required() -> typing.Any:
+    """
+    Mark a field as required (no default value), but defer validation.
+    """
+    return pydantic.Field(default=None)
+
+
+def context(arg: str, /) -> typing.Any:
+    """
+    Mark a field as synthesized by reading a value from somewhere in the
+    overall runtime CI context.
+    """
+    return pydantic.Field(default=Ref(arg))
+
+
+def compute(arg: typing.Callable[[Context], typing.Any], /) -> typing.Any:
+    """
+    Mark a field as synthesized from a function such as an if-else condition
+    based on the runtime CI context.
+    """
+    return pydantic.Field(default=Factory(arg))

+ 129 - 0
src/cipy/action.py

@@ -0,0 +1,129 @@
+"""Module containing basic Action definitions, which perform linear operations"""
+import pathlib
+import shutil
+import subprocess
+import tempfile
+
+from enum import StrEnum, auto
+from typing import final
+
+from pydantic import Field, PrivateAttr
+
+import cipy.runner
+from cipy.common import Action, Context, Outputs, Status
+
+
+class Shell(StrEnum):
+    """Enumeration of shells that this tool knows how to run natively"""
+    SH = auto()
+    BASH = auto()
+    PYTHON = auto()
+
+
+class NodeScript(Action):
+    """
+    A special script that is run as a node.js file, with optional post-script
+    for cleaning up the environment.
+    """
+    version: str = Field(default="node24", pattern="node\\d+")
+    main: pathlib.Path
+    post: pathlib.Path | None = None
+
+    @final
+    def run(self, context: Context) -> Status:
+        with cipy.runner.pipe(self):
+            try:
+                subprocess.run(["node", str(self.main)], check=True)
+                return Status.SUCCESS
+            except subprocess.CalledProcessError:
+                return Status.FAILURE
+
+    @final
+    def cleanup(self, context: Context) -> None:
+        if self.post is None:
+            return
+        try:
+            subprocess.run(["node", str(self.post)], check=True)
+        except subprocess.CalledProcessError:
+            pass
+
+
+class Script(Action):
+    """Action descriptor for a generic shell runner"""
+    shell: Shell | None = None
+    script: str
+
+    def command(self, script: str) -> list[str]:
+        """
+        Helper function to transform the shell into a command line.
+        Requires the script file as an argument because some shells may wrap the
+        script instead of simply passing it as a raw argument (Powershell).
+        """
+        if self.shell is None:
+            return [
+                "bash" if shutil.which("bash") else "sh",
+                "-e",
+                str(script),
+            ]
+
+        match self.shell:
+            case Shell.BASH:
+                return [
+                    "bash",
+                    "--noprofile",
+                    "--norc",
+                    "-eo",
+                    "pipefail",
+                    str(script),
+                ]
+            case Shell.PYTHON:
+                return ["python", str(script)]
+            case Shell.SH:
+                return ["sh", "-e", str(script)]
+
+    @final
+    def run(self, context: Context) -> Status:
+        with tempfile.TemporaryFile(mode="w+") as script, cipy.runner.pipe(self):
+            script.write(self.script)
+            try:
+                subprocess.run(self.command(script.name), check=True)
+                return Status.SUCCESS
+            except subprocess.CalledProcessError:
+                return Status.FAILURE
+
+
+class Composite(Action):
+    """
+    Action descriptor for a linear sequence of child actions.
+    Inputs are isolated from the individual steps by default, and Outputs are
+    synthesized from the step outputs by defining an output object.
+    """
+    name: str = ""
+    steps: list[Action] = Field(frozen=True)
+    _counter: int = PrivateAttr(default=0)
+
+    def _tick(self) -> None:
+        self._counter += 1
+
+    def _outputs(self) -> dict[str, Outputs]:
+        return {s.id: s.outputs for s in self.steps if s.id}
+
+    @final
+    def run(self, context: Context) -> Status:
+        status = Status.SKIPPED
+
+        for step in self.steps:
+            status = cipy.runner.run(context, status, step, pre_validate=self._tick)
+
+        with context.extend(steps=self._outputs) as outctx:
+            outctx.fabricate(self, "outputs")
+
+        return status
+
+    @final
+    def cleanup(self, context: Context) -> None:
+        if not hasattr(self, "counter"):
+            # We never ran, so we don't need to run the cleanup
+            return
+        for step in reversed(self.steps[0 : self._counter]):
+            step.cleanup(context)

+ 131 - 0
src/cipy/common.py

@@ -0,0 +1,131 @@
+"""Common objects and base classes in the CI hierarchy"""
+
+import abc
+import os
+
+from contextlib import contextmanager
+from dataclasses import dataclass
+from enum import Enum, auto
+from functools import reduce
+from types import SimpleNamespace
+from typing import Any, Callable, Iterator, Literal, overload
+
+from pydantic import BaseModel, Field
+
+
+class Status(Enum):
+    """Result status of a runner, higher numbers take priority"""
+
+    SKIPPED = auto()
+    SUCCESS = auto()
+    FAILURE = auto()
+    CANCELLED = auto()
+
+
+class Inputs(BaseModel):
+    """Stub class describing input arguments"""
+
+
+class Outputs(BaseModel):
+    """Stub class describing result data"""
+
+
+class Ref(str):
+    """Annotation class describing a reference into Context or another place"""
+
+
+@dataclass
+class Factory:
+    """Annotation class describing a non-trivial synthesized property"""
+
+    __call__: Callable[[Context], Any]
+
+
+class Context(SimpleNamespace):
+    """Wrapper class for the context of the CI runtime"""
+
+    def access(self, ctx: str) -> Any:
+        """Accessor for context state with a dot-separated path"""
+        path = ctx.split(".")
+        if path[0] == "context":
+            return reduce(getattr, path[1:], self)
+
+        if path[0] == "env":
+            assert len(path) == 2
+            return os.environ.get(path[1])
+
+        raise KeyError(ctx)
+
+    @overload
+    def fabricate(self, state: BaseModel, attr: Literal["inputs"]) -> Inputs: ...
+
+    @overload
+    def fabricate(self, state: BaseModel, attr: Literal["outputs"]) -> Outputs: ...
+
+    def fabricate(
+        self, state: BaseModel, attr: Literal["inputs"] | Literal["outputs"]
+    ) -> Inputs | Outputs:
+        """Fabricate and validate an Inputs or Outputs object"""
+        model = getattr(state, attr)
+        if model is None:
+            annotation = state.__pydantic_fields__[attr].annotation
+            assert annotation is not None
+            model = annotation()
+
+        for k, field in model.__pydantic_fields__.items():
+            value = getattr(model, k)
+            if field.annotation is None:
+                continue
+
+            if isinstance(value, Ref):
+                setattr(model, k, self.access(value))
+            elif isinstance(value, Factory):
+                setattr(model, k, value(self))
+
+        _validate(model)
+        setattr(state, attr, model)
+        return model
+
+    @contextmanager
+    def extend(self, **kwargs: Any) -> Iterator[Context]:
+        """Create a new context that inherits an extra property"""
+        try:
+            yield Context(**vars(self), **kwargs)
+        finally:
+            pass
+
+
+class Action(BaseModel, abc.ABC):
+    """Base class describing all individual steps in a CI workflow"""
+
+    name: str
+    id: str = Field(default="", exclude_if=lambda v: not v)
+    inputs: Inputs = Inputs()
+    outputs: Outputs = Outputs()
+
+    # pylint: disable=unused-argument
+    def enabled(self, status: Status, context: Context) -> bool:
+        """Should this action even be run?"""
+        return status == Status.SUCCESS
+
+    @abc.abstractmethod
+    def run(self, context: Context) -> Status:
+        """Run this action, updating outputs internally or via the cipy.runner.run wrapper"""
+
+    def cleanup(self, context: Context) -> None:
+        """Optionally clean up after ourselves"""
+
+
+def _validate(model: BaseModel):
+    """Perform the actual model validation that we sabotaged w/ required() and similar functions"""
+    for k, field in model.__pydantic_fields__.items():
+        attr = getattr(model, k)
+        if field.annotation is None:
+            continue
+
+        if isinstance(attr, (Ref, Factory)):
+            raise TypeError(f"field '{k}' in {type(model)} is unset")
+        if not isinstance(attr, field.annotation):
+            raise TypeError(
+                f"field '{k}' in {type(model)} is of the wrong type (should be {field.annotation})"
+            )

+ 0 - 0
src/cipy/py.typed


+ 77 - 0
src/cipy/runner.py

@@ -0,0 +1,77 @@
+"""
+Common functions for setting up/tearing down environments for running an action.
+"""
+import os
+import re
+import tempfile
+
+from contextlib import contextmanager
+from typing import Any, Callable, Iterator
+
+from cipy.common import Action, Context, Status, _validate
+
+
+@contextmanager
+def environ(*, error_on_override: bool = True, **overrides: Any) -> Iterator[None]:
+    """Override the shell environment for the duration of this context"""
+    unset = {key for key in overrides if key not in os.environ}
+    priors = {key: os.environ[key] for key in overrides if key not in unset}
+
+    if error_on_override and priors:
+        raise ValueError(f"clobbering environment variable(s): {priors.keys()}")
+
+    try:
+        for key, value in overrides.items():
+            os.environ[key] = str(value)
+        yield
+    finally:
+        for key in unset:
+            del os.environ[key]
+        for key, value in priors.items():
+            os.environ[key] = value
+
+
+@contextmanager
+def pipe(action: Action) -> Iterator[None]:
+    """
+    IPC tool for passing inputs and outputs between an Action that is
+    implemented as some manner of script.
+
+    All of the Action.Inputs values will be mapped to environment variables
+    prefixed with "INPUT_".
+
+    Action.Outputs and output environment variables will be writiable into
+    special temporary files, which will then be read into the context.
+    """
+    inputs = {"INPUT_" + re.sub("[ -]", "_", k): v for k, v in action.inputs}
+    with (
+        tempfile.TemporaryFile(mode="w+") as output,
+        tempfile.TemporaryFile(mode="w+") as envfile,
+        environ(CI_OUTPUT=output.name, CI_ENVIRON=envfile.name, **inputs),
+    ):
+        yield
+        # TODO: Compute output
+
+
+def run(
+    context: Context,
+    status: Status,
+    action: Action,
+    *,
+    pre_validate: Callable[[], None] | None = None,
+) -> Status:
+    """
+    General executor for an action - guarantees that we have constructed and
+    validated the Action.Inputs data.
+
+    Allows for a callback hook in between the run finishing and output
+    validation.
+    """
+    action.inputs = context.fabricate(action, "inputs")
+
+    stat = action.run(context)
+    if pre_validate:
+        pre_validate()
+
+    _validate(action.outputs)
+    return stat if stat.value > status.value else status

+ 57 - 0
src/cipy/workflow.py

@@ -0,0 +1,57 @@
+"""Module containing basic Workflow definitions, which perform non-linear operations"""
+from typing import Any, final, override
+
+from pydantic import BaseModel, PrivateAttr
+
+import cipy.runner
+from cipy.common import Action, Context, Outputs, Status
+
+
+class Job(BaseModel):
+    """A wrapper for a graph node with edges"""
+    id: str
+    needs: list[str] = []
+    action: Action
+
+
+class Workflow(Action):
+    """An implementation of a flow-graph based CI workflow"""
+    jobs: list[Job]
+    _depends: dict[str, list[str]] = PrivateAttr()
+
+    @override
+    def model_post_init(self, context: Any, /) -> None:
+        for j in self.jobs:
+            assert all(need in self.jobs for need in j.needs)
+        self._depends = {j.id: list(j.needs) for j in self.jobs}
+
+    def _finished(self, jid: str) -> None:
+        for j in self.jobs:
+            try:
+                j.needs.remove(jid)
+            except ValueError:
+                pass
+
+    def _outputs(self) -> dict[str, Outputs]:
+        return {j.id: j.action.outputs for j in self.jobs}
+
+    @final
+    def run(self, context: Context) -> Status:
+        status = Status.SKIPPED
+        visited = set()
+
+        def _next():
+            return next(
+                iter(j for j in self.jobs if j.id not in visited and not j.needs), None
+            )
+
+        while job := _next():
+            visited.add(job.id)
+            status = cipy.runner.run(
+                context, status, job.action, pre_validate=lambda: self._finished(job.id)
+            )
+
+        with context.extend(needs=self._outputs) as outctx:
+            outctx.fabricate(self, "outputs")
+
+        return status