Skip to content

Commit

Permalink
Query and monitor if we have skipped any scheduled jobs (#15256)
Browse files Browse the repository at this point in the history
* add error code to ManualOperationResult

* fix a bug

* support temporal metrics

* metrics in temporal

* use statsd

* wrap otel config to temporal metric export

* use http port 4318 for otlp exporter

* simpilfy to support dd only

* use /v1/metrics for endpoint

* use statsd

* fix

* remove unused func

* wrap up implementation to export temporal metrics to datadog

* use deps.toml to wrap up the dependency

* move to metric client factory

* fix pmd error

* pmd, comment fix

* pr comment fix

* add a new metric to observe abnormal scheduled sycns

* formatting

* add javadoc

* format fix
  • Loading branch information
xiaohansong authored Aug 8, 2022
1 parent 719bb60 commit c582901
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -148,4 +148,49 @@ public static List<Pair<JobStatus, Double>> overallJobRuntimeForTerminalJobsInLa
return pairedRes;
}

/*
* A connection that is not running on schedule is defined in last 24 hours if the number of runs
* are not matching with the number of expected runs according to the schedule settings. Refer to
* playbook for detailed discussion.
*/
public static Long numOfJobsNotRunningOnSchedule(final DSLContext ctx) {
final var countField = "cnt";
final var query = """
SELECT count(1) as cnt FROM ((
SELECT
c.id,
count(*) as cnt
FROM
connection c
LEFT JOIN Jobs j ON j.scope::uuid = c.id
WHERE
c.schedule IS NOT null
AND c.schedule != 'null'
AND j.created_at > now() - interval '24 hours 1 minutes'
AND c.status = 'active'
AND j.config_type = 'sync'
AND c.updated_at < now() - interval '24 hours 1 minutes'
AND cast(c.schedule::jsonb->'timeUnit' as text) = '"hours"'
GROUP BY 1
HAVING count(*) < 24 / cast(c.schedule::jsonb->'units' as integer))
UNION (
SELECT
c.id,
count(*) as cnt
FROM connection c
LEFT JOIN Jobs j ON j.scope::uuid = c.id
WHERE
c.schedule IS NOT null
AND c.schedule != 'null'
AND j.created_at > now() - interval '1 hours 1 minutes'
AND c.status = 'active'
AND j.config_type = 'sync'
AND c.updated_at < now() - interval '1 hours 1 minutes'
AND cast(c.schedule::jsonb->'timeUnit' as text) = '"minutes"'
GROUP BY 1
HAVING count(*) < 60 / cast(c.schedule::jsonb->'units' as integer))) as abnormal_sync_jobs
""";
return ctx.fetch(query).getValues(countField, long.class).get(0).longValue();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ public enum OssMetricsRegistry implements MetricsRegistry {
MetricEmittingApps.METRICS_REPORTER,
"num_active_conn_per_workspace",
"number of active connections per workspace"),
NUM_ABNORMAL_SCHEDULED_SYNCS(
MetricEmittingApps.METRICS_REPORTER,
"num_abnormal_scheduled_syncs",
"number of abnormal syncs that have skipped at least 1 scheduled run recently."),
OLDEST_PENDING_JOB_AGE_SECS(MetricEmittingApps.METRICS_REPORTER,
"oldest_pending_job_age_secs",
"oldest pending job in seconds"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ public enum ToEmit {
MetricClientFactory.getMetricClient().distribution(OssMetricsRegistry.NUM_ACTIVE_CONN_PER_WORKSPACE, count);
}
})),
NUM_ABNORMAL_SCHEDULED_SYNCS(countMetricEmission(() -> {
final var count = ReporterApp.configDatabase.query(MetricQueries::numOfJobsNotRunningOnSchedule);
MetricClientFactory.getMetricClient().gauge(OssMetricsRegistry.NUM_ABNORMAL_SCHEDULED_SYNCS, count);
})),
OVERALL_JOB_RUNTIME_IN_LAST_HOUR_BY_TERMINAL_STATE_SECS(countMetricEmission(() -> {
final var times = ReporterApp.configDatabase.query(MetricQueries::overallJobRuntimeForTerminalJobsInLastHour);
for (Pair<JobStatus, Double> pair : times) {
Expand Down

0 comments on commit c582901

Please sign in to comment.