CASSSIDECAR-203: Created Endpoint that Triggers an Immediate Schema Report #198

Open · wants to merge 1 commit into base: trunk
CHANGES.txt (3 changes: 2 additions & 1 deletion)
@@ -1,5 +1,6 @@
1.0.0
-----
* Create Endpoint that Triggers an Immediate Schema Report (CASSSIDECAR-203)
* Adapt to cluster topology change for restore jobs (CASSSIDECAR-185)
* Fix PeriodicTaskExecutor double execution due to race from reschedule (CASSSIDECAR-210)
* Upgrade Netty to 4.1.118.Final and Vert.x to 4.5.13 Version (CASSSIDECAR-207)
@@ -8,7 +9,7 @@
* Sidecar schema initialization can be executed on multiple thread (CASSSIDECAR-200)
* Make sidecar operations resilient to down Cassandra nodes (CASSSIDECAR-201)
* Fix Cassandra instance not found error (CASSSIDECAR-192)
* Implemented Schema Reporter for Integration with DataHub (CASSSIDECAR-191)
* Implement Schema Reporter for Integration with DataHub (CASSSIDECAR-191)
Contributor:

Do not change the existing entries in CHANGES.txt.

Contributor Author:

Sure.

Is there a specific reason you'd like me to keep this typo I myself accidentally introduced in there forever?

Contributor (@yifan-c, Feb 25, 2025):

It is mainly to avoid amending the change log. It is more important to have the git history of each line linked to the correct commit.
Btw, it is not really a typo: "Implemented something" describes the change just as clearly.
If you look at the log, there are places where "Adds", "Add", and "Adding" are all used. There is no strict rule on the verb form.

* Add sidecar endpoint to retrieve stream stats (CASSSIDECAR-180)
* Add sidecar endpoint to retrieve cassandra gossip health (CASSSIDECAR-173)
* Fix SidecarSchema stuck at initialization due to ClusterLeaseTask scheduling (CASSSIDECAR-189)
@@ -121,6 +121,9 @@ public final class ApiEndpointsV1
public static final String LIST_CDC_SEGMENTS_ROUTE = API_V1 + CDC_PATH + "/segments";
public static final String STREAM_CDC_SEGMENTS_ROUTE = LIST_CDC_SEGMENTS_ROUTE + "/" + SEGMENT_PATH_PARAM;

// Schema Reporting
private static final String REPORT_SCHEMA = "/report-schema";
public static final String REPORT_SCHEMA_ROUTE = API_V1 + REPORT_SCHEMA;

public static final String CONNECTED_CLIENT_STATS_ROUTE = API_V1 + CASSANDRA + "/stats/connected-clients";
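For illustration only: assuming API_V1 resolves to "/api/v1" (its definition is outside this hunk), the new route constant would expand as in the hypothetical check below.

    // Hypothetical check; API_V1 == "/api/v1" is an assumption, not shown in this hunk.
    assert ApiEndpointsV1.REPORT_SCHEMA_ROUTE.equals("/api/v1/report-schema");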

@@ -63,6 +63,9 @@ public class BasicPermissions
public static final Permission READ_OPERATIONAL_JOB = new DomainAwarePermission("OPERATIONAL_JOB:READ", OPERATION_SCOPE);
public static final Permission DECOMMISSION_NODE = new DomainAwarePermission("NODE:DECOMMISSION", OPERATION_SCOPE);

// Permissions related to Schema Reporting
public static final Permission REPORT_SCHEMA = new DomainAwarePermission("SCHEMA:REPORT", CLUSTER_SCOPE);
Contributor:

I do not think an extra permission is required.
The permission needed in order to publish to DataHub is SCHEMA:READ, which already exists. cc: @sarankk

Contributor:

The permission is not for someone to read the schema, but for someone to trigger the schema report on demand. So I think the permission is different. Would love to hear input from @sarankk

Contributor:

The sentiment here is to have restraint in adding new verbs. Ideally, it should be a fixed set of verbs, to avoid operational pain.
The reason READ should work here is that the reporter is reading the Cassandra schema. When it publishes (i.e. sends requests to DataHub), the authorization should be enforced by the server (DataHub), not the client (Sidecar).

Contributor (@sarankk, Feb 20, 2025):

I feel having two different permissions is better in general, with the read permission automatically granting the report permission as well. But in this particular case, since we already have a periodic task that reports the schema irrespective of this endpoint, could we use the enable flag for schema reporting to control whether reporting is allowed, without adding a separate permission for reporting control?

Contributor Author:

I was planning to get to this tomorrow, but see there's confusion about this one.

Changing the permission to the existing SCHEMA:READ would be logically equivalent to the following statement:

"Everyone who is allowed to see the cluster schema is also allowed to perform DoS attacks on Sidecar."

Is that actually true?

Contributor:

I think we need a separate permission to allow a user to trigger a schema report. We should not conflate the SCHEMA:READ permission with the ability to report schemas.

Contributor:

Let's also think about the future. We do not want to go down the route of introducing new verbs for new actions, and this change is triggering that concern.

I am fine with letting PUBLISH pass, but we should really think about it and be mindful about introducing new verbs.


// cassandra cluster related permissions
public static final Permission READ_SCHEMA = new DomainAwarePermission("SCHEMA:READ", CLUSTER_SCOPE);
public static final Permission READ_SCHEMA_KEYSPACE_SCOPED = new DomainAwarePermission("SCHEMA:READ", KEYSPACE_SCOPE);
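To make the trade-off in the thread above concrete, here is a hedged sketch of the two variants of requiredAuthorizations() in the new ReportSchemaHandler (defined later in this diff). The PR as written uses the first; the alternative raised in review would reuse the existing read permission.

    @Override
    @NotNull
    public Set<Authorization> requiredAuthorizations()
    {
        // Variant in this PR: a dedicated SCHEMA:REPORT permission gates the trigger endpoint.
        return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization());

        // Alternative discussed above: reuse the existing SCHEMA:READ permission,
        // on the grounds that the reporter only reads the Cassandra schema.
        // return Collections.singleton(BasicPermissions.READ_SCHEMA.toAuthorization());
    }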
@@ -113,13 +113,23 @@ protected SchemaReporter(@NotNull IdentifiersProvider identifiersProvider,
/**
* Public method for converting and reporting the Cassandra schema
*
* @param cluster a {@link Cluster} to extract Cassandra schema from
* @param cluster the {@link Cluster} to extract Cassandra schema from
*/
public void process(@NotNull Cluster cluster)
{
process(cluster.getMetadata());
}

/**
* Public method for converting and reporting the Cassandra schema
*
* @param metadata the {@link Metadata} to extract Cassandra schema from
*/
public void process(@NotNull Metadata metadata)
{
try (Emitter emitter = emitterFactory.emitter())
{
stream(cluster.getMetadata())
stream(metadata)
.forEach(ThrowableUtils.consumer(emitter::emit));
}
catch (Exception exception)
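A brief, hedged usage sketch of the two overloads, using only names that appear elsewhere in this diff; the handler added later in this PR uses the Metadata-based one.

    // Existing entry point: report the schema of a connected driver Cluster.
    schemaReporter.process(cluster);

    // New overload: report from Metadata directly, for example metadata obtained from
    // whichever local Cassandra instance is currently available.
    Metadata metadata = metadataFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().metadata());
    schemaReporter.process(metadata);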
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.sidecar.routes;

import java.util.Collections;
import java.util.Set;

import com.google.inject.Inject;
import com.google.inject.Singleton;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.net.SocketAddress;
import io.vertx.ext.auth.authorization.Authorization;
import io.vertx.ext.web.RoutingContext;
import org.apache.cassandra.sidecar.acl.authorization.BasicPermissions;
import org.apache.cassandra.sidecar.concurrent.ExecutorPools;
import org.apache.cassandra.sidecar.datahub.SchemaReporter;
import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* An implementation of {@link AbstractHandler} used to trigger an immediate,
* synchronous conversion and report of the current schema
*/
@Singleton
public class ReportSchemaHandler extends AbstractHandler<Void> implements AccessProtected
{
@NotNull
private final SchemaReporter schemaReporter;

/**
* Constructs a new instance of {@link ReportSchemaHandler} using the provided instances
* of {@link InstanceMetadataFetcher}, {@link ExecutorPools}, and {@link SchemaReporter}
*
* @param metadata the metadata fetcher
* @param executor executor pools for blocking executions
* @param reporter executor pools for blocking executions
Contributor:

copy-paste error

*/
@Inject
public ReportSchemaHandler(@NotNull InstanceMetadataFetcher metadata,
@NotNull ExecutorPools executor,
@NotNull SchemaReporter reporter)
{
super(metadata, executor, null);

schemaReporter = reporter;
}

/**
* {@inheritDoc}
*/
@Override
@NotNull
public Set<Authorization> requiredAuthorizations()
{
return Collections.singleton(BasicPermissions.REPORT_SCHEMA.toAuthorization());
}

/**
* {@inheritDoc}
*/
@Override
@Nullable
protected Void extractParamsOrThrow(@NotNull RoutingContext context)
{
return null;
}

/**
* {@inheritDoc}
*/
@Override
protected void handleInternal(@NotNull RoutingContext context,
@NotNull HttpServerRequest http,
@NotNull String host,
@NotNull SocketAddress address,
@Nullable Void request)
{
executorPools.service()
.runBlocking(() -> metadataFetcher.runOnFirstAvailableInstance(instance ->
schemaReporter.process(instance.delegate().metadata())))
.onSuccess(context::json)
Contributor:

The previous block does not return any value. What does the response JSON look like?

.onFailure(throwable -> processFailure(throwable, context, host, address, request));
}
Comment on lines +95 to +100

Contributor:

Suggested change (replacing the block above):

        Metadata metadata = metadataFetcher.callOnFirstAvailableInstance(instance -> instance.delegate().metadata());
        executorPools.service()
                     .runBlocking(() -> {
                         schemaReporter.process(metadata);
                     })
                     .onSuccess(v -> context.json(OK_STATUS))
                     .onFailure(cause -> processFailure(cause, context, host, address, request));
    }
@@ -131,6 +131,7 @@
import org.apache.cassandra.sidecar.routes.ListOperationalJobsHandler;
import org.apache.cassandra.sidecar.routes.NodeDecommissionHandler;
import org.apache.cassandra.sidecar.routes.OperationalJobHandler;
import org.apache.cassandra.sidecar.routes.ReportSchemaHandler;
import org.apache.cassandra.sidecar.routes.RingHandler;
import org.apache.cassandra.sidecar.routes.RoutingOrder;
import org.apache.cassandra.sidecar.routes.SchemaHandler;
@@ -341,6 +342,7 @@ public Router vertxRouter(Vertx vertx,
SSTableCleanupHandler ssTableCleanupHandler,
StreamCdcSegmentHandler streamCdcSegmentHandler,
ListCdcDirHandler listCdcDirHandler,
ReportSchemaHandler reportSchemaHandler,
RestoreRequestValidationHandler validateRestoreJobRequest,
DiskSpaceProtectionHandler diskSpaceProtection,
ValidateTableExistenceHandler validateTableExistence,
@@ -598,6 +600,14 @@ public Router vertxRouter(Vertx vertx,
.handler(streamCdcSegmentHandler)
.build();

// Schema Reporting
protectedRouteBuilderFactory.get()
.router(router)
.method(HttpMethod.GET)
Contributor:

The verb should not be GET. This should either be PUT or POST:

  • PUT is more suitable for idempotent operations
  • POST is more suitable for operations that are not idempotent

Also, this endpoint might be suitable for the operations framework; something to consider. (A sketch of a POST registration follows this hunk.)

.endpoint(ApiEndpointsV1.REPORT_SCHEMA_ROUTE)
.handler(reportSchemaHandler)
.build();

return router;
}
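To make the reviewer's point concrete, here is a minimal sketch of the same registration using POST instead of GET, assuming the same protected-route builder API shown above; the actual change is not part of this diff.

    // Hypothetical revision per the review: triggering a schema report is a
    // side-effecting action rather than a read, so register it with POST.
    protectedRouteBuilderFactory.get()
                                .router(router)
                                .method(HttpMethod.POST)
                                .endpoint(ApiEndpointsV1.REPORT_SCHEMA_ROUTE)
                                .handler(reportSchemaHandler)
                                .build();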

@@ -902,7 +912,8 @@ public IdentifiersProvider identifiersProvider(@NotNull InstanceMetadataFetcher
@NotNull
protected String initialize()
{
return fetcher.callOnFirstAvailableInstance(i -> i.delegate().storageOperations().clusterName());
return fetcher.callOnFirstAvailableInstance(instance ->
instance.delegate().storageOperations().clusterName());
Comment on lines -905 to +916
Contributor:

Please revert the unrelated change.

}
};

@@ -19,8 +19,8 @@
package org.apache.cassandra.sidecar.utils;

import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -97,13 +97,29 @@ public CassandraAdapterDelegate delegate(@NotNull String host) throws NoSuchCass
}

/**
* Iterate through the local instances and call the function on the first available instance, i.e. no CassandraUnavailableException
* or OperationUnavailableException is thrown for the operations
* Iterate through the local instances and run the {@link Consumer} on the first available one,
* so no {@link CassandraUnavailableException} or {@link OperationUnavailableException} is thrown for the operations
*
* @param consumer a {@link Consumer} that processes {@link InstanceMetadata} and returns no result
* @throws CassandraUnavailableException if all local instances were exhausted
*/
public void runOnFirstAvailableInstance(Consumer<InstanceMetadata> consumer) throws CassandraUnavailableException
{
callOnFirstAvailableInstance(metadata ->
{
consumer.accept(metadata);
return null;
});
}
Comment on lines +100 to +113
Contributor:

Can you remove this method? It is unnecessary: it does not retrieve anything from the instance metadata fetcher, which is supposed to "fetch something". I have suggested a different implementation in the new handler without using this method.


/**
* Iterate through the local instances and call the {@link Function} on the first available one,
* so no {@link CassandraUnavailableException} or {@link OperationUnavailableException} is thrown for the operations
*
* @param function function applies to {@link InstanceMetadata}
* @return function eval result. Null can be returned when all local instances are exhausted
* @param <T> type of the result
* @throws CassandraUnavailableException when all local instances are exhausted.
* @param function a {@link Function} that maps {@link InstanceMetadata} to {@link T}
* @return evaluation result of the {@code function}; can be {@code null} if all local instances were exhausted
* @throws CassandraUnavailableException if all local instances were exhausted
*/
@NotNull
public <T> T callOnFirstAvailableInstance(Function<InstanceMetadata, T> function) throws CassandraUnavailableException