Quellcode durchsuchen

refactor: change from pipe (contextmanager) to ipc (decorator)

Sam Jaffe vor 1 Monat
Ursprung
Commit
af28d9636a
5 geänderte Dateien mit 58 neuen und 46 gelöschten Zeilen
  1. 1 0
      pyproject.toml
  2. 16 9
      src/cipy/action.py
  3. 4 0
      src/cipy/common.py
  4. 30 32
      src/cipy/runner.py
  5. 7 5
      src/cipy/workflow.py

+ 1 - 0
pyproject.toml

@@ -6,6 +6,7 @@ readme = "README.md"
 requires-python = ">=3.14"
 requires-python = ">=3.14"
 dependencies = [
 dependencies = [
     "pydantic>=2.12.5",
     "pydantic>=2.12.5",
+    "python-dotenv>=1.2.2",
 ]
 ]
 
 
 [dependency-groups]
 [dependency-groups]

+ 16 - 9
src/cipy/action.py

@@ -10,7 +10,7 @@ from typing import final
 from pydantic import Field, PrivateAttr
 from pydantic import Field, PrivateAttr
 
 
 import cipy.runner
 import cipy.runner
-from cipy.common import Action, Context, Outputs, Status
+from cipy.common import Action, Context, Outputs, Status, _validate
 
 
 
 
 class Shell(StrEnum):
 class Shell(StrEnum):
@@ -30,13 +30,13 @@ class NodeScript(Action):
     post: pathlib.Path | None = None
     post: pathlib.Path | None = None
 
 
     @final
     @final
+    @cipy.runner.ipc
     def run(self, context: Context) -> Status:
     def run(self, context: Context) -> Status:
-        with cipy.runner.pipe(self):
-            try:
-                subprocess.run(["node", str(self.main)], check=True)
-                return Status.SUCCESS
-            except subprocess.CalledProcessError:
-                return Status.FAILURE
+        try:
+            subprocess.run(["node", str(self.main)], check=True)
+            return Status.SUCCESS
+        except subprocess.CalledProcessError:
+            return Status.FAILURE
 
 
     @final
     @final
     def cleanup(self, context: Context) -> None:
     def cleanup(self, context: Context) -> None:
@@ -82,8 +82,9 @@ class Script(Action):
                 return ["sh", "-e", str(script)]
                 return ["sh", "-e", str(script)]
 
 
     @final
     @final
+    @cipy.runner.ipc
     def run(self, context: Context) -> Status:
     def run(self, context: Context) -> Status:
-        with tempfile.TemporaryFile(mode="w+") as script, cipy.runner.pipe(self):
+        with tempfile.TemporaryFile(mode="w+") as script:
             script.write(self.script)
             script.write(self.script)
             try:
             try:
                 subprocess.run(self.command(script.name), check=True)
                 subprocess.run(self.command(script.name), check=True)
@@ -113,7 +114,13 @@ class Composite(Action):
         status = Status.SKIPPED
         status = Status.SKIPPED
 
 
         for step in self.steps:
         for step in self.steps:
-            status = cipy.runner.run(context, status, step, pre_validate=self._tick)
+            if not step.enabled(status, context):
+                status |= Status.SKIPPED
+                continue
+
+            status |= step.run(context)
+            self._counter += 1
+            _validate(step.outputs)
 
 
         with context.extend(steps=self._outputs) as outctx:
         with context.extend(steps=self._outputs) as outctx:
             outctx.fabricate(self, "outputs")
             outctx.fabricate(self, "outputs")

+ 4 - 0
src/cipy/common.py

@@ -16,11 +16,15 @@ from pydantic import BaseModel, Field
 class Status(Enum):
 class Status(Enum):
     """Result status of a runner, higher numbers take priority"""
     """Result status of a runner, higher numbers take priority"""
 
 
+    NOT_RUN = auto()
     SKIPPED = auto()
     SKIPPED = auto()
     SUCCESS = auto()
     SUCCESS = auto()
     FAILURE = auto()
     FAILURE = auto()
     CANCELLED = auto()
     CANCELLED = auto()
 
 
+    def __ior__(self, other: Status) -> Status:
+        return self if self.value > other.value else other
+
 
 
 class Inputs(BaseModel):
 class Inputs(BaseModel):
     """Stub class describing input arguments"""
     """Stub class describing input arguments"""

+ 30 - 32
src/cipy/runner.py

@@ -1,14 +1,20 @@
 """
 """
 Common functions for setting up/tearing down environments for running an action.
 Common functions for setting up/tearing down environments for running an action.
 """
 """
+
+import functools
 import os
 import os
-import re
 import tempfile
 import tempfile
 
 
 from contextlib import contextmanager
 from contextlib import contextmanager
-from typing import Any, Callable, Iterator
+from typing import Any, Callable, Iterator, TypeVar, overload
+
+from dotenv import dotenv_values
 
 
-from cipy.common import Action, Context, Status, _validate
+import cipy.common
+from cipy.common import Context, Status, _validate
+
+Action = TypeVar("Action", bound=cipy.common.Action)
 
 
 
 
 @contextmanager
 @contextmanager
@@ -31,8 +37,9 @@ def environ(*, error_on_override: bool = True, **overrides: Any) -> Iterator[Non
             os.environ[key] = value
             os.environ[key] = value
 
 
 
 
-@contextmanager
-def pipe(action: Action) -> Iterator[None]:
+def ipc(
+    func: Callable[[Action, Context], Status],
+) -> Callable[[Action, Context], Status]:
     """
     """
     IPC tool for passing inputs and outputs between an Action that is
     IPC tool for passing inputs and outputs between an Action that is
     implemented as some manner of script.
     implemented as some manner of script.
@@ -43,35 +50,26 @@ def pipe(action: Action) -> Iterator[None]:
     Action.Outputs and output environment variables will be writiable into
     Action.Outputs and output environment variables will be writiable into
     special temporary files, which will then be read into the context.
     special temporary files, which will then be read into the context.
     """
     """
-    inputs = {"INPUT_" + re.sub("[ -]", "_", k): v for k, v in action.inputs}
-    with (
-        tempfile.TemporaryFile(mode="w+") as output,
-        tempfile.TemporaryFile(mode="w+") as envfile,
-        environ(CI_OUTPUT=output.name, CI_ENVIRON=envfile.name, **inputs),
-    ):
-        yield
-        # TODO: Compute output
 
 
+    @functools.wraps(func)
+    def wrapper(self: Action, context: Context) -> Status:
+        inputs = {f"INPUT_{k}": v for k, v in self.inputs}
+        with (
+            tempfile.TemporaryFile(mode="w+") as output,
+            tempfile.TemporaryFile(mode="w+") as envfile,
+            environ(CI_OUTPUT=output.name, CI_ENVIRON=envfile.name, **inputs),
+        ):
+            rval = func(self, context)
 
 
-def run(
-    context: Context,
-    status: Status,
-    action: Action,
-    *,
-    pre_validate: Callable[[], None] | None = None,
-) -> Status:
-    """
-    General executor for an action - guarantees that we have constructed and
-    validated the Action.Inputs data.
+            if self.outputs is None:
+                self.outputs = self.__pydantic_fields__["outputs"].annotation()
 
 
-    Allows for a callback hook in between the run finishing and output
-    validation.
-    """
-    action.inputs = context.fabricate(action, "inputs")
+            outdata = dotenv_values(output.name)
+            for k, field in self.outputs.__pydantic_fields__.items():
+                if k in outdata:
+                    setattr(self.outputs, k, outdata[k])
 
 
-    stat = action.run(context)
-    if pre_validate:
-        pre_validate()
+            _validate(self.outputs)
+        return rval
 
 
-    _validate(action.outputs)
-    return stat if stat.value > status.value else status
+    return wrapper

+ 7 - 5
src/cipy/workflow.py

@@ -3,8 +3,7 @@ from typing import Any, final, override
 
 
 from pydantic import BaseModel, PrivateAttr
 from pydantic import BaseModel, PrivateAttr
 
 
-import cipy.runner
-from cipy.common import Action, Context, Outputs, Status
+from cipy.common import Action, Context, Outputs, Status, _validate
 
 
 
 
 class Job(BaseModel):
 class Job(BaseModel):
@@ -47,9 +46,12 @@ class Workflow(Action):
 
 
         while job := _next():
         while job := _next():
             visited.add(job.id)
             visited.add(job.id)
-            status = cipy.runner.run(
-                context, status, job.action, pre_validate=lambda: self._finished(job.id)
-            )
+            if not job.action.enabled(status, context):
+                status |= Status.SKIPPED
+                continue
+            status |= job.action.run(context)
+            self._finished(job.id)
+            _validate(job.action.outputs)
 
 
         with context.extend(needs=self._outputs) as outctx:
         with context.extend(needs=self._outputs) as outctx:
             outctx.fabricate(self, "outputs")
             outctx.fabricate(self, "outputs")