
Dependency Injector on Pipeline Execute #106


Merged · 1 commit · Mar 22, 2023
67 changes: 1 addition & 66 deletions src/sdk/python/rtdip_sdk/pipelines/deploy/databricks.py
@@ -158,69 +158,4 @@ def launch(self):
         )
         os.chdir(current_dir)
 
+        return True
-
-class DataBricksDeploy(DeployInterface):
-    '''
-
-    '''
-    pipeline_job: PipelineJob
-    databricks_job_for_pipeline_job: DatabricksJobForPipelineJob
-    host: str
-    token: str
-
-    def __init__(self, pipeline_job: PipelineJob, databricks_job_for_pipeline_job: DatabricksJobForPipelineJob, host: str, token: str) -> None:
-        self.pipeline_job = pipeline_job
-        self.databricks_job_for_pipeline_job = databricks_job_for_pipeline_job
-        self.host = host
-        self.token = token
-
-    def deploy(self):
-        # get Api Client
-        api_client = ApiClient(host=self.host, token=self.token)
-        jobs_api = JobsApi(api_client)
-
-        # create Databricks Job Tasks
-        databricks_tasks = []
-        for task in self.pipeline_job.task_list:
-            databricks_job_task = DatabricksTask(task_key=task.name, libraries=[], depends_on=[])
-            if self.databricks_job_for_pipeline_job.databricks_task_for_pipeline_task_list is not None:
-                databricks_task_for_pipeline_task = next(x for x in self.databricks_job_for_pipeline_job.databricks_task_for_pipeline_task_list if x.name == task.name)
-                if databricks_task_for_pipeline_task is not None:
-                    databricks_job_task.__dict__.update(databricks_task_for_pipeline_task.__dict__)
-
-            databricks_job_task.name = task.name
-            databricks_job_task.depends_on = task.depends_on_task
-
-            # get libraries
-            for step in task.step_list:
-                libraries = step.component.libraries()
-                for pypi_library in libraries.pypi_libraries:
-                    databricks_job_task.libraries.append(DatabricksLibraries(pypi=DatbricksLibrariesPypi(package=pypi_library.to_string(), repo=pypi_library.repo)))
-                for maven_library in libraries.maven_libraries:
-                    databricks_job_task.libraries.append(DatabricksLibraries(maven=DatabricksLibrariesMaven(coordinates=maven_library.to_string(), repo=maven_library.repo)))
-                for wheel_library in libraries.pythonwheel_libraries:
-                    databricks_job_task.libraries.append(DatabricksLibraries(whl=wheel_library))
-
-            try:
-                rtdip_version = version("rtdip-sdk")
-                databricks_job_task.libraries.append(DatabricksLibraries(pypi=DatbricksLibrariesPypi(package="rtdip-sdk=={}".format(rtdip_version))))
-            except PackageNotFoundError as e:
-                databricks_job_task.libraries.append(DatabricksLibraries(pypi=DatbricksLibrariesPypi(package="rtdip-sdk")))
-
-            databricks_job_task.spark_python_task = DatabricksSparkPythonTask(python_file="dbfs:/python_file.py")
-            databricks_tasks.append(databricks_job_task)
-
-        # create Databricks Job
-        databricks_job = DatabricksJob(name=self.pipeline_job.name, tasks=databricks_tasks)
-        databricks_job.__dict__.update(self.databricks_job_for_pipeline_job.__dict__)
-        databricks_job.__dict__.pop("databricks_task_for_pipeline_task_list", None)
-
-        # create Databricks Job
-        result = jobs_api.create_job(databricks_job.dict(exclude_none=True), version="2.1")
-        return result
-
-    def launch(self, job_id):
-        api_client = ApiClient(host=self.host, token=self.token)
-        jobs_api = JobsApi(api_client)
-        jobs_api.run_now(job_id)
-        return True
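
Review note on the removed `DataBricksDeploy.deploy`: the task-override lookup called `next(...)` without a default, which raises `StopIteration` when no override matches a task name, so the `is not None` guard immediately below it could never take the false branch. A minimal sketch of the safer pattern, using a hypothetical `TaskOverride` stand-in for `DatabricksTaskForPipelineTask`:

```python
from dataclasses import dataclass


@dataclass
class TaskOverride:
    # Hypothetical stand-in for DatabricksTaskForPipelineTask.
    name: str


overrides = [TaskOverride("ingest"), TaskOverride("transform")]

# next() without a default raises StopIteration when nothing matches;
# a None default makes the "is not None" guard below meaningful.
match = next((x for x in overrides if x.name == "load"), None)
if match is not None:
    print("override found:", match.name)
else:
    print("no override for this task")  # reachable only with the default
```

With this class deleted, `DatabricksDBXDeploy` is the remaining deploy implementation in this module, and the pitfall goes away along with it.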
2 changes: 1 addition & 1 deletion src/sdk/python/rtdip_sdk/pipelines/execute/job.py
@@ -107,7 +107,7 @@ def _task_setup_dependency_injection(self, step_list: list[PipelineStep]):
         for step in step_list:
             # setup factory provider for component
             provider = providers.Factory(step.component)
-            attributes = step.component.__annotations__.items()
+            attributes = getattr(step.component, '__annotations__', {}).items()
             # add spark session, if needed
             for key, value in attributes:
                 # if isinstance(value, SparkSession): # TODO: fix this as value does not seem to be an instance of SparkSession
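
This one-line change is the core of the PR: `providers.Factory` wraps each pipeline component, and the injector inspects the component's annotated class attributes to decide what to inject. A component that declares no annotated attributes may not define its own `__annotations__` (on some Python versions the lookup raises `AttributeError`, or falls back to an ancestor's annotations), so `getattr` with a `{}` default degrades to "nothing to inject" instead of failing. A minimal sketch with hypothetical component classes (`dependency_injector` is the library this code already uses):

```python
from dependency_injector import providers


class AnnotatedComponent:
    # An annotated class attribute the injector can discover.
    spark: "SparkSession"


class BareComponent:
    # No annotated attributes, so no __annotations__ of its own.
    pass


for component in (AnnotatedComponent, BareComponent):
    provider = providers.Factory(component)
    # component.__annotations__ can fail or leak an ancestor's annotations;
    # getattr with a {} default simply yields nothing to inject.
    attributes = getattr(component, "__annotations__", {}).items()
    for key, value in attributes:
        print(f"{component.__name__}: candidate injection {key}: {value}")
    instance = provider()  # Factory builds a fresh instance on each call
```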
@@ -21,7 +21,7 @@
 from src.sdk.python.rtdip_sdk.pipelines.transformers.spark.eventhub import EventhubBodyBinaryToString
 from src.sdk.python.rtdip_sdk.pipelines.destinations.spark.delta import SparkDeltaDestination
 from src.sdk.python.rtdip_sdk.pipelines.sources.spark.delta_sharing import SparkDeltaSharingSource
-from src.sdk.python.rtdip_sdk.pipelines.deploy.databricks import DataBricksDeploy, DatabricksDBXDeploy
+from src.sdk.python.rtdip_sdk.pipelines.deploy.databricks import DatabricksDBXDeploy
 from src.sdk.python.rtdip_sdk.pipelines.deploy.models.databricks import DatabricksCluster, DatabricksJobCluster, DatabricksJobForPipelineJob, DatabricksTaskForPipelineTask

