Skip to content

Commit

Permalink
MET-5394 mcs prepared for on-line migration (new table added) (#370)
Browse files Browse the repository at this point in the history
* MET-5394 mcs prepared for on-line migration (new table added)

* MET-5394 two revision tags removed (acceptance,published)

* MET-5394 some sonar warnings removed for CassandraDataSetDAO.java
  • Loading branch information
pWoz authored Aug 22, 2023
1 parent 04c6760 commit 7325311
Show file tree
Hide file tree
Showing 2 changed files with 172 additions and 29 deletions.
26 changes: 23 additions & 3 deletions docker/cassandra/cqls/mcs_setup.cql
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ CREATE TABLE representation_versions (
PRIMARY KEY (cloud_id, schema_id, version_id)
);

CREATE TABLE data_set_assignments_by_revision_id_v2 (
CREATE TABLE data_set_assignments_by_revision_id_v1 (
provider_id varchar,
dataset_id varchar,
bucket_id timeuuid,
Expand All @@ -56,13 +56,33 @@ CREATE TABLE data_set_assignments_by_revision_id_v2 (
revision_timestamp timestamp,
representation_id varchar,
cloud_id varchar,
version_id timeuuid,
published boolean,
acceptance boolean,
mark_deleted boolean,
PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, version_id)
PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
)WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';

CREATE TABLE data_set_assignments_by_revision_id_v2
(
provider_id varchar,
dataset_id varchar,
bucket_id timeuuid,
revision_provider_id varchar,
revision_name varchar,
revision_timestamp timestamp,
representation_id varchar,
cloud_id varchar,
version_id timeuuid,
mark_deleted boolean,
PRIMARY KEY ((provider_id, dataset_id, bucket_id),
revision_provider_id,
revision_name,
revision_timestamp,
representation_id,
cloud_id,
version_id
) )WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';

CREATE TABLE data_set_assignments_by_revision_id_buckets (
object_id varchar,
bucket_id timeuuid,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,40 @@ public class CassandraDataSetDAO {
private PreparedStatement getDataSetStatement;
private PreparedStatement getDataSetsForRepresentationVersionStatement;
private PreparedStatement getOneDataSetForRepresentationStatement;
private PreparedStatement hasProvidedRepresentationNameStatement;
private PreparedStatement addDataSetsRevisionStatement;
/* Temporary added statement that will store rows in the additional table during migration of
data_set_assignments_by_revision_id_v1
to
data_set_assignments_by_revision_id_v2 */
private PreparedStatement addDataSetsRevisionStatementMigrationTable;
private PreparedStatement getDataSetsRevisionStatement;
private PreparedStatement removeDataSetsRevisionStatement;

/* Temporary added statement that will remove rows from the additional table during migration of
data_set_assignments_by_revision_id_v1
to
data_set_assignments_by_revision_id_v2 */
private PreparedStatement removeDataSetsRevisionStatementMigrationTable;

/**
* Constructor for the class
*
* @param connectionProvider connection provider for DB
*/
public CassandraDataSetDAO(CassandraConnectionProvider connectionProvider) {
this.connectionProvider = connectionProvider;
}

/**
* Reads assignments for the given dataset;
*
* @param providerDataSetId concatenation of providerId and datasetId
* @param bucketId bucket identifier
* @param state paging state
* @param limit limit of results
*
* @return slice of the results
*/
public ResultSlice<DatasetAssignment> getDataSetAssignments(String providerDataSetId, String bucketId, PagingState state,
int limit) {
List<DatasetAssignment> assignments = new ArrayList<>();
Expand Down Expand Up @@ -101,6 +126,15 @@ public ResultSlice<DatasetAssignment> getDataSetAssignments(String providerDataS

}

/**
* Inserts new row to the <b><i>data_set_assignments_by_representations</i></b> table.
*
* @param providerDataSetId concatenated provider and datasetId
* @param schema representation version
* @param recordId cloud identifier
* @param versionId representation version
* @param timestamp time of assignment
*/
public void addAssignmentByRepresentationVersion(String providerDataSetId, String schema, String recordId, UUID versionId,
Date timestamp) {
BoundStatement boundStatement = addAssignmentByRepresentationStatement.bind(recordId, schema, versionId, providerDataSetId,
Expand All @@ -109,6 +143,17 @@ public void addAssignmentByRepresentationVersion(String providerDataSetId, Strin
QueryTracer.logConsistencyLevel(boundStatement, rs);
}

/**
* Inserts new row to the <b><i>data_set_assignments_by_data_set</i></b> table.
*
* @param providerId dataset provider
* @param dataSetId dataset name
* @param bucketId bucket identifier
* @param recordId cloud identifier
* @param schema representation name
* @param now time of assignment
* @param versionId representation version
*/
public void addAssignment(String providerId, String dataSetId, String bucketId, String recordId, String schema, Date now,
UUID versionId) {
String providerDataSetId = createProviderDataSetId(providerId, dataSetId);
Expand All @@ -125,6 +170,9 @@ public void addAssignment(String providerId, String dataSetId, String bucketId,
* @param schemaId representation schema
* @param version representation version (might be null)
* @return list of data set ids
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public Collection<CompoundDataSetId> getDataSetAssignments(String cloudId, String schemaId, String version)
throws NoHostAvailableException, QueryExecutionException {
Expand All @@ -148,6 +196,9 @@ public Collection<CompoundDataSetId> getDataSetAssignments(String cloudId, Strin
* @param cloudId record id
* @param schemaId representation schema
* @return one dataset
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public Optional<CompoundDataSetId> getOneDataSetFor(String cloudId, String schemaId)
throws NoHostAvailableException, QueryExecutionException {
Expand All @@ -172,6 +223,9 @@ public Optional<CompoundDataSetId> getOneDataSetFor(String cloudId, String schem
* @param providerId data set owner's (provider's) id
* @param dataSetId data set id
* @return data set
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public DataSet getDataSet(String providerId, String dataSetId) throws NoHostAvailableException, QueryExecutionException {

Expand All @@ -189,6 +243,17 @@ public DataSet getDataSet(String providerId, String dataSetId) throws NoHostAvai
return ds;
}

/**
* Deletes row from <b><i>data_set_assignments_by_data_set</i></b> table
*
* @param recordId cloud identifier
* @param schema representation name
* @param versionId representation version
* @param providerDataSetId concatenated provider and datasetId
* @param bucket bucket identifier
* @return if operation was applied
*
*/
public boolean removeDatasetAssignment(String recordId, String schema, String versionId, String providerDataSetId,
Bucket bucket) {
BoundStatement boundStatement = removeAssignmentStatement.bind(
Expand All @@ -198,6 +263,13 @@ public boolean removeDatasetAssignment(String recordId, String schema, String ve
return rs.wasApplied();
}

/**
* Deletes row from <b><i>data_set_assignments_by_representations</i></b> table
* @param providerDataSetId concatenated providerId and datasetId
* @param cloudId cloud identifier
* @param schema representation name
* @param versionId representation version
*/
public void removeAssignmentByRepresentation(String providerDataSetId, String cloudId, String schema, String versionId) {
BoundStatement boundStatement = removeAssignmentByRepresentationsStatement.bind(
cloudId, schema, UUID.fromString(versionId), providerDataSetId);
Expand All @@ -214,6 +286,9 @@ public void removeAssignmentByRepresentation(String providerDataSetId, String cl
* @param description description of data set.
* @param creationTime creation date
* @return created (or updated) data set.
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public DataSet createDataSet(String providerId, String dataSetId, String description, Date creationTime)
throws NoHostAvailableException, QueryExecutionException {
Expand All @@ -236,6 +311,9 @@ public DataSet createDataSet(String providerId, String dataSetId, String descrip
* Might be null.
* @param limit max size of returned data set list.
* @return list of data sets.
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public List<DataSet> getDataSets(String providerId, String thresholdDatasetId, int limit)
throws NoHostAvailableException, QueryExecutionException {
Expand Down Expand Up @@ -263,41 +341,71 @@ public List<DataSet> getDataSets(String providerId, String thresholdDatasetId, i
*
* @param providerId data set owner's (provider's) id
* @param dataSetId data set id
*
* @throws NoHostAvailableException in case of Cassandra issues
* @throws QueryExecutionException in case of Cassandra issues
*/
public void deleteDataSet(String providerId, String dataSetId) throws NoHostAvailableException, QueryExecutionException {
BoundStatement boundStatement = deleteDataSetStatement.bind(providerId, dataSetId);
connectionProvider.getSession().execute(boundStatement);
}

public boolean datasetBucketHasAnyAssignment(String representationName, String providerDatasetId, Bucket bucket) {
BoundStatement boundStatement = hasProvidedRepresentationNameStatement.bind(
providerDatasetId, UUID.fromString(bucket.getBucketId()), representationName);

ResultSet rs = connectionProvider.getSession().execute(boundStatement);
QueryTracer.logConsistencyLevel(boundStatement, rs);
return rs.one() != null;
}

/**
* Addes row to the <b><i>data_set_assignments_by_revision_id_v1</i></b> table.
* @param providerId dataset provider
* @param datasetId dataset name
* @param bucketId bucket identifier
* @param revision revision definition
* @param representationName representation name
* @param cloudId cloud identifier
* @param versionId representation version
*/
public void addDataSetsRevision(String providerId, String datasetId, String bucketId, Revision revision,
String representationName, String cloudId, String version_id) {
String representationName, String cloudId, String versionId) {
BoundStatement boundStatement = addDataSetsRevisionStatement.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId,
UUID.fromString(version_id),
revision.isPublished(), revision.isAcceptance(), revision.isDeleted());

BoundStatement boundStatementForTempTable = addDataSetsRevisionStatementMigrationTable.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId,
UUID.fromString(versionId), revision.isDeleted());

ResultSet rs = connectionProvider.getSession().execute(boundStatement);
ResultSet rsTest = connectionProvider.getSession().execute(boundStatementForTempTable);
QueryTracer.logConsistencyLevel(boundStatement, rs);
QueryTracer.logConsistencyLevel(boundStatement, rsTest);
}

/**
* Deletes row from <b><i>data_set_assignments_by_revision_id_v1</i></b> table.
* @param providerId dataset provider
* @param datasetId dataset name
* @param bucketId bucket identifier
* @param revision revision definition
* @param representationName representation name
* @param cloudId cloud identifier
* @param versionId representation version
*
* @return if operation was applied
*/
public boolean removeDataSetRevision(String providerId, String datasetId, String bucketId, Revision revision,
String representationName, String cloudId, String version_id) {
String representationName, String cloudId, String versionId) {
BoundStatement boundStatement = removeDataSetsRevisionStatement.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId, UUID.fromString(version_id));
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId);

BoundStatement boundStatementForTempTable = removeDataSetsRevisionStatementMigrationTable.bind(
providerId, datasetId, UUID.fromString(bucketId), revision.getRevisionProviderId(),
revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId, UUID.fromString(versionId));

ResultSet rs = connectionProvider.getSession().execute(boundStatement);
ResultSet rsTemp = connectionProvider.getSession().execute(boundStatementForTempTable);

QueryTracer.logConsistencyLevel(boundStatement, rs);
QueryTracer.logConsistencyLevel(boundStatement, rsTemp);

return rs.wasApplied();
}

Expand Down Expand Up @@ -410,24 +518,25 @@ private void prepareStatements() {
"WHERE cloud_id = ? AND schema_id = ? LIMIT 1;"
);

hasProvidedRepresentationNameStatement = connectionProvider.getSession().prepare(
"SELECT schema_id, cloud_id " +
"FROM data_set_assignments_by_data_set " +
"WHERE provider_dataset_id = ? AND bucket_id = ? AND schema_id = ? " +
"LIMIT 1;"
);

addDataSetsRevisionStatement = connectionProvider.getSession().prepare(
"INSERT " +
"INTO data_set_assignments_by_revision_id_v2 (provider_id, dataset_id, bucket_id, " +
"INTO data_set_assignments_by_revision_id_v1 (provider_id, dataset_id, bucket_id, " +
"revision_provider_id, revision_name, revision_timestamp, " +
"representation_id, cloud_id, version_id, published, acceptance, mark_deleted) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?,?);"
"representation_id, cloud_id, published, acceptance, mark_deleted) " +
"VALUES (?,?,?,?,?,?,?,?,?,?,?);"
);

addDataSetsRevisionStatementMigrationTable = connectionProvider.getSession().prepare(
"INSERT " +
"INTO data_set_assignments_by_revision_id_v2 (provider_id, dataset_id, bucket_id, " +
"revision_provider_id, revision_name, revision_timestamp, " +
"representation_id, cloud_id, version_id, mark_deleted) " +
"VALUES (?,?,?,?,?,?,?,?,?,?);"
);

removeDataSetsRevisionStatement = connectionProvider.getSession().prepare(
"DELETE " +
"FROM data_set_assignments_by_revision_id_v2 " +
"FROM data_set_assignments_by_revision_id_v1 " +
"WHERE provider_id = ? " +
"AND dataset_id = ? " +
"AND bucket_id = ? " +
Expand All @@ -436,10 +545,24 @@ private void prepareStatements() {
"AND revision_timestamp = ? " +
"AND representation_id = ? " +
"AND cloud_id = ? " +
"AND version_id = ? " +
"IF EXISTS;"
);

removeDataSetsRevisionStatementMigrationTable = connectionProvider.getSession().prepare(
"DELETE " +
"FROM data_set_assignments_by_revision_id_v2 " +
"WHERE provider_id = ? " +
"AND dataset_id = ? " +
"AND bucket_id = ? " +
"AND revision_provider_id = ? " +
"AND revision_name = ? " +
"AND revision_timestamp = ? " +
"AND representation_id = ? " +
"AND cloud_id = ? " +
"AND version_id = ? " +
"IF EXISTS;"
);

getDataSetsRevisionStatement = connectionProvider.getSession().prepare(//
"SELECT cloud_id, version_id, published, acceptance, mark_deleted " +
"FROM data_set_assignments_by_revision_id_v2 " +
Expand Down

0 comments on commit 7325311

Please sign in to comment.