Skip to content

Commit

Permalink
Addressing comments
Browse files Browse the repository at this point in the history
  • Loading branch information
jyothsnakonisa committed Feb 20, 2025
1 parent d27d864 commit ff860db
Show file tree
Hide file tree
Showing 21 changed files with 176 additions and 147 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -550,18 +550,31 @@ public void streamCdcSegments(SidecarInstance sidecarInstance,
.build(), streamConsumer);
}

/**
* Get configs for all the services in the "configs" table inside sidecar's internal
* keyspace
*
* @return List of services and their corresponding configs
*/
public CompletableFuture<GetServicesConfigPayload> getServiceConfig()
{
return executor.executeRequestAsync(requestBuilder()
.request(new GetServiceConfigRequest())
.build());
.request(new GetServiceConfigRequest())
.build());
}

/**
* Update config for a given service in "configs" table in internal sidecar keyspace
*
* @param serviceConfig service for which the configs are being updated
* @param config the updated config
* @return updated config
*/
public CompletableFuture<PutCdcServiceConfigPayload> putCdcServiceConfig(ServiceConfig serviceConfig, Map<String, String> config)
{
return executor.executeRequestAsync(requestBuilder()
.request(new PutServiceConfigRequest(serviceConfig, new PutCdcServiceConfigPayload(config)))
.build());
.request(new PutServiceConfigRequest(serviceConfig, new PutCdcServiceConfigPayload(config)))
.build());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@
*/
package org.apache.cassandra.sidecar.cdc;

import java.time.Duration;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

Expand All @@ -30,14 +31,9 @@
public interface CdcConfig
{
Logger LOGGER = LoggerFactory.getLogger(CdcConfig.class);

String DEFAULT_JOB_ID = "test-job";
int DEFAULT_MAX_WATERMARKER_SIZE = 400000;

CdcConfig STUB = new CdcConfig()
{
};

/**
* Topic format
*/
Expand Down Expand Up @@ -93,9 +89,9 @@ default String dc()
return "DATACENTER1";
}

default Duration watermarkWindow()
default MinuteBoundConfiguration watermarkWindow()
{
return Duration.ofHours(4);
return MinuteBoundConfiguration.parse("60m");
}

/**
Expand Down Expand Up @@ -142,9 +138,9 @@ default boolean isConfigReady()
return true;
}

default Duration minDelayBetweenMicroBatches()
default MillisecondBoundConfiguration minDelayBetweenMicroBatches()
{
return Duration.ofMillis(1000);
return MillisecondBoundConfiguration.parse("1s");
}

default int maxCommitLogsPerInstance()
Expand Down Expand Up @@ -173,9 +169,9 @@ default boolean persistEnabled()
/**
* @return the delay in millis between persist calls.
*/
default Duration persistDelay()
default MillisecondBoundConfiguration persistDelay()
{
return Duration.ofMillis(1000);
return MillisecondBoundConfiguration.parse("1s");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@
*/
package org.apache.cassandra.sidecar.cdc;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

import com.google.common.annotations.VisibleForTesting;
Expand All @@ -35,6 +35,8 @@
import io.vertx.core.Promise;

import org.apache.cassandra.sidecar.common.server.utils.DurationSpec;
import org.apache.cassandra.sidecar.common.server.utils.MillisecondBoundConfiguration;
import org.apache.cassandra.sidecar.common.server.utils.MinuteBoundConfiguration;
import org.apache.cassandra.sidecar.config.CdcConfiguration;
import org.apache.cassandra.sidecar.config.SchemaKeyspaceConfiguration;
import org.apache.cassandra.sidecar.db.CdcConfigAccessor;
Expand Down Expand Up @@ -171,9 +173,9 @@ public boolean failOnKafkaError()
}

@Override
public Duration persistDelay()
public MillisecondBoundConfiguration persistDelay()
{
return Duration.ofMillis(getInt(CDC_PERSIST_DELAY_MILLIS, 1000));
return new MillisecondBoundConfiguration(getInt(CDC_PERSIST_DELAY_MILLIS, 1000), TimeUnit.SECONDS);
}

@Override
Expand All @@ -183,23 +185,22 @@ public String dc()
}

@Override
public Duration watermarkWindow()
public MinuteBoundConfiguration watermarkWindow()
{
// this prop sets the maximum duration age accepted by CDC, any mutations with write timestamps older than
// the watermark window will be dropped with log message "Exclude the update due to out of the allowed time window."
final int seconds = getInt(WATERMARK_WINDOW_KEY, 259200);
return Duration.ofSeconds(seconds);
return new MinuteBoundConfiguration(getInt(WATERMARK_WINDOW_KEY, 259200), TimeUnit.SECONDS);
}

@Override
public Duration minDelayBetweenMicroBatches()
public MillisecondBoundConfiguration minDelayBetweenMicroBatches()
{
// this prop allows us to add a minimum delay between CDC micro batches
// usually if we need to slow down CDC
// e.g. if CDC is started with a large backlog of commit log segments and is working hard to process.
// e.g. or if there is a large data dump or burst of writes that causes high CDC activity.
final long millis = Long.parseLong(cdcConfigMappings.getOrDefault(MICROBATCH_DELAY_KEY, "1000"));
return Duration.ofMillis(millis);
return new MillisecondBoundConfiguration(millis, TimeUnit.MILLISECONDS);
}

@Override
Expand Down Expand Up @@ -249,9 +250,7 @@ public void registerConfigChangeListener(Callable<?> listener)

private Map<String, Object> getAuthConfigs()
{
Map<String, Object> authConfigs = new HashMap<>();
authConfigs.put("pie.queue.kaffe.client.private.key.location", cdcConfiguration.kafkaClientPrivateKeyPath());
return authConfigs;
return new HashMap<>();
}

@VisibleForTesting
Expand Down Expand Up @@ -289,7 +288,7 @@ public void execute(Promise<Void> promise)
}
catch (Throwable e)
{
LOGGER.error(String.format("There was an error with callback %s", listener), e);
LOGGER.error("There was an error with callback {}", listener, e);
}
}
promise.tryComplete();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,9 @@ public interface CdcConfiguration
*/
boolean isEnabled();

String kafkaClientPrivateKeyPath();

/**
*
* @return returns how frequently CDC configs are to be refreshed
*/
MillisecondBoundConfiguration cdcConfigRefreshTime();
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,12 @@ public class CdcConfigurationImpl implements CdcConfiguration
public static final SecondBoundConfiguration DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY =
SecondBoundConfiguration.parse("5m");

protected boolean isEnabled;
protected MillisecondBoundConfiguration cdcConfigRefreshTime;
protected SecondBoundConfiguration segmentHardLinkCacheExpiry;
@JsonProperty(value = IS_ENABLED_PROPERTY)
private final boolean isEnabled;
@JsonProperty(value = CONFIGURATION_REFRESH_TIME_PROPERTY)
private final MillisecondBoundConfiguration cdcConfigRefreshTime;
@JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_PROPERTY)
private SecondBoundConfiguration segmentHardLinkCacheExpiry;


public CdcConfigurationImpl()
Expand Down Expand Up @@ -84,12 +87,6 @@ public SecondBoundConfiguration segmentHardLinkCacheExpiry()
return segmentHardLinkCacheExpiry;
}

@JsonProperty(value = SEGMENT_HARD_LINK_CACHE_EXPIRY_PROPERTY)
public void setSegmentHardLinkCacheExpiry(SecondBoundConfiguration segmentHardlinkCacheExpiry)
{
this.segmentHardLinkCacheExpiry = segmentHardlinkCacheExpiry;
}

/**
* Legacy property {@code segment_hardlink_cache_expiry_in_secs}
*
Expand All @@ -101,12 +98,6 @@ public void setSegmentHardLinkCacheExpiry(SecondBoundConfiguration segmentHardli
public void setSegmentHardLinkCacheExpiryInSecs(long segmentHardlinkCacheExpiryInSecs)
{
LOGGER.warn("'segment_hardlink_cache_expiry_in_secs' is deprecated, use 'segment_hardlink_cache_expiry' instead");
setSegmentHardLinkCacheExpiry(new SecondBoundConfiguration(segmentHardlinkCacheExpiryInSecs, TimeUnit.SECONDS));
}

@Override
public String kafkaClientPrivateKeyPath()
{
return null;
this.segmentHardLinkCacheExpiry = new SecondBoundConfiguration(segmentHardlinkCacheExpiryInSecs, TimeUnit.SECONDS);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
import org.apache.cassandra.sidecar.common.server.CQLSessionProvider;
import org.apache.cassandra.sidecar.db.schema.ConfigsSchema;
import org.apache.cassandra.sidecar.db.schema.SidecarSchema;
import org.apache.cassandra.sidecar.routes.cdc.ValidServices;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.apache.cassandra.sidecar.routes.cdc.Service;

/**
* {@link CdcConfigAccessor} is an accessor class for updating CDC configurations into
Expand All @@ -34,17 +33,16 @@
public class CdcConfigAccessor extends ConfigAccessorImpl
{
@Inject
protected CdcConfigAccessor(InstanceMetadataFetcher instanceMetadataFetcher,
ConfigsSchema configsSchema,
protected CdcConfigAccessor(ConfigsSchema configsSchema,
CQLSessionProvider sessionProvider,
SidecarSchema sidecarSchema)
{
super(instanceMetadataFetcher, configsSchema, sessionProvider, sidecarSchema);
super(configsSchema, sessionProvider, sidecarSchema);
}

@Override
public ValidServices service()
public Service service()
{
return ValidServices.CDC;
return Service.CDC;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,36 @@
*/
public interface ConfigAccessor
{
/**
* Gets the configs of a service
* @return returns configs for the current service
*/
ServiceConfig getConfig();

/**
* Persists configs into the "configs" table for the current service
*
* @param config configs to be persisted
* @return returns updated configs
*/
ServiceConfig storeConfig(final Map<String, String> config);

/**
* Stores configs of the current service if they are not already present
*
* @param config new configs
* @return updated configs
*/
Optional<ServiceConfig> storeConfigIfNotExists(final Map<String, String> config);

/**
* Deletes configs for the given service
*/
void deleteConfig();

/**
* Checks if the schema for configs table is initialized
* @return
*/
boolean isSchemaInitialized();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.cassandra.sidecar.db;

import com.google.inject.Inject;
import org.apache.cassandra.sidecar.routes.cdc.ValidServices;
import org.apache.cassandra.sidecar.routes.cdc.Service;

/**
* Factory for creating config objects based on the service name.
Expand All @@ -38,12 +38,12 @@ public ConfigAccessorFactory(KafkaConfigAccessor kafkaConfigAccessor,

public ConfigAccessor getConfigAccessor(final String service)
{
if (service.equals(ValidServices.KAFKA.serviceName))
if (service.equals(Service.KAFKA.serviceName))
{
return kafkaConfigAccessor;
}

if (service.equals(ValidServices.CDC.serviceName))
if (service.equals(Service.CDC.serviceName))
{
return cdcConfigAccessor;
}
Expand Down
Loading

0 comments on commit ff860db

Please sign in to comment.