|
@@ -7,6 +7,7 @@ import tempfile
|
|
|
from textwrap import dedent
|
|
from textwrap import dedent
|
|
|
from typing import Any, ClassVar, final
|
|
from typing import Any, ClassVar, final
|
|
|
|
|
|
|
|
|
|
+from chevron import render # type: ignore[import-untyped]
|
|
|
from colored import Fore, Style
|
|
from colored import Fore, Style
|
|
|
from pydantic import Field, PrivateAttr
|
|
from pydantic import Field, PrivateAttr
|
|
|
|
|
|
|
@@ -15,6 +16,8 @@ from cipy import settings
|
|
|
from cipy.common import Action, Context, Factory, Ref, Results, Status, _validate
|
|
from cipy.common import Action, Context, Factory, Ref, Results, Status, _validate
|
|
|
from cipy.shell import Shell
|
|
from cipy.shell import Shell
|
|
|
|
|
|
|
|
|
|
+from . import _io
|
|
|
|
|
+
|
|
|
|
|
|
|
|
class Call(Action, extra="allow"):
|
|
class Call(Action, extra="allow"):
|
|
|
"""
|
|
"""
|
|
@@ -111,28 +114,34 @@ class Script(Action):
|
|
|
shell: Shell = Shell.DEFAULT
|
|
shell: Shell = Shell.DEFAULT
|
|
|
name: str = ""
|
|
name: str = ""
|
|
|
script: str
|
|
script: str
|
|
|
- _lines: list[str] = PrivateAttr()
|
|
|
|
|
|
|
|
|
|
def model_post_init(self, context: Any, /) -> None:
|
|
def model_post_init(self, context: Any, /) -> None:
|
|
|
self.script = dedent(self.script).strip("\n")
|
|
self.script = dedent(self.script).strip("\n")
|
|
|
- self._lines = self.script.splitlines()
|
|
|
|
|
if not self.name:
|
|
if not self.name:
|
|
|
self.name = f"a {self.shell} script"
|
|
self.name = f"a {self.shell} script"
|
|
|
|
|
|
|
|
@final
|
|
@final
|
|
|
@cipy.runner.ipc
|
|
@cipy.runner.ipc
|
|
|
def run(self, context: Context) -> Status:
|
|
def run(self, context: Context) -> Status:
|
|
|
- with cipy.runner.logging_group(self, self._lines[0], ci_only=True):
|
|
|
|
|
|
|
+ with _io.capture_stderr() as stderr:
|
|
|
|
|
+ script = render(self.script, context, warn=True)
|
|
|
|
|
+ if stderr.getvalue():
|
|
|
|
|
+ self.logger.error(stderr.getvalue())
|
|
|
|
|
+ return Status.FAILURE
|
|
|
|
|
+ lines = script.splitlines()
|
|
|
|
|
+
|
|
|
|
|
+ with cipy.runner.logging_group(self, lines[0], ci_only=True):
|
|
|
if settings.INTERACTIVE:
|
|
if settings.INTERACTIVE:
|
|
|
self.logger.warning("Run %s", self.name)
|
|
self.logger.warning("Run %s", self.name)
|
|
|
- for line in self._lines:
|
|
|
|
|
|
|
+ for line in lines:
|
|
|
self.logger.info("%s%s%s", Script._GREEN, line, Style.reset)
|
|
self.logger.info("%s%s%s", Script._GREEN, line, Style.reset)
|
|
|
self.logger.info("shell: %s", " ".join(self.shell.command("{0}")))
|
|
self.logger.info("shell: %s", " ".join(self.shell.command("{0}")))
|
|
|
|
|
|
|
|
- with tempfile.NamedTemporaryFile(suffix=self.shell.extension()) as script:
|
|
|
|
|
- script.write(self.script.encode("utf-8"))
|
|
|
|
|
|
|
+ with tempfile.NamedTemporaryFile(suffix=self.shell.extension()) as file:
|
|
|
|
|
+ file.write(script.encode("utf-8"))
|
|
|
|
|
+ file.flush()
|
|
|
try:
|
|
try:
|
|
|
- subprocess.run(self.shell.command(script.name), check=True)
|
|
|
|
|
|
|
+ subprocess.run(self.shell.command(file.name), check=True)
|
|
|
return Status.SUCCESS
|
|
return Status.SUCCESS
|
|
|
except subprocess.CalledProcessError:
|
|
except subprocess.CalledProcessError:
|
|
|
return Status.FAILURE
|
|
return Status.FAILURE
|