Draft: Backend implementation for server-side image builds #131

Open · wants to merge 4 commits into main
19 changes: 3 additions & 16 deletions backend/.dockerignore
@@ -73,9 +73,6 @@ ENV/
env.bak/
venv.bak/

# mkdocs documentation
/site

# mypy
.mypy_cache/
.dmypy.json
@@ -104,25 +101,15 @@ package.json
yarn.lock
package.lock

# GitLab Pages
public/

# Syft / SBOM Outputs
cyclonedx.json
spdx.json
syft-output.json

.pre-commit-cache
.ruff_cache

# direnv
.envrc
.direnv

#demo-files
data/
.lakectl.yaml
.demovenv

# Generated documentation
public/

# Helm charts
deploy/
3 changes: 2 additions & 1 deletion backend/src/jobq_server/__main__.py
@@ -4,7 +4,7 @@
from fastapi import FastAPI
from kubernetes import config

from jobq_server.routers import jobs
from jobq_server.routers import builds, jobs


@asynccontextmanager
@@ -21,6 +21,7 @@ async def lifespan(app: FastAPI):
)

app.include_router(jobs.router, prefix="/jobs")
app.include_router(builds.router, prefix="/builds")


@app.get("/health", include_in_schema=False)
153 changes: 153 additions & 0 deletions backend/src/jobq_server/routers/builds.py
@@ -0,0 +1,153 @@
import asyncio
import io
import shlex
import tarfile
import tempfile
import uuid
from copy import deepcopy
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Annotated

from fastapi import APIRouter, BackgroundTasks, Depends, Form, HTTPException, UploadFile
from fastapi.responses import JSONResponse, StreamingResponse
from pydantic import BaseModel

from jobq_server.utils.assembler import config
from jobq_server.utils.assembler.renderers import RENDERERS
from jobq_server.utils.processes import run_command

router = APIRouter(tags=["Container image builds"])

# In-memory storage for build jobs
build_jobs: dict[str, dict] = {}


class BuildJobStatus(str, Enum): # noqa: UP042
Collaborator commented: We could use StrEnum instead of str, Enum server side, right?

Collaborator (Author) replied: I don't see this code ending up in the real implementation, but in principle yes :)

QUEUED = "queued"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"


class BuildJob(BaseModel):
id: str
status: BuildJobStatus
logs: list[str]


@dataclass
class BuildOptions:
name: str = Form()
tag: str = Form(default="latest")
platform: str = Form(default="linux/amd64")


def build_image(
job_id: str,
options: BuildOptions,
build_context: UploadFile,
image_spec: UploadFile,
):
build_jobs[job_id]["status"] = BuildJobStatus.IN_PROGRESS

    with tempfile.TemporaryDirectory() as tmpdir:
        # Extract the build context into the temporary directory.
        # Note: extractall() applies no path sanitization here, so a crafted
        # archive could write outside tmpdir; see tarfile extraction filters.
        with tarfile.TarFile.open(fileobj=build_context.file, mode="r:gz") as tarf:
            tarf.extractall(tmpdir)

image_cfg = config.load_config(image_spec.file)
renderers = [cls(image_cfg) for cls in RENDERERS if cls.accepts(image_cfg)]
dockerfile_content = ""
for r in renderers:
dockerfile_content += r.render() + "\n"

with io.StringIO(dockerfile_content) as dockerfile:
build_jobs[job_id]["logs"] = []

build_cmd = [
"docker",
"buildx",
"build",
"--platform",
options.platform,
"-t",
f"{options.name}:{options.tag}",
"-f",
"-",
str(tmpdir),
]

exit_code, _, _, _ = run_command(
shlex.join(build_cmd),
verbose=True,
stdin=dockerfile,
stdout_stream=build_jobs[job_id]["logs"],
stderr_stream=build_jobs[job_id]["logs"],
)

if exit_code != 0:
build_jobs[job_id]["status"] = BuildJobStatus.FAILED
build_jobs[job_id]["logs"].append("Build failed.")
else:
build_jobs[job_id]["status"] = BuildJobStatus.COMPLETED
build_jobs[job_id]["logs"].append("Build completed successfully.")


@router.post("/build")
async def create_build(
background_tasks: BackgroundTasks,
options: Annotated[BuildOptions, Depends()],
image_spec: UploadFile,
build_context: UploadFile,
):
extension = Path(build_context.filename).suffixes
if extension not in [[".tgz"], [".tar", ".gz"]]:
raise HTTPException(
status_code=400, detail=f"File must be a gzipped TAR archive: {extension}"
)

job_id = str(uuid.uuid4())
    build_jobs[job_id] = {"status": BuildJobStatus.QUEUED, "logs": []}

    # Deep-copy the uploaded files so the background task can still read them
    # after the request has completed.
# See https://github.com/fastapi/fastapi/discussions/10936
# and https://github.com/fastapi/fastapi/issues/10857
background_tasks.add_task(
build_image,
job_id,
options,
deepcopy(build_context),
deepcopy(image_spec),
)

return JSONResponse(
content={"job_id": job_id, "status": BuildJobStatus.QUEUED}, status_code=202
)
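
For reference, a client could drive this endpoint roughly as follows (a sketch; httpx, the base URL, and the file names are assumptions, not part of the PR):

import io
import tarfile

import httpx  # assumed client; any multipart-capable HTTP library works

# Pack a project directory into an in-memory gzipped tarball.
buf = io.BytesIO()
with tarfile.open(fileobj=buf, mode="w:gz") as tarf:
    tarf.add("my-project", arcname=".")
buf.seek(0)

resp = httpx.post(
    "http://localhost:8000/builds/build",
    data={"name": "my-image", "tag": "latest", "platform": "linux/amd64"},
    files={
        # ".tar.gz" satisfies the suffix check above.
        "build_context": ("context.tar.gz", buf, "application/gzip"),
        "image_spec": ("image.yaml", open("image.yaml", "rb"), "application/yaml"),
    },
)
job_id = resp.json()["job_id"]  # 202 Accepted with the queued job id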


@router.get("/build/{job_id}", response_model=BuildJob)
async def get_build_status(job_id: str):
if job_id not in build_jobs:
raise HTTPException(status_code=404, detail="Build job not found")

return BuildJob(id=job_id, **build_jobs[job_id])


@router.get("/build/{job_id}/logs")
async def stream_build_logs(job_id: str):
if job_id not in build_jobs:
raise HTTPException(status_code=404, detail="Build job not found")

    async def log_generator():
        # Poll the shared log list while the builder subprocess appends to it.
        # Cannot use an iterator due to the concurrent modification of the list.
        last_index = 0
        while build_jobs[job_id]["status"] not in (
            BuildJobStatus.COMPLETED,
            BuildJobStatus.FAILED,
        ):
            for line in build_jobs[job_id]["logs"][last_index:]:
                yield line
            last_index = len(build_jobs[job_id]["logs"])
            await asyncio.sleep(0.1)
        # Drain any lines appended between the last poll and the status change,
        # and terminate the stream for failed builds as well.
        for line in build_jobs[job_id]["logs"][last_index:]:
            yield line

return StreamingResponse(log_generator(), media_type="text/plain")
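
A matching consumer can tail the stream until it closes (a sketch continuing from the job_id above; httpx and the base URL are again assumptions):

import httpx

with httpx.stream(
    "GET", f"http://localhost:8000/builds/build/{job_id}/logs", timeout=None
) as resp:
    for line in resp.iter_lines():  # one log line per chunk, as yielded above
        print(line)
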
3 changes: 3 additions & 0 deletions backend/src/jobq_server/utils/assembler/__init__.py
@@ -0,0 +1,3 @@
from .config import Config, load_config

__all__ = ["Config", "load_config"]
151 changes: 151 additions & 0 deletions backend/src/jobq_server/utils/assembler/config.py
@@ -0,0 +1,151 @@
from __future__ import annotations

import re
from io import IOBase
from pathlib import Path
from typing import Annotated

import yaml
from annotated_types import Interval
from jobq.types import AnyPath
from pydantic import BaseModel, Field, field_validator, model_validator
from typing_extensions import TypeAliasType


class DependencySpec(BaseModel):
apt: list[str]
pip: list[str]


def validate_env_mapping(val: list[str]) -> list[str]:
def _validate_entry(val: str) -> str:
parts = val.split("=")

if len(parts) != 2:
raise ValueError(
"Environment variable mapping must be in the form 'KEY=VALUE'"
)

key, value = parts

if not key:
raise ValueError("Environment variable key cannot be empty")
if not value:
raise ValueError("Environment variable value cannot be empty")

# Validate key format to ensure it is a valid shell environment variable name
key_pattern = r"^[a-zA-Z_][a-zA-Z0-9_]*$"
if not re.match(key_pattern, key):
raise ValueError(
"Environment variable key must be a valid shell environment variable name"
)

return val

return [_validate_entry(v) for v in val]
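
To illustrate the accepted and rejected shapes (hypothetical calls):

validate_env_mapping(["PATH=/usr/bin", "DEBUG=1"])  # returns the list unchanged
validate_env_mapping(["1BAD=x"])      # raises ValueError: invalid key name
validate_env_mapping(["NO_VALUE="])   # raises ValueError: empty value
validate_env_mapping(["OPTS=a=b"])    # also raises: values may not contain "="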


class FilesystemSpec(BaseModel):
    # (m.mynter) copy shadows a deprecated method on BaseModel, but we keep it to stay with Docker nomenclature
copy: dict[str, str] = Field(default_factory=dict) # type: ignore[assignment]
add: dict[str, str] = Field(default_factory=dict)

@model_validator(mode="before")
def preprocess_copy_add(cls, values):
for model_field in ["copy", "add"]:
if model_field in values and isinstance(values[model_field], list):
instruct_dict = {}
for instruct in values[model_field]:
parts = instruct.split(":")
if len(parts) != 2:
raise ValueError(
"Filesystem operation must be in the form 'SOURCE:TARGET'"
)
src, tgt = parts
instruct_dict[src.strip()] = tgt.strip()
values[model_field] = instruct_dict
return values
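
For example, the list form is coerced into the mapping form (illustrative values):

spec = FilesystemSpec.model_validate({"copy": ["src: /app/src"]})
assert spec.copy == {"src": "/app/src"} and spec.add == {}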


class ConfigSpec(BaseModel):
env: list[str] = Field(default_factory=list)
arg: list[str] = Field(default_factory=list)
stopsignal: Annotated[int, Interval(ge=1, le=31)] | str | None = None
shell: str | None = None

@model_validator(mode="before")
@classmethod
def preprocess_env_arg(cls, values):
for model_field in ["env", "arg"]:
if model_field in values and isinstance(values[model_field], dict):
values[model_field] = [
f"{k}={v}" for k, v in values[model_field].items()
]
return values

_validate_env = field_validator("env")(validate_env_mapping)
_validate_arg = field_validator("arg")(validate_env_mapping)
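
The same pattern lets callers supply env and arg either as mappings or as KEY=VALUE lists (illustrative values):

cfg = ConfigSpec.model_validate({"env": {"DEBUG": "1"}, "arg": ["VERSION=2"]})
assert cfg.env == ["DEBUG=1"] and cfg.arg == ["VERSION=2"]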


class MetaSpec(BaseModel):
    labels: list[str] = Field(default_factory=list)

@model_validator(mode="before")
@classmethod
def coerce_kv_string(cls, val):
        # Use .get() so inputs without a "labels" key don't raise a KeyError.
        if isinstance(val.get("labels"), dict):
val["labels"] = [f"{k}={v}" for k, v in val["labels"].items()]
return val

@field_validator("labels")
def _validate_labels(cls, val):
for label in val:
parts = label.split("=")
if len(parts) != 2:
raise ValueError("Label must be in the form 'KEY=VALUE'")

key, value = parts
if not key:
raise ValueError("Label key cannot be empty")
if not value:
raise ValueError("Label value cannot be empty")

return val


Identifier = TypeAliasType("Identifier", Annotated[int, Interval(ge=0, le=65535)])
"""UID/GID identifier type"""


class UserSpec(BaseModel):
name: str = ""
uid: Identifier | None = None
gid: Identifier | None = None
create: bool = True


class BuildSpec(BaseModel):
base_image: str
dependencies: DependencySpec | None = None
user: UserSpec | None = None
config: ConfigSpec | None = None
meta: MetaSpec | None = None
filesystem: FilesystemSpec | None = None
workdir: str | None = None
volumes: list[str] | None = None


class Config(BaseModel):
build: BuildSpec


def load_config(config_source: AnyPath | IOBase) -> Config:
# If config_source is a file object, directly read the YAML content from it
if isinstance(config_source, IOBase):
config_yaml = yaml.safe_load(config_source)
else:
with Path(config_source).open() as f:
config_yaml = yaml.safe_load(f)

return Config(**config_yaml)
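
End to end, a minimal spec can be loaded from an in-memory stream or a path (a sketch; field values are illustrative):

import io

yaml_src = """\
build:
  base_image: python:3.11-slim
  config:
    env:
      DEBUG: "1"
  meta:
    labels:
      maintainer: jobq
"""
cfg = load_config(io.StringIO(yaml_src))
assert cfg.build.config.env == ["DEBUG=1"]
assert cfg.build.meta.labels == ["maintainer=jobq"]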