refactor spark-processor service #18

Merged
15 commits merged on Feb 14, 2023
File renamed without changes.
16 changes: 16 additions & 0 deletions common/utils.py
@@ -0,0 +1,16 @@
import json
from pathlib import Path


def read_json(file_path: str | Path) -> dict:
with open(file_path, "r", encoding="utf-8") as json_file:
return json.load(json_file)


def string_to_bytes(size: str) -> int:
    # Convert a size string with a single-letter unit suffix (e.g. "512m", "4g") to bytes.
units = {"b": 1, "k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}
size = size.lower().strip()
num = float(size[:-1])
unit = size[-1]
value_in_bytes = int(num * units[unit])
return value_in_bytes
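A minimal usage sketch of the two helpers; the size strings are illustrative, and the config path is the one the resolver reads further down in this PR:

from common.utils import read_json, string_to_bytes

# Parse the JSON event-field mapping into a dict (the file must exist on disk).
events_config = read_json("configs/events_config.json")

# Only single-character unit suffixes are handled; "4gb" would raise ValueError.
print(string_to_bytes("4g"))    # 4 * 1024**3 = 4294967296
print(string_to_bytes("512m"))  # 512 * 1024**2 = 536870912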
Empty file added configs/__init__.py
Empty file.
@@ -1,4 +1,4 @@
events_config = {
{
"SparkListenerApplicationStart": {
"application_start_time": ["Timestamp"]
},
@@ -12,13 +12,13 @@
},
"SparkListenerTaskEnd": {
"executor_id": ["Task Info", "Executor ID"],
"executor_cpu_time": ["Task Metrics", "Executor CPU Time"],
"cpu_time": ["Task Metrics", "Executor CPU Time"],
"bytes_read": ["Task Metrics", "Input Metrics", "Bytes Read"],
"records_read": ["Task Metrics", "Input Metrics", "Records Read"],
"bytes_written": ["Task Metrics", "Output Metrics", "Bytes Written"],
"records_written": ["Task Metrics", "Output Metrics", "Records Written"],
"local_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Local Bytes Read"],
"remote_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Remote Bytes Read"],
"shuffle_local_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Local Bytes Read"],
"shuffle_remote_bytes_read": ["Task Metrics", "Shuffle Read Metrics", "Remote Bytes Read"],
"shuffle_bytes_written": ["Task Metrics", "Shuffle Write Metrics", "Shuffle Bytes Written"],
"jvm_memory": ["Task Executor Metrics", "ProcessTreeJVMRSSMemory"],
"python_memory": ["Task Executor Metrics", "ProcessTreePythonRSSMemory"],
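For reference, a fragment of the kind of raw SparkListenerTaskEnd payload these key paths index into; the field names mirror the config above, but the values are made up:

# Hypothetical raw event fragment; only the keys referenced by the config are shown.
task_end_event = {
    "Event": "SparkListenerTaskEnd",
    "Task Info": {"Executor ID": "1"},
    "Task Metrics": {
        "Executor CPU Time": 1_250_000_000,  # nanoseconds
        "Input Metrics": {"Bytes Read": 1048576, "Records Read": 1000},
        "Output Metrics": {"Bytes Written": 524288, "Records Written": 500},
        "Shuffle Read Metrics": {"Local Bytes Read": 2048, "Remote Bytes Read": 4096},
        "Shuffle Write Metrics": {"Shuffle Bytes Written": 8192},
    },
    "Task Executor Metrics": {
        "ProcessTreeJVMRSSMemory": 268435456,
        "ProcessTreePythonRSSMemory": 134217728,
    },
}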
2 changes: 1 addition & 1 deletion spark_endpoint/app.py
@@ -7,7 +7,7 @@

from common.config import app_config
from common.logger import get_logger
from common.models import RawEvent, db_session
from common.db_models import RawEvent, db_session

logger = get_logger()

35 changes: 26 additions & 9 deletions spark_job_processor/app.py
@@ -1,33 +1,50 @@
import json
import os

from common.config import app_config
from kafka import KafkaConsumer
from sqlalchemy import select

from spark_job_processor.processor import process_message
from common.config import app_config
from common.logger import get_logger
from common.db_models import RawEvent, SparkJobRun, db_session
from spark_job_processor.events_processor import EventsProcessor

TOPIC_NAME = "JOB_RUN_EVENT"

ENVIRONMENT = os.getenv('ENVIRONMENT', 'development')
ENVIRONMENT = os.getenv("ENVIRONMENT", "development")
CONFIG = app_config[ENVIRONMENT]
logger = get_logger()


def get_events_from_db(job_run_id: str):
stmt = select(RawEvent).where(RawEvent.job_run_id == job_run_id)
return db_session.scalars(stmt)


def insert_spark_job_run(data: dict):
spark_job_run = SparkJobRun(**data)
db_session.add(spark_job_run)
db_session.commit()


def run():
consumer = KafkaConsumer(
CONFIG.KAFKA_TOPIC_NAME,
bootstrap_servers=[CONFIG.KAFKA_BOOTSTRAP_SERVERS],
value_deserializer=lambda m: json.loads(m.decode('utf-8'))
value_deserializer=lambda m: json.loads(m.decode("utf-8")),
)
logger.info('Starting to consume messages')
logger.info("Starting to consume messages")
events_processor = EventsProcessor()

for msg in consumer:
try:
logger.info('Received message: {}'.format(msg.value))
process_message(**msg.value)
logger.info(f"Received message: {msg.value}")
            events = [
                dict(raw_event.event)
                for raw_event in get_events_from_db(msg.value["job_run_id"])
            ]
application_data = events_processor.process_events(events, **msg.value)
insert_spark_job_run(application_data)
except Exception as e:
logger.error('Processor error: {}'.format(e), exc_info=True)
logger.error("Processor error: {}".format(e), exc_info=True)


if __name__ == '__main__':
if __name__ == "__main__":
run()
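The values consumed from CONFIG.KAFKA_TOPIC_NAME are unpacked into process_events(events, **msg.value), so they are expected to carry the job identifiers; a plausible payload, with made-up IDs, looks like this:

# Hypothetical message value; job_run_id is also used to select RawEvent rows,
# while pipeline_id / pipeline_run_id default to None in process_events.
message_value = {
    "job_run_id": "8c1f4a2e-6a3b-4d2a-9a7e-000000000001",
    "job_id": "42",
    "pipeline_id": None,
    "pipeline_run_id": None,
}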
60 changes: 60 additions & 0 deletions spark_job_processor/events_processor.py
@@ -0,0 +1,60 @@
from spark_job_processor.events_resolver import EventsResolver


class EventsProcessor:
def __init__(self) -> None:
self.resolver: EventsResolver = EventsResolver()

def process_events(
self,
events: list[dict],
job_run_id,
job_id,
pipeline_id=None,
pipeline_run_id=None,
) -> dict:
application = self.resolver.events_resolver(events)
application.set_totals()
total_cpu_time_used = round(
application.executor_total.task_total.cpu_time / 1e9, 4
)

peak_memory_usage = 0
if application.memory_per_executor:
peak_memory_usage = round(
(
application.executor_total.task_total.total_memory
/ application.memory_per_executor
)
* 100,
4,
)

cpu_utilization = 0
if application.executor_total.cpu_uptime:
cpu_utilization = round(
(total_cpu_time_used / application.executor_total.cpu_uptime)
* 100,
4,
)

processed_data = {
"id": job_run_id,
"job_id": job_id,
"pipeline_id": pipeline_id,
"pipeline_run_id": pipeline_run_id,
"cpu_utilization": cpu_utilization,
"total_cpu_time_used": total_cpu_time_used,
"num_of_executors": len(application.executors),
"total_memory_per_executor": application.memory_per_executor,
"total_bytes_read": application.executor_total.task_total.bytes_read,
"total_shuffle_bytes_read": application.executor_total.task_total.total_shuffle_bytes_read,
"total_bytes_written": application.executor_total.task_total.bytes_written,
"total_shuffle_bytes_written": application.executor_total.task_total.shuffle_bytes_written,
"total_cpu_uptime": application.executor_total.cpu_uptime,
"peak_memory_usage": peak_memory_usage,
"total_cores_num": application.executor_total.num_cores,
"start_time": application.start_time,
"end_time": application.end_time,
}
return processed_data
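A worked example of the derived-metric arithmetic above, using assumed totals (none of these numbers come from the PR):

# Assumed raw totals for one application run.
cpu_time_ns = 7_200_000_000_000       # 7200 s of executor CPU time, in nanoseconds
cpu_uptime_s = 14_400                 # summed executor uptime, in seconds
peak_task_memory = 6 * 1024**3        # peak memory used by tasks, in bytes
memory_per_executor = 8 * 1024**3     # memory allocated per executor, in bytes

total_cpu_time_used = round(cpu_time_ns / 1e9, 4)                           # 7200.0
cpu_utilization = round(total_cpu_time_used / cpu_uptime_s * 100, 4)        # 50.0 %
peak_memory_usage = round(peak_task_memory / memory_per_executor * 100, 4)  # 75.0 %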
96 changes: 96 additions & 0 deletions spark_job_processor/events_resolver.py
@@ -0,0 +1,96 @@
from datetime import datetime
from pathlib import Path

from common.logger import get_logger
from common.utils import string_to_bytes
from spark_job_processor.events_resolver_base import EventsResolverBase
from spark_job_processor.models import Application, Task

logger = get_logger()


CONFIG_FILE_PATH = Path("configs/events_config.json")


class EventsResolver(EventsResolverBase):
def __init__(self) -> None:
super().__init__(path_events_config=CONFIG_FILE_PATH, event_field_name="Event")

def events_resolver(self, events: list[dict]) -> Application:
application = Application()
for event in events:
match event["Event"]:
case "SparkListenerApplicationStart":
app_start_timestamp = self.find_value_in_event(
event, "application_start_time"
)
application.start_time = datetime.utcfromtimestamp(
app_start_timestamp / 1000.0
)

case "SparkListenerApplicationEnd":
app_end_timestamp = self.find_value_in_event(
event, "application_end_time"
)
application.end_time = datetime.utcfromtimestamp(
app_end_timestamp / 1000.0
)

case "SparkListenerExecutorAdded":
executor_start_timestamp = self.find_value_in_event(
event, "executor_start_time"
)
executor_id = self.find_value_in_event(event, "executor_id")
_executor = application.executors[executor_id]
_executor.start_time = datetime.utcfromtimestamp(
executor_start_timestamp / 1000.0
)
_executor.num_cores = self.find_value_in_event(
event, "cores_num"
)

case "SparkListenerTaskEnd":
_task_data = {
field: self.find_value_in_event(event, field)
for field in self.events_config["SparkListenerTaskEnd"]
}
executor_id = _task_data.pop("executor_id")
_task = Task(**_task_data)
application.executors[executor_id].tasks.append(_task)

case "SparkListenerExecutorRemoved" | "SparkListenerExecutorCompleted":
executor_id = self.find_value_in_event(event, "executor_id")
executor_end_timestamp = self.find_value_in_event(
event, "executor_end_time"
)
                    # Set the attribute directly, matching how executor fields are assigned elsewhere.
                    application.executors[executor_id].end_time = (
                        datetime.utcfromtimestamp(executor_end_timestamp / 1000.0)
                    )

case "SparkListenerEnvironmentUpdate":
try:
executor_memory = string_to_bytes(
self.find_value_in_event(event, "executor_memory")
)
memory_overhead_factor = float(
self.find_value_in_event(event, "memory_overhead_factor")
)
application.memory_per_executor = executor_memory * (
1 + memory_overhead_factor
)
except Exception as exc:
logger.error(
"Failed to parse executor memory from event: %s",
exc,
exc_info=True,
)

self.post_processing(application)
return application

@staticmethod
def post_processing(application: Application):
if application.end_time:
for executor in application.executors.values():
if executor.end_time is None:
executor.end_time = application.end_time
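As a quick check on the SparkListenerEnvironmentUpdate branch, with assumed Spark properties of "4g" executor memory and a 0.1 overhead factor (the event delivers both as strings):

from common.utils import string_to_bytes

executor_memory = string_to_bytes("4g")        # 4294967296 bytes
memory_overhead_factor = float("0.1")
memory_per_executor = executor_memory * (1 + memory_overhead_factor)
# 4294967296 * 1.1 = 4724464025.6 bytes, roughly 4.4 GiB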
19 changes: 19 additions & 0 deletions spark_job_processor/events_resolver_base.py
@@ -0,0 +1,19 @@
from abc import ABC, abstractmethod
from pathlib import Path

from common.utils import read_json


class EventsResolverBase(ABC):
def __init__(self, path_events_config: str | Path, event_field_name: str) -> None:
self.events_config = read_json(path_events_config)
self.event_field_name = event_field_name

@abstractmethod
def events_resolver(self, events: list[dict]):
"""Resolving the events"""

    def find_value_in_event(self, event: dict, field_name: str):
        # Follow the configured key path for this event type down into the nested payload.
        for key in self.events_config[event[self.event_field_name]][field_name]:
            event = event[key]
        return event
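Tying the base class back to the config: find_value_in_event looks up the event's type under event_field_name ("Event") and then follows the configured key path into the nested payload. A standalone sketch of that lookup, with an assumed config entry and event:

# Minimal, self-contained illustration; the config entry and event are assumptions
# shaped like configs/events_config.json and a raw Spark listener event.
events_config = {
    "SparkListenerTaskEnd": {
        "bytes_read": ["Task Metrics", "Input Metrics", "Bytes Read"],
    }
}
event = {
    "Event": "SparkListenerTaskEnd",
    "Task Metrics": {"Input Metrics": {"Bytes Read": 1048576}},
}

value = event
for key in events_config[event["Event"]]["bytes_read"]:
    value = value[key]
print(value)  # 1048576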