Forráskód Böngészése

refactor: separate out some files to change the model around a bit

Sam Jaffe 4 hete
szülő
commit
1b9320305f
7 módosított fájl, 255 hozzáadás és 228 törlés
  1. 14 7
      src/cipy/__init__.py
  2. 31 71
      src/cipy/action.py
  3. 4 146
      src/cipy/common.py
  4. 120 0
      src/cipy/context.py
  5. 3 3
      src/cipy/runner.py
  6. 81 0
      src/cipy/script.py
  7. 2 1
      src/cipy/workflow.py

+ 14 - 7
src/cipy/__init__.py

@@ -9,8 +9,10 @@ import sys
 
 import pydantic
 
-from cipy.action import Call, Composite, NodeScript, Script
-from cipy.common import Context, Factory, Inputs, Outputs, Ref
+from cipy.action import Action, Call, Composite
+from cipy.common import Inputs, Outputs, Ref
+from cipy.context import Context, Factory
+from cipy.script import NodeScript, Script
 from cipy.shell import Shell
 from cipy.status import Status
 from cipy.workflow import Job, Matrix, MatrixParams, Workflow
@@ -31,19 +33,24 @@ logging.basicConfig(
 )
 
 __all__ = [
+    # Basic Datamodels
+    "Inputs",
+    "Outputs",
+    "Context",
+    "Status",
+    # Actions (Linear)
+    "Action",
     "Call",
     "Composite",
-    "Context",
-    "Inputs",
-    "Job",
     "Matrix",
     "MatrixParams",
     "NodeScript",
-    "Outputs",
     "Script",
     "Shell",
-    "Status",
+    # Workflow (Non-Linear)
     "Workflow",
+    "Job",
+    # Helpers
     "compute",
     "context",
     "outputs",

+ 31 - 71
src/cipy/action.py

@@ -1,22 +1,42 @@
 """Module containing basic Action definitions, which perform linear operations"""
 
-import pathlib
-import tempfile
+import abc
+import logging
 
-from textwrap import dedent
 from typing import Any, ClassVar, final
 
-from chevron import render  # type: ignore[import-untyped]
-from colored import Fore, Style
-from pydantic import Field, PrivateAttr
+from pydantic import BaseModel, Field, PrivateAttr
 
-import cipy.runner
 from cipy import settings
-from cipy.common import Action, Context, Factory, Ref, Results
-from cipy.shell import Shell
+from cipy.common import Inputs, Outputs
+from cipy.context import Context, Results, Value
 from cipy.status import Status
 
-from . import _io
+
+class Action(BaseModel, abc.ABC):
+    """Base class describing all individual steps in a CI workflow"""
+
+    name: str
+    id: str = Field(default="", exclude_if=lambda v: not v)
+    inputs: Inputs = Inputs()
+    outputs: Outputs = Outputs()
+
+    @property
+    def logger(self) -> logging.Logger:
+        """Get this class's logger"""
+        return logging.getLogger(self.__class__.__name__)
+
+    # pylint: disable=unused-argument
+    def enabled(self, status: Status, context: Context) -> bool:
+        """Should this action even be run?"""
+        return status.value <= Status.SUCCESS.value
+
+    @abc.abstractmethod
+    def run(self, context: Context) -> Status:
+        """Run this action, updating outputs internally or via the cipy.runner.run wrapper"""
+
+    def cleanup(self, context: Context) -> None:
+        """Optionally clean up after ourselves"""
 
 
 class Call(Action, extra="allow"):
@@ -52,7 +72,7 @@ class Call(Action, extra="allow"):
 
     name: str = ""
     using: Action
-    __pydantic_extra__: dict[str, bool | int | float | str | Ref | Factory]
+    __pydantic_extra__: dict[str, Value]
 
     def __init__(self, using: Action, /, **kwargs: Any) -> None:
         super().__init__(using=using, **kwargs)  # type: ignore
@@ -76,66 +96,6 @@ class Call(Action, extra="allow"):
         self.using.cleanup(context)
 
 
-class NodeScript(Action):
-    """
-    A special script that is run as a node.js file, with optional post-script
-    for cleaning up the environment.
-    """
-
-    version: str = Field(default="node24", pattern="node\\d+")
-    main: pathlib.Path
-    post: pathlib.Path | None = None
-
-    @final
-    @cipy.runner.ipc
-    @cipy.runner.preamble
-    def run(self, context: Context) -> Status:
-        return cipy.runner.run(self, ["node", str(self.main)])
-
-    @final
-    def cleanup(self, context: Context) -> None:
-        if self.post is None:
-            return
-        cipy.runner.run(self, ["node", str(self.post)])
-
-
-class Script(Action):
-    """Action descriptor for a generic shell runner"""
-
-    _GREEN: ClassVar[str] = Fore.dark_sea_green_4b + Style.BOLD
-
-    shell: Shell = Shell.DEFAULT
-    name: str = ""
-    script: str
-
-    def model_post_init(self, context: Any, /) -> None:
-        self.script = dedent(self.script).strip("\n")
-        if not self.name:
-            self.name = f"a {self.shell} script"
-
-    @final
-    @cipy.runner.ipc
-    def run(self, context: Context) -> Status:
-        with _io.capture_stderr() as stderr:
-            script = render(self.script, context, warn=True)
-            if stderr.getvalue():
-                self.logger.error(stderr.getvalue())
-                return Status.FAILURE
-            lines = script.splitlines()
-
-        with cipy.runner.logging_group(self, lines[0], ci_only=True):
-            if settings.INTERACTIVE:
-                self.logger.warning("Run %s", self.name)
-            for line in lines:
-                self.logger.info("%s%s%s", Script._GREEN, line, Style.reset)
-            self.logger.info("shell: %s", " ".join(self.shell.command("{0}")))
-
-        with tempfile.NamedTemporaryFile(suffix=self.shell.extension()) as file:
-            file.write(script.encode("utf-8"))
-            file.flush()
-            return cipy.runner.run(self, self.shell.command(file.name))
-
-
 class Composite(Action):
     """
     Action descriptor for a linear sequence of child actions.

+ 4 - 146
src/cipy/common.py

@@ -1,23 +1,9 @@
-"""Common objects and base classes in the CI hierarchy"""
+"""Common objects in the CI hierarchy"""
 
-import abc
-import dataclasses
-import logging
-import os
-
-from contextlib import contextmanager
-from functools import reduce
-from types import SimpleNamespace, NoneType
-from typing import Annotated, Any, Callable, Iterator, Literal, Self, final, overload
+from dataclasses import dataclass
+from typing import Annotated, Self, final
 
 from pydantic import BaseModel, Field
-from pydantic_core import PydanticUndefined
-
-from cipy.status import Status
-
-type Scalar = bool | int | float | str
-type Computed = Ref | Factory
-type Value = Scalar | Computed
 
 
 class Inputs(BaseModel):
@@ -33,7 +19,7 @@ class Outputs(BaseModel):
         return self.model_validate(self, extra="forbid")
 
 
-@dataclasses.dataclass
+@dataclass
 class Ref:
     """Annotation class describing a reference into Context or another place"""
 
@@ -43,131 +29,3 @@ class Ref:
         self.path = pathstr.split(".")
         if not self.path:
             raise ValueError("References must be of the form A.B.C etc.")
-
-
-@dataclasses.dataclass
-class Factory:
-    """Annotation class describing a non-trivial synthesized property"""
-
-    __call__: Callable[[Context], Scalar | None]
-
-
-class Results(SimpleNamespace):
-    """
-    Holder object for tracking the result of an action for Composite/Workflow actions
-    """
-
-    @dataclasses.dataclass
-    class Item:
-        """Result of a single action that needs to be tracked"""
-
-        conclusion: Status = Status.NOT_RUN
-        outputs: Outputs = dataclasses.field(default_factory=Outputs)
-
-    def __contains__(self, subscript: str) -> bool:
-        return hasattr(self, subscript)
-
-    def __setitem__(self, subscript: str, value: Results.Item) -> None:
-        if subscript:
-            self.__setattr__(subscript, value)
-
-    def __getitem__(self, subscript: str) -> Results.Item:
-        return self.__getattribute__(subscript)
-
-
-class Context(SimpleNamespace):
-    """Wrapper class for the context of the CI runtime"""
-
-    def __call__(self, arg: Value | None) -> Scalar | None:
-        """Accessor for context state with a dot-separated path"""
-        if arg is None:
-            return None
-
-        if isinstance(arg, Factory):
-            return arg(self)
-
-        if not isinstance(arg, Ref):
-            return arg
-
-        if arg.path[0] == "env":
-            assert len(arg.path) == 2
-            return os.environ.get(arg.path[1])
-
-        return reduce(  # type: ignore[return-value]
-            lambda o, a: o[a] if isinstance(o, dict) else getattr(o, a), arg.path, self
-        )
-
-    @overload
-    def fabricate(
-        self,
-        state: BaseModel,
-        attr: Literal["inputs"],
-        extra: dict[str, Any] | None = None,
-    ) -> Inputs: ...
-
-    @overload
-    def fabricate(
-        self,
-        state: BaseModel,
-        attr: Literal["outputs"],
-        extra: dict[str, Any] | None = None,
-    ) -> Outputs: ...
-
-    def fabricate(
-        self,
-        state: BaseModel,
-        attr: Literal["inputs"] | Literal["outputs"],
-        extra: dict[str, Ref | Factory] | None = None,
-    ) -> Inputs | Outputs:
-        """Fabricate and validate an Inputs or Outputs object"""
-        if extra is None:
-            extra = {}
-
-        annotation = state.__pydantic_fields__[attr].annotation
-        assert annotation is not None
-
-        fields: dict[str, Any] = {}
-        if (model := getattr(state, attr)) is not None:
-            annotation = model.__class__
-            fields = vars(model)
-
-        for name, fld in annotation.__pydantic_fields__.items():
-            if name in extra:
-                fields[name] = self(extra[name])
-            elif fld.default is not PydanticUndefined:
-                fields[name] = self(fld.default)
-
-        model = annotation(**fields)
-        setattr(state, attr, model)
-        return model
-
-    @contextmanager
-    def extend(self, **kwargs: Any) -> Iterator[Context]:
-        """Create a new context that inherits an extra property"""
-        yield Context(**vars(self), **kwargs)
-
-
-class Action(BaseModel, abc.ABC):
-    """Base class describing all individual steps in a CI workflow"""
-
-    name: str
-    id: str = Field(default="", exclude_if=lambda v: not v)
-    inputs: Inputs = Inputs()
-    outputs: Outputs = Outputs()
-
-    @property
-    def logger(self) -> logging.Logger:
-        """Get this class's logger"""
-        return logging.getLogger(self.__class__.__name__)
-
-    # pylint: disable=unused-argument
-    def enabled(self, status: Status, context: Context) -> bool:
-        """Should this action even be run?"""
-        return status.value <= Status.SUCCESS.value
-
-    @abc.abstractmethod
-    def run(self, context: Context) -> Status:
-        """Run this action, updating outputs internally or via the cipy.runner.run wrapper"""
-
-    def cleanup(self, context: Context) -> None:
-        """Optionally clean up after ourselves"""

+ 120 - 0
src/cipy/context.py

@@ -0,0 +1,120 @@
+"""Classes for managing the context of a CI run"""
+import os
+
+from contextlib import contextmanager
+from dataclasses import dataclass, field
+from functools import reduce
+from types import SimpleNamespace
+from typing import Any, Callable, Iterator, Literal, overload
+
+from pydantic import BaseModel
+from pydantic_core import PydanticUndefined
+
+from cipy.common import Inputs, Outputs, Ref
+from cipy.status import Status
+
+type Scalar = bool | int | float | str
+type Computed = Ref | Factory
+type Value = Scalar | Computed
+
+
+@dataclass
+class Factory:
+    """Annotation class describing a non-trivial synthesized property"""
+
+    __call__: Callable[[Context], Scalar | None]
+
+
+class Results(SimpleNamespace):
+    """
+    Holder object for tracking the result of an action for Composite/Workflow actions
+    """
+
+    @dataclass
+    class Item:
+        """Result of a single action that needs to be tracked"""
+
+        conclusion: Status = Status.NOT_RUN
+        outputs: Outputs = field(default_factory=Outputs)
+
+    def __contains__(self, subscript: str) -> bool:
+        return hasattr(self, subscript)
+
+    def __setitem__(self, subscript: str, value: Results.Item) -> None:
+        if subscript:
+            self.__setattr__(subscript, value)
+
+    def __getitem__(self, subscript: str) -> Results.Item:
+        return self.__getattribute__(subscript)
+
+
+class Context(SimpleNamespace):
+    """Wrapper class for the context of the CI runtime"""
+
+    def __call__(self, arg: Value | None) -> Scalar | None:
+        """Accessor for context state with a dot-separated path"""
+        if arg is None:
+            return None
+
+        if isinstance(arg, Factory):
+            return arg(self)
+
+        if not isinstance(arg, Ref):
+            return arg
+
+        if arg.path[0] == "env":
+            assert len(arg.path) == 2
+            return os.environ.get(arg.path[1])
+
+        return reduce(  # type: ignore[return-value]
+            lambda o, a: o[a] if isinstance(o, dict) else getattr(o, a), arg.path, self
+        )
+
+    @overload
+    def fabricate(
+        self,
+        state: BaseModel,
+        attr: Literal["inputs"],
+        extra: dict[str, Any] | None = None,
+    ) -> Inputs: ...
+
+    @overload
+    def fabricate(
+        self,
+        state: BaseModel,
+        attr: Literal["outputs"],
+        extra: dict[str, Any] | None = None,
+    ) -> Outputs: ...
+
+    def fabricate(
+        self,
+        state: BaseModel,
+        attr: Literal["inputs"] | Literal["outputs"],
+        extra: dict[str, Ref | Factory] | None = None,
+    ) -> Inputs | Outputs:
+        """Fabricate and validate an Inputs or Outputs object"""
+        if extra is None:
+            extra = {}
+
+        annotation = state.__pydantic_fields__[attr].annotation
+        assert annotation is not None
+
+        fields: dict[str, Any] = {}
+        if (model := getattr(state, attr)) is not None:
+            annotation = model.__class__
+            fields = vars(model)
+
+        for name, fld in annotation.__pydantic_fields__.items():
+            if name in extra:
+                fields[name] = self(extra[name])
+            elif fld.default is not PydanticUndefined:
+                fields[name] = self(fld.default)
+
+        model = annotation(**fields)
+        setattr(state, attr, model)
+        return model
+
+    @contextmanager
+    def extend(self, **kwargs: Any) -> Iterator[Context]:
+        """Create a new context that inherits an extra property"""
+        yield Context(**vars(self), **kwargs)

+ 3 - 3
src/cipy/runner.py

@@ -13,12 +13,12 @@ from typing import Any, Callable, Iterator, Literal, Sequence, TypeVar, overload
 
 from dotenv import dotenv_values
 
-import cipy.common
+import cipy.action
 from cipy import settings
-from cipy.common import Context
+from cipy.context import Context
 from cipy.status import Status
 
-Action = TypeVar("Action", bound=cipy.common.Action)
+Action = TypeVar("Action", bound=cipy.action.Action)
 type Run[Action] = Callable[[Action, Context], Status]
 
 

+ 81 - 0
src/cipy/script.py

@@ -0,0 +1,81 @@
+"""Script-oriented Actions"""
+
+import pathlib
+import tempfile
+
+from textwrap import dedent
+from typing import Any, ClassVar, final
+from colored import Fore, Style
+
+from chevron import render  # type: ignore[import-untyped]
+from pydantic import Field, PrivateAttr
+
+import cipy.runner
+
+from cipy import settings
+from cipy.action import Action
+from cipy.context import Context
+from cipy.status import Status
+from cipy.shell import Shell
+
+from . import _io
+
+
+class NodeScript(Action):
+    """
+    A special script that is run as a node.js file, with optional post-script
+    for cleaning up the environment.
+    """
+
+    version: str = Field(default="node24", pattern="node\\d+")
+    main: pathlib.Path
+    post: pathlib.Path | None = None
+
+    @final
+    @cipy.runner.ipc
+    @cipy.runner.preamble
+    def run(self, context: Context) -> Status:
+        return cipy.runner.run(self, ["node", str(self.main)])
+
+    @final
+    def cleanup(self, context: Context) -> None:
+        if self.post is None:
+            return
+        cipy.runner.run(self, ["node", str(self.post)])
+
+
+class Script(Action):
+    """Action descriptor for a generic shell runner"""
+
+    _GREEN: ClassVar[str] = Fore.dark_sea_green_4b + Style.BOLD
+
+    shell: Shell = Shell.DEFAULT
+    name: str = ""
+    script: str
+
+    def model_post_init(self, context: Any, /) -> None:
+        self.script = dedent(self.script).strip("\n")
+        if not self.name:
+            self.name = f"a {self.shell} script"
+
+    @final
+    @cipy.runner.ipc
+    def run(self, context: Context) -> Status:
+        with _io.capture_stderr() as stderr:
+            script = render(self.script, context, warn=True)
+            if stderr.getvalue():
+                self.logger.error(stderr.getvalue())
+                return Status.FAILURE
+            lines = script.splitlines()
+
+        with cipy.runner.logging_group(self, lines[0], ci_only=True):
+            if settings.INTERACTIVE:
+                self.logger.warning("Run %s", self.name)
+            for line in lines:
+                self.logger.info("%s%s%s", Script._GREEN, line, Style.reset)
+            self.logger.info("shell: %s", " ".join(self.shell.command("{0}")))
+
+        with tempfile.NamedTemporaryFile(suffix=self.shell.extension()) as file:
+            file.write(script.encode("utf-8"))
+            file.flush()
+            return cipy.runner.run(self, self.shell.command(file.name))

+ 2 - 1
src/cipy/workflow.py

@@ -9,7 +9,8 @@ from pydantic import BaseModel, PrivateAttr
 
 import cipy.runner
 
-from cipy.common import Action, Context, Results, Scalar, Value
+from cipy.action import Action
+from cipy.context import Context, Results, Scalar, Value
 from cipy.status import Status