Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MET-5394 mcs prepared for on-line migration (new table added) #370

Merged
merged 3 commits into from
Aug 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 23 additions & 3 deletions docker/cassandra/cqls/mcs_setup.cql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ CREATE TABLE representation_versions (
PRIMARY KEY (cloud_id, schema_id, version_id)
);

CREATE TABLE data_set_assignments_by_revision_id_v2 (
CREATE TABLE data_set_assignments_by_revision_id_v1 (
provider_id varchar,
dataset_id varchar,
bucket_id timeuuid,
Expand All @@ -56,13 +56,33 @@ CREATE TABLE data_set_assignments_by_revision_id_v2 (
revision_timestamp timestamp,
representation_id varchar,
cloud_id varchar,
version_id timeuuid,
published boolean,
acceptance boolean,
mark_deleted boolean,
PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, version_id)
PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
)WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';

-- Migration target for data_set_assignments_by_revision_id_v1: same partition key,
-- with version_id appended to the clustering key; published/acceptance columns dropped.
CREATE TABLE data_set_assignments_by_revision_id_v2 (
    provider_id varchar,
    dataset_id varchar,
    bucket_id timeuuid,
    revision_provider_id varchar,
    revision_name varchar,
    revision_timestamp timestamp,
    representation_id varchar,
    cloud_id varchar,
    version_id timeuuid,
    mark_deleted boolean,
    PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, version_id)
) WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';

CREATE TABLE data_set_assignments_by_revision_id_buckets (
object_id varchar,
bucket_id timeuuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,40 @@ public class CassandraDataSetDAO {
private PreparedStatement getDataSetStatement;
private PreparedStatement getDataSetsForRepresentationVersionStatement;
private PreparedStatement getOneDataSetForRepresentationStatement;
private PreparedStatement hasProvidedRepresentationNameStatement;
private PreparedStatement addDataSetsRevisionStatement;
/* Temporarily added statement that will store rows in the additional table during migration of
data_set_assignments_by_revision_id_v1
to
data_set_assignments_by_revision_id_v2 */
private PreparedStatement addDataSetsRevisionStatementMigrationTable;
private PreparedStatement getDataSetsRevisionStatement;
private PreparedStatement removeDataSetsRevisionStatement;

/* Temporarily added statement that will remove rows from the additional table during migration of
data_set_assignments_by_revision_id_v1
to
data_set_assignments_by_revision_id_v2 */
private PreparedStatement removeDataSetsRevisionStatementMigrationTable;

/**
* Constructor for the class
*
* @param connectionProvider connection provider for DB
*/
public CassandraDataSetDAO(CassandraConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}

/**
* Reads assignments for the given dataset;
*
* @param providerDataSetId concatenation of providerId and datasetId
* @param bucketId bucket identifier
* @param state paging state
* @param limit limit of results
*
* @return slice of the results
*/
public ResultSlice<DatasetAssignment> getDataSetAssignments(String providerDataSetId, String bucketId, PagingState state,
int limit) {
List<DatasetAssignment> assignments = new ArrayList<>();
Expand Down Expand Up @@ -101,6 +126,15 @@ public ResultSlice<DatasetAssignment> getDataSetAssignments(String providerDataS

}

/**
* Inserts new row to the <b><i>data_set_assignments_by_representations</i></b> table.
*
* @param providerDataSetId concatenated provider and datasetId
* @param schema representation version
* @param recordId cloud identifier
* @param versionId representation version
* @param timestamp time of assignment
*/
public void addAssignmentByRepresentationVersion(String providerDataSetId, String schema, String recordId, UUID versionId,
Date timestamp) {
BoundStatement boundStatement = addAssignmentByRepresentationStatement.bind(recordId, schema, versionId, providerDataSetId,
Expand All @@ -109,6 +143,17 @@ public void addAssignmentByRepresentationVersion(String providerDataSetId, Strin
QueryTracer.logConsistencyLevel(boundStatement, rs);
}

/**
* Inserts new row to the <b><i>data_set_assignments_by_data_set</i></b> table.
*
* @param providerId dataset provider
* @param dataSetId dataset name
* @param bucketId bucket identifier
* @param recordId cloud identifier
* @param schema representation name
* @param now time of assignment
* @param versionId representation version
*/
public void addAssignment(String providerId, String dataSetId, String bucketId, String recordId, String schema, Date now,
UUID versionId) {
String providerDataSetId = createProviderDataSetId(providerId, dataSetId);
Expand All @@ -125,6 +170,9 @@ public void addAssignment(String providerId, String dataSetId, String bucketId,
* @param schemaId representation schema
* @param version representation version (might be null)
* @return list of data set ids
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public Collection<CompoundDataSetId> getDataSetAssignments(String cloudId, String schemaId, String version)
throws NoHostAvailableException, QueryExecutionException {
Expand All @@ -148,6 +196,9 @@ public Collection<CompoundDataSetId> getDataSetAssignments(String cloudId, Strin
* @param cloudId record id
* @param schemaId representation schema
* @return one dataset
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public Optional<CompoundDataSetId> getOneDataSetFor(String cloudId, String schemaId)
throws NoHostAvailableException, QueryExecutionException {
Expand All @@ -172,6 +223,9 @@ public Optional<CompoundDataSetId> getOneDataSetFor(String cloudId, String schem
* @param providerId data set owner's (provider's) id
* @param dataSetId data set id
* @return data set
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public DataSet getDataSet(String providerId, String dataSetId) throws NoHostAvailableException, QueryExecutionException {

Expand All @@ -189,6 +243,17 @@ public DataSet getDataSet(String providerId, String dataSetId) throws NoHostAvai
return ds;
}

/**
* Deletes row from <b><i>data_set_assignments_by_data_set</i></b> table
*
* @param recordId cloud identifier
* @param schema representation name
* @param versionId representation version
* @param providerDataSetId concatenated provider and datasetId
* @param bucket bucket identifier
* @return if operation was applied
*
*/
public boolean removeDatasetAssignment(String recordId, String schema, String versionId, String providerDataSetId,
Bucket bucket) {
BoundStatement boundStatement = removeAssignmentStatement.bind(
Expand All @@ -198,6 +263,13 @@ public boolean removeDatasetAssignment(String recordId, String schema, String ve
return rs.wasApplied();
}

/**
* Deletes row from <b><i>data_set_assignments_by_representations</i></b> table
* @param providerDataSetId concatenated providerId and datasetId
* @param cloudId cloud identifier
* @param schema representation name
* @param versionId representation version
*/
public void removeAssignmentByRepresentation(String providerDataSetId, String cloudId, String schema, String versionId) {
BoundStatement boundStatement = removeAssignmentByRepresentationsStatement.bind(
cloudId, schema, UUID.fromString(versionId), providerDataSetId);
Expand All @@ -214,6 +286,9 @@ public void removeAssignmentByRepresentation(String providerDataSetId, String cl
* @param description description of data set.
* @param creationTime creation date
* @return created (or updated) data set.
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public DataSet createDataSet(String providerId, String dataSetId, String description, Date creationTime)
throws NoHostAvailableException, QueryExecutionException {
Expand All @@ -236,6 +311,9 @@ public DataSet createDataSet(String providerId, String dataSetId, String descrip
* Might be null.
* @param limit max size of returned data set list.
* @return list of data sets.
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public List<DataSet> getDataSets(String providerId, String thresholdDatasetId, int limit)
throws NoHostAvailableException, QueryExecutionException {
Expand Down Expand Up @@ -263,41 +341,71 @@ public List<DataSet> getDataSets(String providerId, String thresholdDatasetId, i
*
* @param providerId data set owner's (provider's) id
* @param dataSetId data set id
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public void deleteDataSet(String providerId, String dataSetId) throws NoHostAvailableException, QueryExecutionException {
    // Single-row delete keyed by the (provider, dataset) pair; no applied-check needed.
    connectionProvider.getSession().execute(deleteDataSetStatement.bind(providerId, dataSetId));
}

/**
 * Checks whether the given bucket of the <b><i>data_set_assignments_by_data_set</i></b> table
 * contains at least one assignment for the provided representation name.
 *
 * @param representationName representation name
 * @param providerDatasetId concatenated provider and datasetId
 * @param bucket bucket identifier
 * @return true if at least one matching assignment exists in the bucket
 */
public boolean datasetBucketHasAnyAssignment(String representationName, String providerDatasetId, Bucket bucket) {
    BoundStatement statement = hasProvidedRepresentationNameStatement.bind(
        providerDatasetId, UUID.fromString(bucket.getBucketId()), representationName);

    // Statement is LIMIT 1, so a single non-null row means "has any".
    ResultSet resultSet = connectionProvider.getSession().execute(statement);
    QueryTracer.logConsistencyLevel(statement, resultSet);
    return resultSet.one() != null;
}

/**
* Adds a row to the <b><i>data_set_assignments_by_revision_id_v1</i></b> table.
* @param providerId dataset provider
* @param datasetId dataset name
* @param bucketId bucket identifier
* @param revision revision definition
* @param representationName representation name
* @param cloudId cloud identifier
* @param versionId representation version
*/
public void addDataSetsRevision(String providerId, String datasetId, String bucketId, Revision revision,
String representationName, String cloudId, String version_id) {
String representationName, String cloudId, String versionId) {
BoundStatement boundStatement = addDataSetsRevisionStatement.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId,
UUID.fromString(version_id),
revision.isPublished(), revision.isAcceptance(), revision.isDeleted());

BoundStatement boundStatementForTempTable = addDataSetsRevisionStatementMigrationTable.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId,
UUID.fromString(versionId), revision.isDeleted());

ResultSet rs = connectionProvider.getSession().execute(boundStatement);
ResultSet rsTest = connectionProvider.getSession().execute(boundStatementForTempTable);
QueryTracer.logConsistencyLevel(boundStatement, rs);
QueryTracer.logConsistencyLevel(boundStatement, rsTest);
}

/**
* Deletes row from <b><i>data_set_assignments_by_revision_id_v1</i></b> table.
* @param providerId dataset provider
* @param datasetId dataset name
* @param bucketId bucket identifier
* @param revision revision definition
* @param representationName representation name
* @param cloudId cloud identifier
* @param versionId representation version
*
* @return if operation was applied
*/
public boolean removeDataSetRevision(String providerId, String datasetId, String bucketId, Revision revision,
String representationName, String cloudId, String version_id) {
String representationName, String cloudId, String versionId) {
BoundStatement boundStatement = removeDataSetsRevisionStatement.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId, UUID.fromString(version_id));
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId);

BoundStatement boundStatementForTempTable = removeDataSetsRevisionStatementMigrationTable.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId, UUID.fromString(versionId));

ResultSet rs = connectionProvider.getSession().execute(boundStatement);
ResultSet rsTemp = connectionProvider.getSession().execute(boundStatementForTempTable);

QueryTracer.logConsistencyLevel(boundStatement, rs);
QueryTracer.logConsistencyLevel(boundStatement, rsTemp);

return rs.wasApplied();
}

Expand Down Expand Up @@ -410,24 +518,25 @@ private void prepareStatements() {
"WHERE cloud_id = ? AND schema_id = ? LIMIT 1;"
);

hasProvidedRepresentationNameStatement = connectionProvider.getSession().prepare(
"SELECT schema_id, cloud_id " +
"FROM data_set_assignments_by_data_set " +
"WHERE provider_dataset_id = ? AND bucket_id = ? AND schema_id = ? " +
"LIMIT 1;"
);

addDataSetsRevisionStatement = connectionProvider.getSession().prepare(
"INSERT " +
"INTO data_set_assignments_by_revision_id_v2 (provider_id, dataset_id, bucket_id, " +
"INTO data_set_assignments_by_revision_id_v1 (provider_id, dataset_id, bucket_id, " +
"revision_provider_id, revision_name, revision_timestamp, " +
"representation_id, cloud_id, version_id, published, acceptance, mark_deleted) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?);"
"representation_id, cloud_id, published, acceptance, mark_deleted) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?);"
);

addDataSetsRevisionStatementMigrationTable = connectionProvider.getSession().prepare(
"INSERT " +
"INTO data_set_assignments_by_revision_id_v2 (provider_id, dataset_id, bucket_id, " +
"revision_provider_id, revision_name, revision_timestamp, " +
"representation_id, cloud_id, version_id, mark_deleted) " +
"VALUES (?,?,?,?,?,?,?,?,?,?);"
);

removeDataSetsRevisionStatement = connectionProvider.getSession().prepare(
"DELETE " +
"FROM data_set_assignments_by_revision_id_v2 " +
"FROM data_set_assignments_by_revision_id_v1 " +
"WHERE provider_id = ? " +
"AND dataset_id = ? " +
"AND bucket_id = ? " +
Expand All @@ -436,10 +545,24 @@ private void prepareStatements() {
"AND revision_timestamp = ? " +
"AND representation_id = ? " +
"AND cloud_id = ? " +
"AND version_id = ? " +
"IF EXISTS;"
);

removeDataSetsRevisionStatementMigrationTable = connectionProvider.getSession().prepare(
"DELETE " +
"FROM data_set_assignments_by_revision_id_v2 " +
"WHERE provider_id = ? " +
"AND dataset_id = ? " +
"AND bucket_id = ? " +
"AND revision_provider_id = ? " +
"AND revision_name = ? " +
"AND revision_timestamp = ? " +
"AND representation_id = ? " +
"AND cloud_id = ? " +
"AND version_id = ? " +
"IF EXISTS;"
);

getDataSetsRevisionStatement = connectionProvider.getSession().prepare(//
"SELECT cloud_id, version_id, published, acceptance, mark_deleted " +
"FROM data_set_assignments_by_revision_id_v2 " +
Expand Down