From 92112464d828d0cbcf4a0f761feb4ecf265142c0 Mon Sep 17 00:00:00 2001 From: James Berragan Date: Wed, 26 Feb 2025 12:01:53 -0800 Subject: [PATCH] Add CdcRawDirectorySpaceCleaner for tracking and cleaning up the cdc_raw directories --- .../sidecar/config/CdcConfiguration.java | 39 ++ .../config/yaml/CdcConfigurationImpl.java | 180 ++++++++ .../db/SystemViewsDatabaseAccessor.java | 82 ++++ .../sidecar/db/schema/SystemViewsSchema.java | 68 +++ .../cassandra/sidecar/metrics/CdcMetrics.java | 66 +++ .../sidecar/metrics/ServerMetrics.java | 5 + .../sidecar/metrics/ServerMetricsImpl.java | 8 + .../cassandra/sidecar/server/Server.java | 9 +- .../tasks/CdcRawDirectorySpaceCleaner.java | 437 ++++++++++++++++++ .../cassandra/sidecar/utils/CdcUtil.java | 36 +- .../cassandra/sidecar/utils/FileUtils.java | 67 +++ .../CdcRawDirectorySpaceCleanerTest.java | 173 +++++++ .../cassandra/sidecar/utils/CdcUtilTest.java | 84 ++++ .../sidecar/utils/FileUtilsTest.java | 24 + 14 files changed, 1276 insertions(+), 2 deletions(-) create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java create mode 100644 server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java create mode 100644 server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java create mode 100644 server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java index 7141be972..1b8b1ec53 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java +++ 
b/server/src/main/java/org/apache/cassandra/sidecar/config/CdcConfiguration.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.sidecar.config; +import java.time.Duration; + import org.apache.cassandra.sidecar.common.server.utils.SecondBoundConfiguration; /** @@ -28,4 +30,41 @@ public interface CdcConfiguration * @return segment hard link cache expiration time used in {@link org.apache.cassandra.sidecar.cdc.CdcLogCache} */ SecondBoundConfiguration segmentHardLinkCacheExpiry(); + + /* CdcRawDirectorySpaceCleaner Configuration */ + + /** + * @return the cadence at which the CdcRawDirectorySpaceCleaner periodic task should run to check and clean-up old `cdc_raw` log segments. + */ + SecondBoundConfiguration cdcRawDirectorySpaceCleanerFrequency(); + + /** + * @return `true` if CdcRawDirectorySpaceCleaner should monitor the `cdc_raw` directory and clean up the oldest commit log segments. + */ + boolean enableCdcRawDirectoryRoutineCleanUp(); + + /** + * @return fallback value for maximum directory size in bytes for the `cdc_raw` directory when it cannot be read from the `system_views.settings` table + */ + long fallbackCdcRawDirectoryMaxSizeBytes(); + + /** + * @return max percent usage of the cdc_raw directory before CdcRawDirectorySpaceCleaner starts removing the oldest segments. + */ + float cdcRawDirectoryMaxPercentUsage(); + + /** + * @return the critical time period that indicates the `cdc_raw` directory is not large enough to buffer this time-window of mutations. + */ + Duration cdcRawDirectoryCriticalBufferWindow(); + + /** + * @return the low time period that indicates the `cdc_raw` directory is not large enough to buffer this time-window of mutations. + */ + Duration cdcRawDirectoryLowBufferWindow(); + + /** + * @return the time period which the CdcRawDirectorySpaceCleaner should cache the cdc_total_space before refreshing. 
+ */ + Duration cacheMaxUsage(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java index 41364a815..3485a956c 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/config/yaml/CdcConfigurationImpl.java @@ -17,6 +17,8 @@ */ package org.apache.cassandra.sidecar.config.yaml; +import java.time.Duration; +import java.time.temporal.ChronoUnit; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -36,16 +38,81 @@ public class CdcConfigurationImpl implements CdcConfiguration public static final SecondBoundConfiguration DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY = SecondBoundConfiguration.parse("5m"); + public static final String CDC_RAW_CLEANER_FREQUENCY_PROPERTY = "cdc_raw_cleaner_frequency"; + public static final SecondBoundConfiguration DEFAULT_CDC_RAW_CLEANER_FREQUENCY = + SecondBoundConfiguration.parse("1m"); + + public static final String ENABLE_CDC_RAW_CLEANER_PROPERTY = "enable_cdc_raw_cleaner"; + public static final boolean DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY = true; + + public static final String FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES = "fallback_cdc_raw_max_directory_size_bytes"; + public static final long DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES = 1L << 31; // 2 GiB + + public static final String CDC_RAW_MAX_DIRECTORY_MAX_PERCENT = "cdc_raw_max_directory_max_percent"; + public static final float DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT = 1.0f; + + public static final String CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW = "cdc_raw_critical_buffer_window"; + public static final Duration DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW = Duration.of(15, ChronoUnit.MINUTES); + + public static final String CDC_RAW_MAX_LOW_BUFFER_WINDOW = "cdc_raw_low_buffer_window"; + public static final Duration 
DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW = Duration.of(60, ChronoUnit.MINUTES); + + public static final String CDC_CACHE_MAX_USAGE_DURATION = "cdc_raw_cache_max_usage_duration"; + public static final Duration DEFAULT_CDC_CACHE_MAX_USAGE_DURATION = Duration.of(15, ChronoUnit.MINUTES); + + protected SecondBoundConfiguration segmentHardLinkCacheExpiry; + protected SecondBoundConfiguration cdcRawCleanerFrequency; + protected boolean enableCdcRawCleaner; + protected long fallbackCdcRawMaxDirectorySize; + protected float cdcRawMaxPercent; + protected Duration cdcRawCriticalBufferWindow; + protected Duration cdcRawLowBufferWindow; + protected Duration cacheMaxUsage; public CdcConfigurationImpl() { this.segmentHardLinkCacheExpiry = DEFAULT_SEGMENT_HARD_LINK_CACHE_EXPIRY; + this.cdcRawCleanerFrequency = DEFAULT_CDC_RAW_CLEANER_FREQUENCY; + this.enableCdcRawCleaner = DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY; + this.fallbackCdcRawMaxDirectorySize = DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES; + this.cdcRawMaxPercent = DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT; + this.cdcRawCriticalBufferWindow = DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW; + this.cdcRawLowBufferWindow = DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW; + this.cacheMaxUsage = DEFAULT_CDC_CACHE_MAX_USAGE_DURATION; } public CdcConfigurationImpl(SecondBoundConfiguration segmentHardLinkCacheExpiry) + { + this( + segmentHardLinkCacheExpiry, + DEFAULT_CDC_RAW_CLEANER_FREQUENCY, + DEFAULT_ENABLE_CDC_RAW_CLEANER_PROPERTY, + DEFAULT_FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES, + DEFAULT_CDC_RAW_MAX_DIRECTORY_MAX_PERCENT, + DEFAULT_CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW, + DEFAULT_CDC_RAW_MAX_LOW_BUFFER_WINDOW, + DEFAULT_CDC_CACHE_MAX_USAGE_DURATION + ); + } + + public CdcConfigurationImpl(SecondBoundConfiguration segmentHardLinkCacheExpiry, + SecondBoundConfiguration cdcRawCleanerFrequency, + boolean enableCdcRawCleaner, + long cdcRawMaxDirectorySize, + float cdcRawMaxPercent, + Duration cdcRawCriticalBufferWindow, + Duration 
cdcRawLowBufferWindow, + Duration cacheMaxUsage) { this.segmentHardLinkCacheExpiry = segmentHardLinkCacheExpiry; + this.cdcRawCleanerFrequency = cdcRawCleanerFrequency; + this.enableCdcRawCleaner = enableCdcRawCleaner; + this.fallbackCdcRawMaxDirectorySize = cdcRawMaxDirectorySize; + this.cdcRawMaxPercent = cdcRawMaxPercent; + this.cdcRawCriticalBufferWindow = cdcRawCriticalBufferWindow; + this.cdcRawLowBufferWindow = cdcRawLowBufferWindow; + this.cacheMaxUsage = cacheMaxUsage; } @Override @@ -74,4 +141,117 @@ public void setSegmentHardLinkCacheExpiryInSecs(long segmentHardlinkCacheExpiryI LOGGER.warn("'segment_hardlink_cache_expiry_in_secs' is deprecated, use 'segment_hardlink_cache_expiry' instead"); setSegmentHardLinkCacheExpiry(new SecondBoundConfiguration(segmentHardlinkCacheExpiryInSecs, TimeUnit.SECONDS)); } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY) + public SecondBoundConfiguration cdcRawDirectorySpaceCleanerFrequency() + { + return cdcRawCleanerFrequency; + } + + @JsonProperty(value = CDC_RAW_CLEANER_FREQUENCY_PROPERTY) + public void setCdcRawDirectorySpaceCleanerFrequency(SecondBoundConfiguration cdcRawCleanerFrequency) + { + this.cdcRawCleanerFrequency = cdcRawCleanerFrequency; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY) + public boolean enableCdcRawDirectoryRoutineCleanUp() + { + return enableCdcRawCleaner; + } + + @JsonProperty(value = ENABLE_CDC_RAW_CLEANER_PROPERTY) + public void setEnableCdcRawCleaner(boolean enableCdcRawCleaner) + { + this.enableCdcRawCleaner = enableCdcRawCleaner; + } + + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES) + public long fallbackCdcRawDirectoryMaxSizeBytes() + { + return fallbackCdcRawMaxDirectorySize; + } + + @JsonProperty(value = FALLBACK_CDC_RAW_MAX_DIRECTORY_SIZE_BYTES) + public void setFallbackCdcRawDirectoryMaxSizeBytes(long 
fallbackCdcRawMaxDirectorySize) + { + this.fallbackCdcRawMaxDirectorySize = fallbackCdcRawMaxDirectorySize; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT) + public float cdcRawDirectoryMaxPercentUsage() + { + return cdcRawMaxPercent; + } + + @JsonProperty(value = CDC_RAW_MAX_DIRECTORY_MAX_PERCENT) + public void setCdcRawDirectoryMaxPercentUsage(float cdcRawMaxPercent) + { + this.cdcRawMaxPercent = cdcRawMaxPercent; /* float parameter keeps fractional values such as 0.8 intact */ + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW) + public Duration cdcRawDirectoryCriticalBufferWindow() + { + return cdcRawCriticalBufferWindow; + } + + @JsonProperty(value = CDC_RAW_MAX_CRITICAL_BUFFER_WINDOW) + public void setCdcRawDirectoryCriticalBufferWindow(Duration cdcRawCriticalBufferWindow) + { + this.cdcRawCriticalBufferWindow = cdcRawCriticalBufferWindow; + } + + /** + * {@inheritDoc} + */ + @Override + @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW) + public Duration cdcRawDirectoryLowBufferWindow() + { + return cdcRawLowBufferWindow; + } + + @JsonProperty(value = CDC_RAW_MAX_LOW_BUFFER_WINDOW) + public void setCdcRawDirectoryLowBufferWindow(Duration cdcRawLowBufferWindow) + { + this.cdcRawLowBufferWindow = cdcRawLowBufferWindow; + } + + /** + * {@inheritDoc} + */ + @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION) + @Override + public Duration cacheMaxUsage() + { + return cacheMaxUsage; + } + + @JsonProperty(value = CDC_CACHE_MAX_USAGE_DURATION) + public void setCacheMaxUsage(Duration cacheMaxUsage) + { + this.cacheMaxUsage = cacheMaxUsage; + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java new file mode 100644 index 000000000..1fa917cc9 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/SystemViewsDatabaseAccessor.java @@ -0,0 +1,82 @@ +/* + * Licensed to 
the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db; + +import com.datastax.driver.core.BoundStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.google.inject.Inject; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.common.server.CQLSessionProvider; +import org.apache.cassandra.sidecar.db.schema.SystemViewsSchema; +import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException; +import org.apache.cassandra.sidecar.utils.FileUtils; +import org.jetbrains.annotations.Nullable; + +/** + * Database Accessor that queries Cassandra to get information maintained under the system_views keyspace. 
+ */ +@Singleton +public class SystemViewsDatabaseAccessor extends DatabaseAccessor +{ + static final String YAML_PROP_PREVIOUS = "cdc_total_space_in_mb"; + static final String YAML_PROP_CURRENT = "cdc_total_space"; + + @Inject + public SystemViewsDatabaseAccessor(SystemViewsSchema systemViewsSchema, + CQLSessionProvider sessionProvider) + { + super(systemViewsSchema, sessionProvider); + } + + @Nullable + public Long getCdcTotalSpaceSetting() throws SchemaUnavailableException + { + // attempt to parse old 'cdc_total_space_in_mb' prop + String cdcTotalSpaceInMb = getSetting(YAML_PROP_PREVIOUS); + if (cdcTotalSpaceInMb != null) + { + return FileUtils.mbStringToBytes(cdcTotalSpaceInMb); + } + + // otherwise parse current 'cdc_total_space' prop + String storageStringToBytes = getSetting(YAML_PROP_CURRENT); + if (storageStringToBytes != null) + { + return FileUtils.storageStringToBytes(storageStringToBytes); + } + + return null; + } + + /** + * Load a setting value from the `system_views.settings` table. + * + * @param name name of setting + * @return setting value for a given `name` loaded from the `system_views.settings` table. + */ + @Nullable + public String getSetting(String name) throws SchemaUnavailableException + { + BoundStatement statement = tableSchema.selectSettings().bind(name); + ResultSet result = execute(statement); + Row row = result.one(); + return row != null && !row.isNull(0) ? row.getString(0) : null; + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java new file mode 100644 index 000000000..d1ff2615d --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/db/schema/SystemViewsSchema.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.db.schema; + +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.Session; +import com.google.inject.Singleton; +import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException; +import org.jetbrains.annotations.NotNull; + +/** + * Schema for getting information stored in system_views keyspace. 
+ */ +@Singleton +public class SystemViewsSchema extends CassandraSystemTableSchema +{ + protected static final String SYSTEM_VIEWS_KEYSPACE_NAME = "system_views"; + protected static final String SYSTEM_VIEWS_SETTINGS_TABLE_NAME = "settings"; + private PreparedStatement selectSettings; + + @Override + protected String keyspaceName() + { + return SYSTEM_VIEWS_KEYSPACE_NAME; + } + + @Override + protected String tableName() + { + return SYSTEM_VIEWS_SETTINGS_TABLE_NAME; + } + + public PreparedStatement selectSettings() throws SchemaUnavailableException + { + ensureSchemaAvailable(); + return selectSettings; + } + + @Override + protected void prepareStatements(@NotNull Session session) + { + this.selectSettings = session.prepare("SELECT value FROM system_views.settings WHERE name = ?"); + } + + protected void ensureSchemaAvailable() throws SchemaUnavailableException + { + if (selectSettings == null) + { + throw new SchemaUnavailableException(keyspaceName(), tableName()); + } + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java new file mode 100644 index 000000000..655dc4921 --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/CdcMetrics.java @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.metrics; + +import java.util.Objects; +import java.util.function.Function; + +import com.codahale.metrics.DefaultSettableGauge; +import com.codahale.metrics.Metric; +import com.codahale.metrics.MetricRegistry; + +import static org.apache.cassandra.sidecar.metrics.ServerMetrics.SERVER_PREFIX; + +/** + * Tracks metrics related to cdc functionality provided by Sidecar. + */ +public class CdcMetrics +{ + public static final String DOMAIN = SERVER_PREFIX + ".Cdc"; + protected final MetricRegistry metricRegistry; + public final NamedMetric cdcRawCleanerFailed; + public final NamedMetric orphanedIdx; + public final NamedMetric> oldestSegmentAge; + public final NamedMetric> totalConsumedCdcBytes; + public final NamedMetric> totalCdcSpaceUsed; + public final NamedMetric deletedSegment; + public final NamedMetric lowCdcRawSpace; + public final NamedMetric criticalCdcRawSpace; + + public CdcMetrics(MetricRegistry metricRegistry) + { + this.metricRegistry = Objects.requireNonNull(metricRegistry, "Metric registry can not be null"); + this.cdcRawCleanerFailed = createMetric("CdcRawCleanerFailed", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.totalConsumedCdcBytes = createMetric("CdcRawTotalConsumedCdcBytes", name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0L))); + this.totalCdcSpaceUsed = createMetric("CdcRawTotalCdcSpaceUsed", name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0L))); + this.orphanedIdx = createMetric("CdcRawOrphanedIdxFile", name -> 
metricRegistry.gauge(name, DeltaGauge::new)); + this.deletedSegment = createMetric("CdcRawDeletedSegment", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.oldestSegmentAge = createMetric("CdcRawOldestSegmentAgeSeconds", name -> metricRegistry.gauge(name, () -> new DefaultSettableGauge<>(0))); + this.lowCdcRawSpace = createMetric("CdcRawLowSpace", name -> metricRegistry.gauge(name, DeltaGauge::new)); + this.criticalCdcRawSpace = createMetric("CdcRawCriticalSpace", name -> metricRegistry.gauge(name, DeltaGauge::new)); + } + + private NamedMetric createMetric(String simpleName, Function metricCreator) + { + return NamedMetric.builder(metricCreator) + .withDomain(DOMAIN) + .withName(simpleName) + .build(); + } +} diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java index 0fb32eb41..1514dd456 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetrics.java @@ -58,4 +58,9 @@ public interface ServerMetrics * @return metrics related to coordination functionality that are tracked */ CoordinationMetrics coordination(); + + /** + * @return metrics tracked by server for cdc functionality. 
+ */ + CdcMetrics cdc(); } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java index 75b0a2482..f415cb869 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/metrics/ServerMetricsImpl.java @@ -34,6 +34,7 @@ public class ServerMetricsImpl implements ServerMetrics protected final SchemaMetrics schemaMetrics; protected final CacheMetrics cacheMetrics; protected final CoordinationMetrics coordinationMetrics; + protected final CdcMetrics cdcMetrics; public ServerMetricsImpl(MetricRegistry metricRegistry) { @@ -45,6 +46,7 @@ public ServerMetricsImpl(MetricRegistry metricRegistry) this.schemaMetrics = new SchemaMetrics(metricRegistry); this.cacheMetrics = new CacheMetrics(metricRegistry); this.coordinationMetrics = new CoordinationMetrics(metricRegistry); + this.cdcMetrics = new CdcMetrics(metricRegistry); } @Override @@ -82,4 +84,10 @@ public CoordinationMetrics coordination() { return coordinationMetrics; } + + @Override + public CdcMetrics cdc() + { + return cdcMetrics; + } } diff --git a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java index e8da3f73f..a09204a8a 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/server/Server.java @@ -54,10 +54,12 @@ import org.apache.cassandra.sidecar.config.SidecarConfiguration; import org.apache.cassandra.sidecar.config.SslConfiguration; import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.tasks.CdcRawDirectorySpaceCleaner; import org.apache.cassandra.sidecar.tasks.HealthCheckPeriodicTask; import org.apache.cassandra.sidecar.tasks.KeyStoreCheckPeriodicTask; import 
org.apache.cassandra.sidecar.tasks.PeriodicTaskExecutor; import org.apache.cassandra.sidecar.utils.SslUtils; +import org.apache.cassandra.sidecar.utils.TimeProvider; import org.jetbrains.annotations.VisibleForTesting; import static org.apache.cassandra.sidecar.server.SidecarServerEvents.ON_ALL_CASSANDRA_CQL_READY; @@ -79,6 +81,7 @@ public class Server protected final PeriodicTaskExecutor periodicTaskExecutor; protected final HttpServerOptionsProvider optionsProvider; protected final SidecarMetrics metrics; + protected final CdcRawDirectorySpaceCleaner cdcRawDirectorySpaceCleaner; protected final List deployedServerVerticles = new CopyOnWriteArrayList<>(); // Keeps track of all the Cassandra instance identifiers where CQL is ready private final Set cqlReadyInstanceIds = Collections.synchronizedSet(new HashSet<>()); @@ -91,7 +94,8 @@ public Server(Vertx vertx, ExecutorPools executorPools, PeriodicTaskExecutor periodicTaskExecutor, HttpServerOptionsProvider optionsProvider, - SidecarMetrics metrics) + SidecarMetrics metrics, + CdcRawDirectorySpaceCleaner cdcRawDirectorySpaceCleaner) { this.vertx = vertx; this.executorPools = executorPools; @@ -101,6 +105,7 @@ public Server(Vertx vertx, this.periodicTaskExecutor = periodicTaskExecutor; this.optionsProvider = optionsProvider; this.metrics = metrics; + this.cdcRawDirectorySpaceCleaner = cdcRawDirectorySpaceCleaner; } /** @@ -307,7 +312,9 @@ protected Future scheduleInternalPeriodicTasks(String deploymentId) instancesMetadata, executorPools, metrics); + periodicTaskExecutor.schedule(healthCheckPeriodicTask); + periodicTaskExecutor.schedule(cdcRawDirectorySpaceCleaner); vertx.eventBus().localConsumer(ON_SERVER_STOP.address(), message -> periodicTaskExecutor.unschedule(healthCheckPeriodicTask)); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java new file mode 100644 index 
000000000..4e8e2bcae --- /dev/null +++ b/server/src/main/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleaner.java @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.tasks; + +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import com.google.common.collect.Sets; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.inject.Inject; +import com.google.inject.Singleton; +import io.vertx.core.Promise; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.common.server.utils.DurationSpec; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor; +import org.apache.cassandra.sidecar.exceptions.SchemaUnavailableException; +import 
org.apache.cassandra.sidecar.metrics.CdcMetrics; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.FileUtils; +import org.apache.cassandra.sidecar.utils.TimeProvider; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.cassandra.sidecar.utils.CdcUtil.isLogFile; +import static org.apache.cassandra.sidecar.utils.CdcUtil.parseSegmentId; + +/** + * PeriodTask to monitor and remove the oldest commit log segments in the `cdc_raw` directory when the space used hits the `cdc_total_space` limit set in the yaml file. + */ +@Singleton +public class CdcRawDirectorySpaceCleaner implements PeriodicTask +{ + private static final Logger LOGGER = LoggerFactory.getLogger(CdcRawDirectorySpaceCleaner.class); + + public static final String CDC_DIR_NAME = "cdc_raw"; + + private final TimeProvider timeProvider; + private final SystemViewsDatabaseAccessor systemViewsDatabaseAccessor; + private final CdcConfiguration cdcConfiguration; + private final InstanceMetadata instanceMetadata; + private final CdcMetrics cdcMetrics; + + @Nullable + private volatile Long maxUsageBytes = null; // lazily loaded from system_views.settings if available + private volatile Long maxUsageLastReadNanos = null; + // cdc file -> file size in bytes. It memorizes the file set of the last time the checker runs. 
+ private volatile Map priorCdcFiles = new HashMap<>(); + + @Inject + public CdcRawDirectorySpaceCleaner(TimeProvider timeProvider, + SystemViewsDatabaseAccessor systemViewsDatabaseAccessor, + CdcConfiguration cdcConfiguration, + InstanceMetadata instanceMetadata, + SidecarMetrics metrics) + { + this.timeProvider = timeProvider; + this.systemViewsDatabaseAccessor = systemViewsDatabaseAccessor; + this.cdcConfiguration = cdcConfiguration; + this.instanceMetadata = instanceMetadata; + this.cdcMetrics = metrics.server().cdc(); + } + + @Override + public DurationSpec delay() + { + return cdcConfiguration.cdcRawDirectorySpaceCleanerFrequency(); + } + + @Override + public void execute(Promise promise) + { + try + { + routineCleanUp(); + promise.tryComplete(); + } + catch (Throwable t) + { + LOGGER.warn("Failed to perform routine clean-up of cdc_raw directory", t); + cdcMetrics.cdcRawCleanerFailed.metric.update(1L); + promise.fail(t); + } + } + + /** + * @return true if we need to refresh the cached `cdc_total_space` value. 
+ */ + protected boolean shouldRefreshCachedMaxUsage() + { + return maxUsageLastReadNanos == null || + (System.nanoTime() - maxUsageLastReadNanos) >= cdcConfiguration.cacheMaxUsage().toNanos(); + } + + protected long maxUsage() + { + if (!shouldRefreshCachedMaxUsage()) + { + return Objects.requireNonNull(maxUsageBytes, "maxUsageBytes cannot be null if maxUsageLastReadNanos is non-null"); + } + + try + { + Long newValue = systemViewsDatabaseAccessor.getCdcTotalSpaceSetting(); + if (newValue != null) + { + if (!newValue.equals(maxUsageBytes)) + { + LOGGER.info("Change in cdc_total_space from system_views.settings prev={} latest={}", maxUsageBytes, newValue); + this.maxUsageBytes = newValue; + } + this.maxUsageLastReadNanos = System.nanoTime(); /* restart the cache window even when the value is unchanged */ + return newValue; + } + } + catch (SchemaUnavailableException e) + { + LOGGER.debug("Could not read cdc_total_space from system_views.settings", e); + } + catch (Throwable t) + { + LOGGER.warn("Error reading cdc_total_space from system_views.settings", t); + } + + LOGGER.warn("Could not read cdc_total_space from system_views.settings, falling back to props"); + return cdcConfiguration.fallbackCdcRawDirectoryMaxSizeBytes(); + } + + protected void routineCleanUp() + { + if (!cdcConfiguration.enableCdcRawDirectoryRoutineCleanUp()) + { + LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: feature is disabled "); + return; + } + + List dataDirectories = instanceMetadata.dataDirs(); + dataDirectories.stream() + .map(dir -> new File(dir, CDC_DIR_NAME)) + .forEach(this::cleanUpCdcRawDirectory); + } + + protected void cleanUpCdcRawDirectory(File cdcRawDirectory) + { + if (!cdcRawDirectory.exists() || !cdcRawDirectory.isDirectory()) + { + LOGGER.debug("Skipping CdcRawDirectorySpaceCleaner: CDC directory does not exist: " + cdcRawDirectory); + return; + } + + List segmentFiles = Optional + .ofNullable(cdcRawDirectory.listFiles(this::validSegmentFilter)) + .map(files -> Arrays.stream(files) + .map(CdcRawSegmentFile::new) + 
.filter(CdcRawSegmentFile::indexExists) + .collect(Collectors.toList()) + ) + .orElseGet(List::of); + publishCdcStats(segmentFiles); + if (segmentFiles.size() < 2) + { + LOGGER.debug("Skipping cdc data cleaner routine cleanup: No cdc data or only one single cdc segment is found."); + return; + } + + long directorySize = FileUtils.directorySize(cdcRawDirectory); + long upperLimitBytes = (long) (maxUsage() * cdcConfiguration.cdcRawDirectoryMaxPercentUsage()); + // Sort the files by segmentId to delete commit log segments in write order + // The latest file is the current active segment, but it could be created before the retention duration, e.g. slow data ingress + Collections.sort(segmentFiles); + long nowInMillis = timeProvider.currentTimeMillis(); + + // track the age in seconds (the unit in the gauge's registered name) of the oldest commit log segment to give indication of the time-window buffer available + cdcMetrics.oldestSegmentAge.metric.setValue((int) MILLISECONDS.toSeconds(nowInMillis - segmentFiles.get(0).lastModified())); + + if (directorySize > upperLimitBytes) + { + if (segmentFiles.get(0).segmentId > segmentFiles.get(1).segmentId) + { + LOGGER.error("Cdc segments sorted incorrectly {} before {}", segmentFiles.get(0).segmentId, segmentFiles.get(1).segmentId); + } + + long criticalMillis = cdcConfiguration.cdcRawDirectoryCriticalBufferWindow().toMillis(); + long lowMillis = cdcConfiguration.cdcRawDirectoryLowBufferWindow().toMillis(); + + int i = 0; + while (i < segmentFiles.size() - 1 && directorySize > upperLimitBytes) + { + CdcRawSegmentFile segment = segmentFiles.get(i); + long ageMillis = nowInMillis - segment.lastModified(); + + if (ageMillis < criticalMillis) + { + LOGGER.error("Insufficient Cdc buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}", + MILLISECONDS.toMinutes(criticalMillis), segment, upperLimitBytes, MILLISECONDS.toMinutes(ageMillis)); + cdcMetrics.criticalCdcRawSpace.metric.update(1); + } + else if (ageMillis < lowMillis) + { + LOGGER.warn("Insufficient Cdc 
buffer size to maintain {}-minute window segment={} maxSize={} ageMinutes={}", + MILLISECONDS.toMinutes(lowMillis), segment, upperLimitBytes, MILLISECONDS.toMinutes(ageMillis)); + cdcMetrics.lowCdcRawSpace.metric.update(1); + } + long length = 0; + try + { + length = deleteSegment(segment); + cdcMetrics.deletedSegment.metric.update(length); + } + catch (IOException e) + { + LOGGER.warn("Failed to delete cdc segment", e); + } + directorySize -= length; + i++; + } + } + + try + { + cleanupOrphanedIdxFiles(cdcRawDirectory); + } + catch (IOException e) + { + LOGGER.warn("Failed to clean up orphaned idx files", e); + } + } + + protected boolean validSegmentFilter(File file) + { + return file.isFile() && isLogFile(file.getName()); + } + + protected long deleteSegment(CdcRawSegmentFile segment) throws IOException + { + final long numBytes = segment.length() + segment.indexLength(); + LOGGER.info("Deleting Cdc segment path={} lastModified={} numBytes={}", segment, segment.lastModified(), numBytes); + Files.deleteIfExists(segment.path()); + Files.deleteIfExists(segment.indexPath()); + return numBytes; + } + + // runs optionally if detects orphaned and old index files + private void cleanupOrphanedIdxFiles(File cdcDir) throws IOException + { + final File[] indexFiles = cdcDir.listFiles(f -> f.isFile() && CdcUtil.isValidIdxFile(f.getName())); + if (indexFiles == null || indexFiles.length == 0) + return; // exit early when finding no index files + + final File[] cdcSegments = cdcDir.listFiles(f -> f.isFile() && CdcUtil.isLogFile(f.getName())); + Set cdcFileNames = Set.of(); + if (cdcSegments != null) + { + cdcFileNames = new HashSet<>(cdcSegments.length); + for (File f : cdcSegments) + { + cdcFileNames.add(f.getName()); + } + } + + // now, delete all old index files that have no corresponding log files. 
+ for (File idxFile : indexFiles) + { + final String cdcFileName = CdcUtil.idxToLogFileName(idxFile.getName()); + if (!cdcFileNames.contains(cdcFileName)) + { // found an orphaned index file + LOGGER.warn("Orphaned Cdc idx file found with no corresponding Cdc segment path={}", idxFile.toPath()); + cdcMetrics.orphanedIdx.metric.update(1L); + Files.deleteIfExists(idxFile.toPath()); + } + } + } + + private void publishCdcStats(@Nullable List cdcFiles) + { + // no cdc data consumed or exist + boolean noCdcFiles = cdcFiles == null || cdcFiles.isEmpty(); + if (noCdcFiles && priorCdcFiles.isEmpty()) + return; + + Map currentFiles; + long totalCurrentBytes = 0L; + if (noCdcFiles) + { + currentFiles = new HashMap<>(); + } + else + { + currentFiles = new HashMap<>(cdcFiles.size()); + for (CdcRawSegmentFile segment : cdcFiles) + { + if (segment.exists()) + { + long len = segment.length(); + currentFiles.put(segment, len); + totalCurrentBytes += len; + } + } + } + + // skip publishing. there is no cdc data consumed and no data exist. + if (totalCurrentBytes == 0L && priorCdcFiles.isEmpty()) + { + priorCdcFiles = currentFiles; + return; + } + + // consumed files is the files exist in the prior round but now are deleted. 
+ Set consumedFiles = Sets.difference(priorCdcFiles.keySet(), currentFiles.keySet()); + long totalConsumedBytes = consumedFiles.stream().map(priorCdcFiles::get).reduce(0L, Long::sum); + priorCdcFiles.clear(); + priorCdcFiles = currentFiles; + cdcMetrics.totalConsumedCdcBytes.metric.setValue(totalConsumedBytes); + cdcMetrics.totalCdcSpaceUsed.metric.setValue(totalCurrentBytes); + } + + /** + * Helper class for the CdcRawDirectorySpaceCleaner to track log segment files and associated idx file in the cdc_raw directory + */ + protected static class CdcRawSegmentFile implements Comparable + { + private final File file; + private final File indexFile; + private final long segmentId; + private final long len; + + CdcRawSegmentFile(File logFile) + { + this.file = logFile; + final String name = logFile.getName(); + this.segmentId = parseSegmentId(name); + this.len = logFile.length(); + this.indexFile = CdcUtil.getIdxFile(logFile); + } + + public boolean exists() + { + return file.exists(); + } + + public boolean indexExists() + { + return indexFile.exists(); + } + + public long length() + { + return len; + } + + public long indexLength() + { + return indexFile.length(); + } + + public long lastModified() + { + return file.lastModified(); + } + + public Path path() + { + return file.toPath(); + } + + public Path indexPath() + { + return indexFile.toPath(); + } + + @Override + public int compareTo(@NotNull CdcRawSegmentFile o) + { + return Long.compare(segmentId, o.segmentId); + } + + @Override + public int hashCode() + { + return file.hashCode(); + } + + @Override + public boolean equals(Object other) + { + if (this == other) + { + return true; + } + if (other == null || this.getClass() != other.getClass()) + { + return false; + } + + CdcRawSegmentFile that = (CdcRawSegmentFile) other; + return file.equals(that.file); + } + + @Override + public String toString() + { + return file.getAbsolutePath(); + } + } +} diff --git 
a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java index 91aa30297..d7ca8d9d1 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/CdcUtil.java @@ -22,6 +22,7 @@ import java.io.IOException; import java.nio.file.Files; import java.util.List; +import java.util.regex.Matcher; import java.util.regex.Pattern; /** @@ -37,7 +38,8 @@ public final class CdcUtil private static final int IDX_FILE_EXTENSION_LENGTH = IDX_FILE_EXTENSION.length(); private static final String LOG_FILE_COMPLETE_INDICATOR = "COMPLETED"; private static final String FILENAME_EXTENSION = "(" + IDX_FILE_EXTENSION + "|" + LOG_FILE_EXTENSION + ")"; - private static final Pattern SEGMENT_PATTERN = Pattern.compile(FILENAME_PREFIX + "((\\d+)(" + SEPARATOR + "\\d+)?)" + FILENAME_EXTENSION); + static final Pattern SEGMENT_PATTERN = Pattern.compile(FILENAME_PREFIX + "(?:\\d+" + SEPARATOR + ")?" + "(\\d+)" + FILENAME_EXTENSION); + public static final Pattern IDX_FILE_PATTERN = Pattern.compile(FILENAME_PREFIX + "(?:\\d+" + SEPARATOR + ")?" 
+ "(\\d+)" + IDX_FILE_EXTENSION); private static final int READ_INDEX_FILE_MAX_RETRY = 5; @@ -102,6 +104,28 @@ public static CdcIndex parseIndexFile(File indexFile, long segmentFileLength) th return new CdcIndex(latestPosition, isCompleted); } + /** + * @param idxFileName Commit log segment idx filename + * @return log segment filename for associated idx file + */ + public static String idxToLogFileName(String idxFileName) + { + return idxFileName.substring(0, idxFileName.length() - IDX_FILE_EXTENSION.length()) + LOG_FILE_EXTENSION; + } + + public static long parseSegmentId(String name) + { + final Matcher matcher = SEGMENT_PATTERN.matcher(name); + if (matcher.matches()) + { + return Long.parseLong(matcher.group(1)); + } + else + { + throw new IllegalStateException("Invalid CommitLog name: " + name); + } + } + /** * Class representing Cdc index */ @@ -119,6 +143,7 @@ public CdcIndex(long latestFlushPosition, boolean isCompleted) /** * Validate for the cdc (log or index) file name.see {@link SEGMENT_PATTERN} for the format + * * @param fileName name of the file * @return true if the name is valid; otherwise, false */ @@ -132,6 +157,15 @@ public static boolean isLogFile(String fileName) return isValid(fileName) && fileName.endsWith(LOG_FILE_EXTENSION); } + /** + * @param idxFileName name of the file. 
+ * @return true if the filename is a valid cdc_raw log segment idx file + */ + public static boolean isValidIdxFile(String idxFileName) + { + return IDX_FILE_PATTERN.matcher(idxFileName).matches(); + } + public static boolean isIndexFile(String fileName) { return isValid(fileName) && matchIndexExtension(fileName); diff --git a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java index 51172e83e..d8bddb75b 100644 --- a/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java +++ b/server/src/main/java/org/apache/cassandra/sidecar/utils/FileUtils.java @@ -18,11 +18,19 @@ package org.apache.cassandra.sidecar.utils; +import java.io.File; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.jetbrains.annotations.Nullable; + /** * Encompasses utilities for files */ public class FileUtils { + private static final Pattern STORAGE_UNIT_PATTERN = Pattern.compile("(\\d+)(GiB|MiB|KiB|B)?"); + /** * Resolves the home directory from the input {@code directory} string when the string begins with {@code ~}. * @@ -35,4 +43,63 @@ public static String maybeResolveHomeDirectory(String directory) return directory; return System.getProperty("user.home") + directory.substring(1); } + + /** + * @param directory the directory + * @return the size in bytes of all files in a directory, non-recursively. 
+ */ + public static long directorySize(File directory) + { + long size = 0; + final File[] files = directory.listFiles(); + if (files != null) + { + for (File file : files) + { + if (file.isFile()) + { + size += file.length(); + } + } + } + return size; + } + + public static long mbStringToBytes(String str) + { + return Long.parseLong(str) * (1 << 20); // note the prop name uses 'mb' but Cassandra parses as MiB + } + + @Nullable + public static Long storageStringToBytes(String str) + { + final Matcher matcher = STORAGE_UNIT_PATTERN.matcher(str); + if (matcher.find()) + { + return Long.parseLong(matcher.group(1)) * storageUnitToBytes(matcher.group(2)); + } + return null; + } + + public static long storageUnitToBytes(String unit) + { + if (unit == null) + { + return 1; + } + + switch (unit) + { + case "GiB": + return 1 << 30; + case "MiB": + return 1 << 20; + case "KiB": + return 1024; + case "": + case "B": + return 1; + } + throw new IllegalStateException("Unexpected data storage unit: " + unit); + } } diff --git a/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java new file mode 100644 index 000000000..692648381 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/tasks/CdcRawDirectorySpaceCleanerTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.tasks; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.commons.lang3.RandomUtils; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import com.codahale.metrics.MetricRegistry; +import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata; +import org.apache.cassandra.sidecar.config.CdcConfiguration; +import org.apache.cassandra.sidecar.config.yaml.CdcConfigurationImpl; +import org.apache.cassandra.sidecar.db.SystemViewsDatabaseAccessor; +import org.apache.cassandra.sidecar.metrics.CdcMetrics; +import org.apache.cassandra.sidecar.metrics.ServerMetrics; +import org.apache.cassandra.sidecar.metrics.SidecarMetrics; +import org.apache.cassandra.sidecar.utils.CdcUtil; +import org.apache.cassandra.sidecar.utils.TimeProvider; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Unit tests for the {@link CdcRawDirectorySpaceCleaner} + */ +public class CdcRawDirectorySpaceCleanerTest +{ + private static final MetricRegistry 
METRIC_REGISTRY = new MetricRegistry(); + private static final String TEST_SEGMENT_FILE_NAME_1 = "CommitLog-2-1250512736956320000.log"; + private static final String TEST_SEGMENT_FILE_NAME_2 = "CommitLog-2-1260512736956320000.log"; + private static final String TEST_SEGMENT_FILE_NAME_3 = "CommitLog-2-1340512736956320000.log"; + private static final String TEST_ORPHANED_SEGMENT_FILE_NAME = "CommitLog-2-1240512736956320000.log"; + private static final String TEST_INTACT_SEGMENT_FILE_NAME = "CommitLog-2-1340512736959990000.log"; + + @Test + public void testCdcRawDirectorySpaceCleaner(@TempDir Path tempDir) throws IOException + { + TimeProvider timeProvider = TimeProvider.DEFAULT_TIME_PROVIDER; + SystemViewsDatabaseAccessor systemViewsDatabaseAccessor = mock(SystemViewsDatabaseAccessor.class); + when(systemViewsDatabaseAccessor.getSetting(eq("cdc_total_space"))).thenReturn("1MiB"); + when(systemViewsDatabaseAccessor.getCdcTotalSpaceSetting()).thenCallRealMethod(); + CdcConfiguration cdcConfiguration = new CdcConfigurationImpl(); + + InstanceMetadata instanceMetadata = mockInstanceMetadata(tempDir); + SidecarMetrics sidecarMetrics = mock(SidecarMetrics.class); + ServerMetrics serverMetrics = mock(ServerMetrics.class); + CdcMetrics cdcMetrics = new CdcMetrics(METRIC_REGISTRY); + when(sidecarMetrics.server()).thenReturn(serverMetrics); + when(serverMetrics.cdc()).thenReturn(cdcMetrics); + CdcRawDirectorySpaceCleaner cleaner = new CdcRawDirectorySpaceCleaner(timeProvider, systemViewsDatabaseAccessor, cdcConfiguration, instanceMetadata, sidecarMetrics); + + checkExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME, true, false); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_1); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_2); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3); + checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true); + + assertEquals(0L, cdcMetrics.criticalCdcRawSpace.metric.getValue()); + assertEquals(0L, cdcMetrics.orphanedIdx.metric.getValue()); + 
assertEquals(0L, cdcMetrics.deletedSegment.metric.getValue()); + + cleaner.routineCleanUp(); + + // earliest cdc segment should be deleted along with orphaned idx file + checkNotExists(tempDir, TEST_ORPHANED_SEGMENT_FILE_NAME); + checkNotExists(tempDir, TEST_SEGMENT_FILE_NAME_1); + + // latest cdc segments should still exist as long as we have free buffer space + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_2); + checkExists(tempDir, TEST_SEGMENT_FILE_NAME_3); + checkExists(tempDir, TEST_INTACT_SEGMENT_FILE_NAME, false, true); + + // verify metrics match expected + assertEquals(1L, cdcMetrics.criticalCdcRawSpace.metric.getValue()); + assertEquals(1L, cdcMetrics.orphanedIdx.metric.getValue()); + assertTrue(cdcMetrics.totalCdcSpaceUsed.metric.getValue() > 2097152L); + assertTrue(cdcMetrics.deletedSegment.metric.getValue() > 2097152L); + assertEquals(0, cdcMetrics.oldestSegmentAge.metric.getValue()); + + // delete all cdc files, in order to test the scenario that we do not have current cdc file, but have cdc file in the prior round. + // We do not expect all CDC file to be cleaned up in a running system. But test it for robustness. + Files.deleteIfExists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, TEST_INTACT_SEGMENT_FILE_NAME)); + cleaner.routineCleanUp(); // it should run fine. 
+ } + + /* test utils */ + + private static InstanceMetadata mockInstanceMetadata(Path tempDir) throws IOException + { + InstanceMetadata instanceMetadata = mock(InstanceMetadata.class); + + File cdcDir = Files.createDirectory(tempDir.resolve(CdcRawDirectorySpaceCleaner.CDC_DIR_NAME)).toFile(); + writeCdcSegment(cdcDir, TEST_ORPHANED_SEGMENT_FILE_NAME, 67108864, true, true, false); + writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_1, 2097152, true); + writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_2, 524288, true); + writeCdcSegment(cdcDir, TEST_SEGMENT_FILE_NAME_3, 1024, false); + + Uninterruptibles.sleepUninterruptibly(10, TimeUnit.MILLISECONDS); + writeCdcSegment(cdcDir, TEST_INTACT_SEGMENT_FILE_NAME, RandomUtils.nextInt(128, 256), false, false, true); + + when(instanceMetadata.dataDirs()).thenReturn(List.of(cdcDir.getParent())); + return instanceMetadata; + } + + private static void writeCdcSegment(File cdcDir, String filename, int size, boolean complete) throws IOException + { + writeCdcSegment(cdcDir, filename, size, complete, false, false); + } + + private static void writeCdcSegment(File cdcDir, String filename, int size, boolean complete, boolean orphaned, boolean intact) throws IOException + { + if (!orphaned) + { + final File f1 = new File(cdcDir, filename); + assertTrue(f1.createNewFile()); + Files.write(f1.toPath(), RandomUtils.nextBytes(size)); + } + + if (!intact) + { + final File f2 = new File(cdcDir, CdcUtil.getIdxFileName(filename)); + assertTrue(f2.createNewFile()); + Files.write(f2.toPath(), (size + (complete ? 
"\nCOMPLETED" : "")).getBytes(StandardCharsets.UTF_8)); + } + } + + private void checkExists(Path tempDir, String logFileName) + { + checkExists(tempDir, logFileName, false, false); + } + + private void checkExists(Path tempDir, String logFileName, boolean orphaned, boolean intact) + { + assertEquals(!orphaned, Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName))); + assertEquals(!intact, Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, CdcUtil.getIdxFileName(logFileName)))); + } + + private void checkNotExists(Path tempDir, String logFileName) + { + assertFalse(Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, logFileName))); + assertFalse(Files.exists(Paths.get(tempDir.toString(), CdcRawDirectorySpaceCleaner.CDC_DIR_NAME, CdcUtil.getIdxFileName(logFileName)))); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java new file mode 100644 index 000000000..4645b9306 --- /dev/null +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/CdcUtilTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.cassandra.sidecar.utils; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Unit tests for the {@link CdcUtil} + */ +public class CdcUtilTest +{ + @Test + public void testMatcher() + { + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-7-1689642717704.log").matches()); + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-12345.log").matches()); + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-2-1340512736956320000.log").matches()); + assertTrue(CdcUtil.SEGMENT_PATTERN.matcher("CommitLog-2-1340512736959990000.log").matches()); + assertTrue(CdcUtil.isValid("CommitLog-7-1689642717704.log")); + assertTrue(CdcUtil.isValid("CommitLog-12345.log")); + assertTrue(CdcUtil.isValid("CommitLog-2-1340512736956320000.log")); + assertTrue(CdcUtil.isValid("CommitLog-2-1340512736959990000.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-7-1689642717704.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-12345.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-2-1340512736956320000.log")); + assertTrue(CdcUtil.isLogFile("CommitLog-2-1340512736959990000.log")); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-7-1689642717704_cdc.idx").matches()); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-12345_cdc.idx").matches()); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-2-1240512736956320000_cdc.idx").matches()); + assertTrue(CdcUtil.IDX_FILE_PATTERN.matcher("CommitLog-2-1340512736956320000_cdc.idx").matches()); + + assertFalse(CdcUtil.isValid("CommitLog-abc.log")); + assertFalse(CdcUtil.isValid("abc-7-1689642717704.log")); + assertFalse(CdcUtil.isValid("abc-1689642717704.log")); + assertFalse(CdcUtil.isLogFile("CommitLog-abc.log")); + 
assertFalse(CdcUtil.isLogFile("abc-7-1689642717704.log")); + assertFalse(CdcUtil.isLogFile("abc-1689642717704.log")); + assertFalse(CdcUtil.isLogFile("CommitLog-7-1689642717704")); + assertFalse(CdcUtil.isLogFile("CommitLog-12345")); + assertFalse(CdcUtil.isLogFile("CommitLog-2-1340512736956320000")); + assertFalse(CdcUtil.isLogFile("CommitLog-2-1340512736959990000")); + } + + @Test + public void testExtractSegmentIdMatcher() + { + assertEquals(12345L, CdcUtil.parseSegmentId("CommitLog-12345.log")); + assertEquals(1689642717704L, CdcUtil.parseSegmentId("CommitLog-7-1689642717704.log")); + assertEquals(1340512736956320000L, CdcUtil.parseSegmentId("CommitLog-2-1340512736956320000.log")); + assertEquals(1340512736959990000L, CdcUtil.parseSegmentId("CommitLog-2-1340512736959990000.log")); + assertEquals(12345L, CdcUtil.parseSegmentId("CommitLog-6-12345.log")); + assertEquals(1646094405659L, CdcUtil.parseSegmentId("CommitLog-7-1646094405659.log")); + assertEquals(1646094405659L, CdcUtil.parseSegmentId("CommitLog-1646094405659.log")); + } + + @Test + public void testIdxToLogFileName() + { + assertEquals("CommitLog-7-1689642717704.log", CdcUtil.idxToLogFileName("CommitLog-7-1689642717704_cdc.idx")); + assertEquals("CommitLog-12345.log", CdcUtil.idxToLogFileName("CommitLog-12345_cdc.idx")); + assertEquals("CommitLog-2-1240512736956320000.log", CdcUtil.idxToLogFileName("CommitLog-2-1240512736956320000_cdc.idx")); + assertEquals("CommitLog-2-1340512736956320000.log", CdcUtil.idxToLogFileName("CommitLog-2-1340512736956320000_cdc.idx")); + } +} diff --git a/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java b/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java index eba7be166..f545d5299 100644 --- a/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java +++ b/server/src/test/java/org/apache/cassandra/sidecar/utils/FileUtilsTest.java @@ -18,8 +18,11 @@ package org.apache.cassandra.sidecar.utils; +import 
java.util.Objects; + import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.assertj.core.api.Assertions.assertThat; /** @@ -37,4 +40,25 @@ void testMaybeResolveHomeDirectory() assertThat(FileUtils.maybeResolveHomeDirectory("~/.ccm")).isEqualTo(System.getProperty("user.home") + "/.ccm"); assertThat(FileUtils.maybeResolveHomeDirectory("/dev/null")).isEqualTo("/dev/null"); } + + @Test + public void testStorageStringPatterns() + { + assertEquals(1, FileUtils.storageUnitToBytes("B")); + assertEquals(1024, FileUtils.storageUnitToBytes("KiB")); + assertEquals(1048576, FileUtils.storageUnitToBytes("MiB")); + assertEquals(1073741824, FileUtils.storageUnitToBytes("GiB")); + + assertEquals(1048576L, FileUtils.mbStringToBytes("1")); + assertEquals(524288000L, FileUtils.mbStringToBytes("500")); + + assertEquals(1L, Objects.requireNonNull(FileUtils.storageStringToBytes("1")).longValue()); + assertEquals(1L, Objects.requireNonNull(FileUtils.storageStringToBytes("1B")).longValue()); + assertEquals(500L, Objects.requireNonNull(FileUtils.storageStringToBytes("500B")).longValue()); + assertEquals(1024, Objects.requireNonNull(FileUtils.storageStringToBytes("1KiB")).longValue()); + assertEquals(1048576, Objects.requireNonNull(FileUtils.storageStringToBytes("1024KiB")).longValue()); + assertEquals(1048576, Objects.requireNonNull(FileUtils.storageStringToBytes("1MiB")).longValue()); + assertEquals(4294967296L, Objects.requireNonNull(FileUtils.storageStringToBytes("4096MiB")).longValue()); + assertEquals(536870912000L, Objects.requireNonNull(FileUtils.storageStringToBytes("500GiB")).longValue()); + } }