Skip to content

Commit

Permalink
Define DbRetentionJob(Jdbi, DbRetentionConfig) (#2549)
Browse files Browse the repository at this point in the history
* Define `DbRetentionJob(Jdbi, DbRetentionConfig)`

Signed-off-by: wslulciuc <willy@datakin.com>

* Define `DbRetentionJob(Jdbi, DbRetentionConfig)`

Signed-off-by: wslulciuc <willy@datakin.com>

* continued: Define `DbRetentionJob(Jdbi, DbRetentionConfig)`

Signed-off-by: wslulciuc <willy@datakin.com>

* continued: Resolve merge conflicts

Signed-off-by: wslulciuc <willy@datakin.com>

* Add oss license header

Signed-off-by: wslulciuc <willy@datakin.com>

---------

Signed-off-by: wslulciuc <willy@datakin.com>
  • Loading branch information
wslulciuc authored Jul 24, 2023
1 parent a515c91 commit 3aa0b5b
Show file tree
Hide file tree
Showing 4 changed files with 145 additions and 24 deletions.
8 changes: 1 addition & 7 deletions api/src/main/java/marquez/MarquezApp.java
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,7 @@ public void run(@NonNull MarquezConfig config, @NonNull Environment env) {
// Add scheduled jobs to lifecycle.
if (config.hasDbRetentionPolicy()) {
// Add job to apply retention policy to database.
env.lifecycle()
.manage(
new DbRetentionJob(
jdbi,
config.getDbRetention().getFrequencyMins(),
config.getDbRetention().getNumberOfRowsPerBatch(),
config.getDbRetention().getRetentionDays()));
env.lifecycle().manage(new DbRetentionJob(jdbi, config.getDbRetention()));
}
}

Expand Down
18 changes: 13 additions & 5 deletions api/src/main/java/marquez/jobs/DbRetentionConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,22 @@
import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;

import javax.validation.constraints.Positive;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import lombok.NoArgsConstructor;
import lombok.Value;

/** Configuration for {@link DbRetentionJob}. */
public final class DbRetentionConfig {
@Builder
@NoArgsConstructor
@AllArgsConstructor
@Value
public class DbRetentionConfig {
public static final int DEFAULT_FREQUENCY_MINS = 15;

@Getter @Setter private int frequencyMins = DEFAULT_FREQUENCY_MINS;
@Getter @Setter private int numberOfRowsPerBatch = DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
@Getter @Setter private int retentionDays = DEFAULT_RETENTION_DAYS;
@Builder.Default @Getter @Positive int frequencyMins = DEFAULT_FREQUENCY_MINS;
@Builder.Default @Getter @Positive int numberOfRowsPerBatch = DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
@Builder.Default @Getter @Positive int retentionDays = DEFAULT_RETENTION_DAYS;
}
18 changes: 6 additions & 12 deletions api/src/main/java/marquez/jobs/DbRetentionJob.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@

package marquez.jobs;

import static com.google.common.base.Preconditions.checkArgument;

import com.google.common.util.concurrent.AbstractScheduledService;
import io.dropwizard.lifecycle.Managed;
import java.time.Duration;
Expand Down Expand Up @@ -43,21 +41,17 @@ public class DbRetentionJob extends AbstractScheduledService implements Managed
* of {@code retentionDays}.
*/
public DbRetentionJob(
@NonNull final Jdbi jdbi,
final int frequencyMins,
final int numberOfRowsPerBatch,
final int retentionDays) {
checkArgument(frequencyMins > 0, "'frequencyMins' must be > 0");
checkArgument(numberOfRowsPerBatch > 0, "'numberOfRowsPerBatch' must be > 0");
checkArgument(retentionDays > 0, "'retentionDays' must be > 0");
this.numberOfRowsPerBatch = numberOfRowsPerBatch;
this.retentionDays = retentionDays;
@NonNull final Jdbi jdbi, @NonNull final DbRetentionConfig dbRetentionConfig) {
this.numberOfRowsPerBatch = dbRetentionConfig.getNumberOfRowsPerBatch();
this.retentionDays = dbRetentionConfig.getRetentionDays();

// Open connection.
this.jdbi = jdbi;

// Define fixed schedule with no delay.
this.fixedRateScheduler =
Scheduler.newFixedRateSchedule(NO_DELAY, Duration.ofMinutes(frequencyMins));
Scheduler.newFixedRateSchedule(
NO_DELAY, Duration.ofMinutes(dbRetentionConfig.getFrequencyMins()));
}

@Override
Expand Down
125 changes: 125 additions & 0 deletions api/src/test/java/marquez/jobs/DbRetentionConfigTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
* Copyright 2018-2023 contributors to the Marquez project
* SPDX-License-Identifier: Apache-2.0
*/

package marquez.jobs;

import static marquez.db.DbRetention.DEFAULT_NUMBER_OF_ROWS_PER_BATCH;
import static marquez.db.DbRetention.DEFAULT_RETENTION_DAYS;
import static marquez.jobs.DbRetentionConfig.DEFAULT_FREQUENCY_MINS;
import static org.assertj.core.api.Assertions.assertThat;

import java.util.Set;
import javax.validation.ConstraintViolation;
import javax.validation.Validation;
import javax.validation.Validator;
import org.junit.jupiter.api.Test;

/** The test suite for {@link DbRetentionConfig}. */
public class DbRetentionConfigTest {
private static final Validator VALIDATOR =
Validation.buildDefaultValidatorFactory().getValidator();

@Test
public void testNewDbRetentionConfig_withDefaultsOnly() {
final DbRetentionConfig configWithDefaults = new DbRetentionConfig();

assertThat(configWithDefaults.getFrequencyMins()).isEqualTo(DEFAULT_FREQUENCY_MINS);
assertThat(configWithDefaults.getNumberOfRowsPerBatch())
.isEqualTo(DEFAULT_NUMBER_OF_ROWS_PER_BATCH);
assertThat(configWithDefaults.getRetentionDays()).isEqualTo(DEFAULT_RETENTION_DAYS);
}

@Test
public void testNewDbRetentionConfig_overrideFrequencyMins() {
final int frequencyMinsOverride = 5;
final DbRetentionConfig configWithFrequencyMinsOverride =
DbRetentionConfig.builder().frequencyMins(frequencyMinsOverride).build();

// No constraint violations.
final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithFrequencyMinsOverride);
assertThat(violations).isEmpty();

assertThat(configWithFrequencyMinsOverride.getFrequencyMins()).isEqualTo(frequencyMinsOverride);
assertThat(configWithFrequencyMinsOverride.getNumberOfRowsPerBatch())
.isEqualTo(DEFAULT_NUMBER_OF_ROWS_PER_BATCH);
assertThat(configWithFrequencyMinsOverride.getRetentionDays())
.isEqualTo(DEFAULT_RETENTION_DAYS);
}

@Test
public void testNewDbRetentionConfig_overrideNumberOfRowsPerBatch() {
final int numberOfRowsPerBatchOverride = 25;
final DbRetentionConfig configWithNumberOfRowsPerBatchOverride =
DbRetentionConfig.builder().numberOfRowsPerBatch(numberOfRowsPerBatchOverride).build();

// No constraint violations.
final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNumberOfRowsPerBatchOverride);
assertThat(violations).isEmpty();

assertThat(configWithNumberOfRowsPerBatchOverride.getFrequencyMins())
.isEqualTo(DEFAULT_FREQUENCY_MINS);
assertThat(configWithNumberOfRowsPerBatchOverride.getNumberOfRowsPerBatch())
.isEqualTo(numberOfRowsPerBatchOverride);
assertThat(configWithNumberOfRowsPerBatchOverride.getRetentionDays())
.isEqualTo(DEFAULT_RETENTION_DAYS);
}

@Test
public void testNewDbRetentionConfig_overrideRetentionDays() {
final int retentionDaysOverride = 14;
final DbRetentionConfig configWithNumberOfRowsPerBatchOverride =
DbRetentionConfig.builder().retentionDays(retentionDaysOverride).build();

// No constraint violations.
final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNumberOfRowsPerBatchOverride);
assertThat(violations).isEmpty();

assertThat(configWithNumberOfRowsPerBatchOverride.getFrequencyMins())
.isEqualTo(DEFAULT_FREQUENCY_MINS);
assertThat(configWithNumberOfRowsPerBatchOverride.getNumberOfRowsPerBatch())
.isEqualTo(DEFAULT_NUMBER_OF_ROWS_PER_BATCH);
assertThat(configWithNumberOfRowsPerBatchOverride.getRetentionDays())
.isEqualTo(retentionDaysOverride);
}

@Test
public void testNewDbRetentionConfig_negativeFrequencyMins() {
final int negativeFrequencyMins = -5;

final DbRetentionConfig configWithNegativeFrequencyMins =
DbRetentionConfig.builder().frequencyMins(negativeFrequencyMins).build();

final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNegativeFrequencyMins);
assertThat(violations).hasSize(1);
}

@Test
public void testNewDbRetentionConfig_negativeNumberOfRowsPerBatch() {
final int negativeNumberOfRowsPerBatch = -25;

final DbRetentionConfig configWithNegativeNumberOfRowsPerBatch =
DbRetentionConfig.builder().numberOfRowsPerBatch(negativeNumberOfRowsPerBatch).build();

final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNegativeNumberOfRowsPerBatch);
assertThat(violations).hasSize(1);
}

@Test
public void testNewDbRetentionConfig_negativeRetentionDays() {
final int negativeRetentionDays = -14;

final DbRetentionConfig configWithNegativeRetentionDays =
DbRetentionConfig.builder().retentionDays(negativeRetentionDays).build();

final Set<ConstraintViolation<DbRetentionConfig>> violations =
VALIDATOR.validate(configWithNegativeRetentionDays);
assertThat(violations).hasSize(1);
}
}

0 comments on commit 3aa0b5b

Please sign in to comment.