-
Notifications
You must be signed in to change notification settings - Fork 4.4k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix record count and add acceptance test to the new scheduler #9487
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
|
||
package io.airbyte.workers; | ||
|
||
import io.airbyte.analytics.Deployment; | ||
import io.airbyte.analytics.TrackingClient; | ||
import io.airbyte.analytics.TrackingClientSingleton; | ||
import io.airbyte.commons.features.EnvVariableFeatureFlags; | ||
|
@@ -24,11 +25,14 @@ | |
import io.airbyte.db.instance.jobs.JobsDatabaseInstance; | ||
import io.airbyte.scheduler.persistence.DefaultJobCreator; | ||
import io.airbyte.scheduler.persistence.DefaultJobPersistence; | ||
import io.airbyte.scheduler.persistence.JobCreator; | ||
import io.airbyte.scheduler.persistence.JobNotifier; | ||
import io.airbyte.scheduler.persistence.JobPersistence; | ||
import io.airbyte.scheduler.persistence.WorkspaceHelper; | ||
import io.airbyte.scheduler.persistence.job_factory.DefaultSyncJobFactory; | ||
import io.airbyte.scheduler.persistence.job_factory.OAuthConfigSupplier; | ||
import io.airbyte.scheduler.persistence.job_factory.SyncJobFactory; | ||
import io.airbyte.scheduler.persistence.job_tracker.JobTracker; | ||
import io.airbyte.workers.helper.ConnectionHelper; | ||
import io.airbyte.workers.process.DockerProcessFactory; | ||
import io.airbyte.workers.process.KubePortManagerSingleton; | ||
|
@@ -100,6 +104,8 @@ public class WorkerApp { | |
private final Configs configs; | ||
private final ConnectionHelper connectionHelper; | ||
private final boolean containerOrchestratorEnabled; | ||
private final JobNotifier jobNotifier; | ||
private final JobTracker jobTracker; | ||
|
||
public void start() { | ||
final Map<String, String> mdc = MDC.getCopyOfContextMap(); | ||
|
@@ -187,6 +193,8 @@ public void start() { | |
|
||
syncWorker.registerActivitiesImplementations(replicationActivity, normalizationActivity, dbtTransformationActivity, persistStateActivity); | ||
|
||
final JobCreator jobCreator = new DefaultJobCreator(jobPersistence, configRepository); | ||
|
||
final Worker connectionUpdaterWorker = | ||
factory.newWorker(TemporalJobType.CONNECTION_UPDATER.toString(), getWorkerOptions(maxWorkers.getMaxSyncWorkers())); | ||
connectionUpdaterWorker.registerWorkflowImplementationTypes(ConnectionManagerWorkflowImpl.class, SyncWorkflowImpl.class); | ||
|
@@ -198,7 +206,11 @@ public void start() { | |
jobPersistence, | ||
temporalWorkerRunFactory, | ||
workerEnvironment, | ||
logConfigs), | ||
logConfigs, | ||
jobNotifier, | ||
jobTracker, | ||
configRepository, | ||
jobCreator), | ||
new ConfigFetchActivityImpl(configRepository, jobPersistence, configs, () -> Instant.now().getEpochSecond()), | ||
new ConnectionDeletionActivityImpl(connectionHelper), | ||
replicationActivity, | ||
|
@@ -345,6 +357,12 @@ public static void main(final String[] args) throws IOException, InterruptedExce | |
.getInitialized(); | ||
|
||
final JobPersistence jobPersistence = new DefaultJobPersistence(jobDatabase); | ||
TrackingClientSingleton.initialize( | ||
configs.getTrackingStrategy(), | ||
new Deployment(configs.getDeploymentMode(), jobPersistence.getDeployment().orElseThrow(), configs.getWorkerEnvironment()), | ||
configs.getAirbyteRole(), | ||
configs.getAirbyteVersion(), | ||
configRepository); | ||
final TrackingClient trackingClient = TrackingClientSingleton.get(); | ||
final SyncJobFactory jobFactory = new DefaultSyncJobFactory( | ||
new DefaultJobCreator(jobPersistence, configRepository), | ||
|
@@ -372,6 +390,14 @@ public static void main(final String[] args) throws IOException, InterruptedExce | |
workspaceHelper, | ||
workerConfigs); | ||
|
||
final JobNotifier jobNotifier = new JobNotifier( | ||
configs.getWebappUrl(), | ||
configRepository, | ||
workspaceHelper, | ||
TrackingClientSingleton.get()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not seeing this be initialised? Am I missing something? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is initialized in the get method
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ah I was referring to properly initialising it with this call https://github.com/airbytehq/airbyte/blob/master/airbyte-analytics/src/main/java/io/airbyte/analytics/TrackingClientSingleton.java#L39. e.g. https://github.com/airbytehq/airbyte/blob/master/airbyte-scheduler/app/src/main/java/io/airbyte/scheduler/app/SchedulerApp.java#L259 I think this is needed otherwise we won't be sending events to Segment. If you look at the Scheduler code, the linked code is called before we call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for catching it, Done |
||
|
||
final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence, trackingClient); | ||
|
||
new WorkerApp( | ||
workspaceRoot, | ||
jobProcessFactory, | ||
|
@@ -392,7 +418,9 @@ public static void main(final String[] args) throws IOException, InterruptedExce | |
temporalWorkerRunFactory, | ||
configs, | ||
connectionHelper, | ||
configs.getContainerOrchestratorEnabled()).start(); | ||
configs.getContainerOrchestratorEnabled(), | ||
jobNotifier, | ||
jobTracker).start(); | ||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done