Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add UI, PostgreSQL, Minio and S3 relations #6

Merged
merged 8 commits into from
Jun 4, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 55 additions & 8 deletions charmcraft.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,47 @@ requires:
interface: postgresql_client
limit: 1

object-storage:
interface: object-storage
schema:
v1:
provides:
type: object
properties:
access-key:
type: string
namespace:
type:
- string
- 'null'
port:
type: number
secret-key:
type: string
secure:
type: boolean
service:
type: string
required:
- access-key
- port
- secret-key
- secure
- service
versions: [v1]
__schema_source: https://raw.githubusercontent.com/canonical/operator-schemas/master/object-storage.yaml

s3-parameters:
interface: s3
limit: 1
optional: true

provides:
airbyte-server:
interface: airbyte-server
optional: true
limit: 1

# (Optional) Configuration options for the charm
# This config section defines charm config options, and populates the Configure
# tab on Charmhub.
Expand All @@ -55,28 +96,34 @@ config:
description: |
Storage type for logs.

Acceptable values are: "minio"
default: "minio"
Acceptable values are: "MINIO", "S3" (AWS)
default: "MINIO"
type: string

storage-bucket-log:
description: Name of logs storage bucket.
storage-bucket-logs:
description: Name of logs storage bucket.
default: "airbyte-dev-logs"
type: string

logs-ttl:
description: |
Number of days until logs are purged from object storage.
default: 30
type: int

storage-bucket-state:
description: Name of state storage bucket.
default: "state-storage"
description: Name of state storage bucket.
default: "airbyte-state-storage"
type: string

storage-bucket-activity-payload:
description: Name of activity payload storage bucket.
default: "payload-storage"
default: "airbyte-payload-storage"
type: string

storage-bucket-workload-output:
description: Name of workload output storage bucket.
default: "state-storage"
default: "airbyte-state-storage"
type: string

pod-running-ttl-minutes:
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
ops ~= 2.5
pydantic==1.10.12
boto3==1.34.31
serialized-data-interface==0.7.0
charmed-kubeflow-chisme==0.3.0
228 changes: 113 additions & 115 deletions src/charm.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,31 @@
from pathlib import Path

import ops
from botocore.exceptions import ClientError
from charm_helpers import create_env
from charms.data_platform_libs.v0.data_models import TypedCharmBase
from charms.data_platform_libs.v0.database_requires import DatabaseRequires
from charms.data_platform_libs.v0.s3 import S3Requirer
from literals import (
BUCKET_CONFIGS,
CONNECTOR_BUILDER_SERVER_API_PORT,
CONTAINERS,
INTERNAL_API_PORT,
LOGS_BUCKET_CONFIG,
REQUIRED_S3_PARAMETERS,
)
from log import log_event_handler
from ops.model import ActiveStatus, BlockedStatus, WaitingStatus
from relations.airbyte_ui import AirbyteServer
from relations.minio import MinioRelation
from relations.postgresql import PostgresqlRelation
from relations.s3 import S3Integrator
from s3_helpers import S3Client
from state import State
from structured_config import CharmConfig
from structured_config import CharmConfig, StorageType

logger = logging.getLogger(__name__)

CONTAINERS = [
"airbyte-api-server",
"airbyte-bootloader",
"airbyte-connector-builder-server",
"airbyte-cron",
"airbyte-pod-sweeper",
"airbyte-server",
"airbyte-worker",
]
CONNECTOR_BUILDER_SERVER_API_PORT = 80
INTERNAL_API_PORT = 8001
AIRBYTE_API_PORT = 8006
WORKLOAD_API_PORT = 8007

# TODO (kelkawi-a): perform up check on the following ports for each container
# airbyte-api-server: 8006
# airbyte-bootloader: None
# airbyte-connector-builder-server: 8080
# airbyte-cron: 9001
# airbyte-pod-sweeper: None
# airbyte-server: 8001
# airbyte-worker: 9000


def get_pebble_layer(application_name, context):
return {
Expand Down Expand Up @@ -67,15 +61,40 @@ def __init__(self, *args):
self._state = State(self.app, lambda: self.model.get_relation("peer"))

self.framework.observe(self.on.config_changed, self._on_config_changed)
self.framework.observe(self.on.peer_relation_changed, self._on_peer_relation_changed)

# Handle postgresql relation.
self.db = DatabaseRequires(
self, relation_name="db", database_name="airbyte-k8s_db", extra_user_roles="admin"
)
self.postgresql = PostgresqlRelation(self)

self.minio = MinioRelation(self)

# Handle S3 integrator relation
self.s3_client = S3Requirer(self, "s3-parameters")
self.s3_relation = S3Integrator(self)

for container in CONTAINERS:
# Handle UI relation
self.airbyte_ui = AirbyteServer(self)

for container in list(CONTAINERS.keys()):
self.framework.observe(self.on[container].pebble_ready, self._on_pebble_ready)

@log_event_handler(logger)
def _on_pebble_ready(self, event: ops.PebbleReadyEvent):
    """Handle pebble-ready event.

    Fired once per container; delegates to _update so the workload is
    (re)configured as soon as its Pebble API becomes reachable.

    Args:
        event: The pebble-ready event for one of the Airbyte containers.
    """
    self._update(event)

@log_event_handler(logger)
def _on_peer_relation_changed(self, event):
    """Handle peer relation changed event.

    Delegates to _update so this unit re-applies configuration whenever
    shared peer state changes.

    Args:
        event: The event triggered when the relation changed.
    """
    self._update(event)

@log_event_handler(logger)
def _on_config_changed(self, event):
"""Handle changed configuration.
Expand All @@ -86,105 +105,82 @@ def _on_config_changed(self, event):
self.unit.status = WaitingStatus("configuring application")
self._update(event)

# TODO (kelkawi-a): Potentially move this to helpers.py later on
def _create_env(self):
    """Build the environment variables shared by the Airbyte containers.

    Reads the database connection from peer state and charm config to
    assemble the container environment used by the Pebble layer.

    Returns:
        dict: environment variables keyed by variable name.
    """
    # Database connection details come from peer state under the "db" key.
    db_conn = self._state.database_connections["db"]

    host = db_conn["host"]
    port = db_conn["port"]
    db_name = db_conn["dbname"]
    # JDBC-style URL consumed by the Java Airbyte services.
    db_url = f"jdbc:postgresql://{host}:{port}/{db_name}"

    # TODO (kelkawi-a): modify some of these values to grab data from relations instead
    return {
        "API_URL": "/api/v1/",
        "AIRBYTE_VERSION": "0.57.3",
        "AIRBYTE_EDITION": "community",
        "AUTO_DETECT_SCHEMA": "true",
        # Database settings derived from the db relation data above.
        "DATABASE_URL": db_url,
        "DATABASE_USER": db_conn["user"],
        "DATABASE_PASSWORD": db_conn["password"],
        "DATABASE_DB": db_name,
        "DATABASE_HOST": host,
        "DATABASE_PORT": port,
        "WORKSPACE_ROOT": "/workspace",
        "CONFIG_ROOT": "/configs",
        "TEMPORAL_HOST": self.config["temporal-host"],
        # Storage settings; storage-type is an enum, hence .value.
        "WORKER_LOGS_STORAGE_TYPE": self.config["storage-type"].value,
        "WORKER_STATE_STORAGE_TYPE": self.config["storage-type"].value,
        "STORAGE_TYPE": self.config["storage-type"].value,
        # NOTE(review): hard-coded while a "storage-bucket-activity-payload"
        # config option exists — confirm whether this should read the config.
        "STORAGE_BUCKET_ACTIVITY_PAYLOAD": "payload-storage",
        "STORAGE_BUCKET_LOG": self.config["storage-bucket-log"],
        "STORAGE_BUCKET_STATE": self.config["storage-bucket-state"],
        "STORAGE_BUCKET_WORKLOAD_OUTPUT": self.config["storage-bucket-workload-output"],
        "CONFIGS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION": "0.40.23.002",
        "JOBS_DATABASE_MINIMUM_FLYWAY_MIGRATION_VERSION": "0.40.26.001",
        "MICRONAUT_ENVIRONMENTS": "control-plane",
        "WORKERS_MICRONAUT_ENVIRONMENTS": "control-plane",
        "CRON_MICRONAUT_ENVIRONMENTS": "control-plane",
        "INTERNAL_API_HOST": f"localhost:{INTERNAL_API_PORT}",
        "LOG_LEVEL": self.config["log-level"].value,
        "MICROMETER_METRICS_ENABLED": "false",
        "KEYCLOAK_INTERNAL_HOST": "localhost",
        # Keycloak shares the same database under its own schema.
        "KEYCLOAK_DATABASE_URL": db_url + "?currentSchema=keycloak",
        "WEBAPP_URL": "airbyte-ui-k8s:8080",
        "WORKER_ENVIRONMENT": "kubernetes",
        "WORKSPACE_DOCKER_MOUNT": "airbyte_workspace",
        "SECRET_PERSISTENCE": "TESTING_CONFIG_DB_TABLE",
        "CONNECTOR_BUILDER_SERVER_API_HOST": f"localhost:{CONNECTOR_BUILDER_SERVER_API_PORT}",
        # NOTE(review): minio endpoint/credentials are hard-coded here;
        # presumably superseded by the minio/s3 relations — confirm.
        "S3_LOG_BUCKET_REGION": "",
        "MINIO_ENDPOINT": "http://minio:9000",
        "S3_LOG_BUCKET": "airbyte-dev-logs",
        "S3_PATH_STYLE_ACCESS": "true",
        "SHOULD_RUN_NOTIFY_WORKFLOWS": "true",
        "STATE_STORAGE_MINIO_ACCESS_KEY": "minio",
        "STATE_STORAGE_MINIO_SECRET_ACCESS_KEY": "minio123",
        "STATE_STORAGE_MINIO_BUCKET_NAME": "state-storage",
        "STATE_STORAGE_MINIO_ENDPOINT": "http://minio:9000",
        "AIRBYTE_API_HOST": "localhost:8006",
        "CONNECTOR_BUILDER_API_URL": "/connector-builder-api",
        "WORKLOAD_API_HOST": "localhost:8007",
        "WORKLOAD_API_URL": "localhost:8007",
        "TEMPORAL_WORKER_PORTS": "9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029,9030",
        "AWS_ACCESS_KEY_ID": "minio",
        "AWS_SECRET_ACCESS_KEY": "minio123",
        "JOB_KUBE_SERVICEACCOUNT": "airbyte-k8s",
        "JOB_KUBE_NAMESPACE": "dev-airbyte",
        # Pod TTLs come straight from charm config.
        "RUNNING_TTL_MINUTES": self.config["pod-running-ttl-minutes"],
        "SUCCEEDED_TTL_MINUTES": self.config["pod-successful-ttl-minutes"],
        "UNSUCCESSFUL_TTL_MINUTES": self.config["pod-unsuccessful-ttl-minutes"],
        # Proxy settings (upper- and lower-case variants for mixed tooling).
        "HTTP_PROXY": "http://squid.internal:3128",
        "HTTPS_PROXY": "http://squid.internal:3128",
        "NO_PROXY": "127.0.0.1,localhost,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,.canonical.com,.launchpad.net,.internal,.jujucharms.com,temporal-k8s,minio",
        "http_proxy": "http://squid.internal:3128",
        "https_proxy": "http://squid.internal:3128",
        "no_proxy": "10.0.0.0/8,localhost,127.0.0.1,.internal,.cluster.local,.local,.svc,airbyte-*,temporal-k8s,minio",
        "JAVA_TOOL_OPTIONS": "-Dhttp.proxyHost=squid.internal -Dhttp.proxyPort=3128 -Dhttps.proxyHost=squid.internal -Dhttps.proxyPort=3128 -Dhttp.nonProxyHosts=10.0.0.0|localhost|127.0.0.1|*.internal|*.cluster.local|*.local|*.svc|airbyte-*|temporal-k8s|minio",
        # Same proxy settings propagated into spawned job pods.
        "JOB_DEFAULT_ENV_http_proxy": "http://squid.internal:3128",
        "JOB_DEFAULT_ENV_https_proxy": "http://squid.internal:3128",
        "JOB_DEFAULT_ENV_no_proxy": "127.0.0.1,localhost,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,.canonical.com,.launchpad.net,.internal,.jujucharms.com,temporal-k8s,minio",
        "JOB_DEFAULT_ENV_HTTP_PROXY": "http://squid.internal:3128",
        "JOB_DEFAULT_ENV_HTTPS_PROXY": "http://squid.internal:3128",
        "JOB_DEFAULT_ENV_NO_PROXY": "127.0.0.1,localhost,::1,10.0.0.0/8,172.16.0.0/12,192.168.0.0/16,.canonical.com,.launchpad.net,.internal,.jujucharms.com,temporal-k8s,minio",
        "JOB_DEFAULT_ENV_JAVA_TOOL_OPTIONS": "-Dhttp.proxyHost=squid.internal -Dhttp.proxyPort=3128 -Dhttps.proxyHost=squid.internal -Dhttps.proxyPort=3128 -Dhttp.nonProxyHosts=10.0.0.0|localhost|127.0.0.1|*.internal|*.cluster.local|*.local|*.svc|airbyte-*|temporal-k8s|minio",
    }
def _check_missing_params(self, params, required_params):
"""Validate that all required properties were extracted.

Args:
params: dictionary of parameters extracted from relation.
required_params: list of required parameters.

Returns:
list: List of OpenFGA parameters that are not set in state.
"""
missing_params = []
for key in required_params:
if params.get(key) is None:
missing_params.append(key)
return missing_params

def _validate(self):
"""Validate that configuration and relations are valid and ready.

Raises:
ValueError: in case of invalid configuration.
"""
# Validate peer relation
if not self._state.is_ready():
raise ValueError("peer relation not ready")

# Validate db relation
if self._state.database_connection is None:
raise ValueError("database relation not ready")

# Validate db relation
if self._state.minio is None and self._state.s3 is None:
raise ValueError("minio/s3 relation not ready")

# Validate S3 relation.
if self._state.s3:
missing_params = self._check_missing_params(self._state.s3, REQUIRED_S3_PARAMETERS)
if len(missing_params) > 0:
raise ValueError(f"s3:missing parameters {missing_params!r}")

def _update(self, event):
"""Update the Temporal server configuration and replan its execution.

Args:
event: The event triggered when the relation changed.
"""
# TODO (kelkawi-a): populate database connection from PostgreSQL relation
if not self._state.database_connections:
self.unit.status = BlockedStatus("missing db relation")
try:
self._validate()
except ValueError as err:
self.unit.status = BlockedStatus(str(err))
return

s3_parameters = self._state.s3
if self.config["storage-type"] == StorageType.minio:
s3_parameters = self._state.minio

try:
s3_client = S3Client(s3_parameters)

for bucket_config in BUCKET_CONFIGS:
bucket = self.config[bucket_config]
s3_client.create_bucket_if_not_exists(bucket)

logs_ttl = int(self.config["logs-ttl"])
s3_client.set_bucket_lifecycle_policy(
bucket=self.config[LOGS_BUCKET_CONFIG], ttl=logs_ttl
)
except (ClientError, ValueError) as e:
logging.info(f"Error creating bucket and setting lifecycle policy: {e}")
self.unit.status = BlockedStatus(f"failed to create buckets: {str(e)}")
return

env = self._create_env()
self.model.unit.set_ports(CONNECTOR_BUILDER_SERVER_API_PORT)
self.model.unit.set_ports(INTERNAL_API_PORT)
env = create_env(self.model.name, self.app.name, self.config, self._state)
self.model.unit.set_ports(INTERNAL_API_PORT, CONNECTOR_BUILDER_SERVER_API_PORT)

for service in CONTAINERS:
for service in list(CONTAINERS.keys()):
container = self.unit.get_container(service)
if not container.can_connect():
event.defer()
Expand All @@ -196,7 +192,7 @@ def _update(self, event):
with open(script_path, "r") as file_source:
logger.info("pushing pod-sweeper script...")
container.push(
"airbyte-app/bin/pod-sweeper.sh",
f"/airbyte-app/bin/{service}",
file_source,
make_dirs=True,
permissions=0o755,
Expand All @@ -207,6 +203,8 @@ def _update(self, event):
container.replan()

self.unit.status = ActiveStatus()
if self.unit.is_leader():
self.airbyte_ui._provide_server_status()


if __name__ == "__main__": # pragma: nocover
Expand Down
Loading