|
|
@@ -4,7 +4,7 @@ from typing import Any, final, override
|
|
|
|
|
|
from pydantic import BaseModel, PrivateAttr
|
|
|
|
|
|
-from cipy.common import Action, Context, Outputs, Status, _validate
|
|
|
+from cipy.common import Action, Context, Results, Status, _validate
|
|
|
|
|
|
|
|
|
class Job(BaseModel):
|
|
|
@@ -34,9 +34,6 @@ class Workflow(Action):
|
|
|
except ValueError:
|
|
|
pass
|
|
|
|
|
|
- def _outputs(self) -> dict[str, Outputs]:
|
|
|
- return {j.id: j.action.outputs for j in self.jobs}
|
|
|
-
|
|
|
@final
|
|
|
def run(self, context: Context) -> Status:
|
|
|
status = Status.SKIPPED
|
|
|
@@ -47,16 +44,22 @@ class Workflow(Action):
|
|
|
iter(j for j in self.jobs if j.id not in visited and not j.needs), None
|
|
|
)
|
|
|
|
|
|
- while job := _next():
|
|
|
- visited.add(job.id)
|
|
|
- if not job.action.enabled(status, context):
|
|
|
- status |= Status.SKIPPED
|
|
|
- continue
|
|
|
- status |= job.action.run(context)
|
|
|
- self._finished(job.id)
|
|
|
- _validate(job.action.outputs)
|
|
|
+ with context.extend(needs=Results()) as outctx:
|
|
|
+ while job := _next():
|
|
|
+ visited.add(job.id)
|
|
|
+
|
|
|
+ if job.action.enabled(status, context):
|
|
|
+ stat = job.action.run(context)
|
|
|
+ outctx.needs[job.id] = Results.Item(
|
|
|
+ stat, _validate(job.action.outputs)
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ stat = Status.SKIPPED
|
|
|
+ outctx.needs[job.id] = Results.Item(Status.SKIPPED)
|
|
|
+
|
|
|
+ status |= stat
|
|
|
+ self._finished(job.id)
|
|
|
|
|
|
- with context.extend(needs=self._outputs) as outctx:
|
|
|
outctx.fabricate(self, "outputs")
|
|
|
|
|
|
return status
|