Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Met 1775 fix for data set assignments by revision id table #219

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
12 changes: 10 additions & 2 deletions docker/cassandra/cqls/mcs_setup.cql
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,10 @@ CREATE TABLE representation_versions (
PRIMARY KEY (cloud_id, schema_id, version_id)
);

CREATE TABLE data_set_assignments_by_revision_id (
CREATE TABLE data_set_assignments_by_revision_id_v1 (
provider_id varchar,
dataset_id varchar,
bucket_id timeuuid,
revision_provider_id varchar,
revision_name varchar,
revision_timestamp timestamp,
Expand All @@ -58,9 +59,16 @@ CREATE TABLE data_set_assignments_by_revision_id (
published boolean,
acceptance boolean,
mark_deleted boolean,
PRIMARY KEY ((provider_id, dataset_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
)WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';

-- Bucket bookkeeping for data_set_assignments_by_revision_id_v1.
-- One counter row per (object_id, bucket_id) pair; rows_count is incremented
-- when an assignment row is added to that bucket and decremented on removal.
-- object_id is presumably the concatenated provider_id/dataset_id key
-- (see the table comment) -- confirm against the DAO that writes it.
CREATE TABLE data_set_assignments_by_revision_id_buckets (
object_id varchar,
bucket_id timeuuid,
rows_count counter,
PRIMARY KEY (object_id, bucket_id)
) WITH comment='Keep track of number of rows in a bucket for provider and dataset id assignments.';

CREATE TABLE data_set_representation_names(
provider_id varchar,
dataset_id varchar,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,12 @@ public class CassandraDataSetDAO {
private static final int MAX_PROVIDER_DATASET_BUCKET_COUNT = 210000;

private static final int MAX_DATASET_ASSIGNMENTS_BUCKET_COUNT = 100000;
private static final int MAX_DATASET_ASSIGNMENTS_BY_REVISION_ID_BUCKET_COUNT = 250000;

private static final String DATA_SET_ASSIGNMENTS_BY_DATA_SET_BUCKETS = "data_set_assignments_by_data_set_buckets";

private static final String DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS = "data_set_assignments_by_revision_id_buckets";

@Autowired
@Qualifier("dbService")
private CassandraConnectionProvider connectionProvider;
Expand Down Expand Up @@ -260,25 +263,25 @@ private void prepareStatements() {
.getSession()
.prepare( //
"INSERT INTO " //
+ "data_set_assignments_by_revision_id (provider_id, dataset_id, revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, published, acceptance, mark_deleted) " //
+ "VALUES (?,?,?,?,?,?,?,?,?,?);");
+ "data_set_assignments_by_revision_id_v1 (provider_id, dataset_id, bucket_id, revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, published, acceptance, mark_deleted) " //
+ "VALUES (?,?,?,?,?,?,?,?,?,?,?);");
addDataSetsRevision.setConsistencyLevel(connectionProvider
.getConsistencyLevel());

removeDataSetsRevision
= connectionProvider.getSession().prepare(//
"DELETE "//
+ "FROM data_set_assignments_by_revision_id "//
+ "WHERE provider_id = ? AND dataset_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? " +
"AND cloud_id = ?;");
+ "FROM data_set_assignments_by_revision_id_v1 "//
+ "WHERE provider_id = ? AND dataset_id = ? AND bucket_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? " +
"AND cloud_id = ? IF EXISTS;");
removeDataSetsRevision
.setConsistencyLevel(connectionProvider.getConsistencyLevel());

getDataSetsRevision = connectionProvider.getSession().prepare(//
"SELECT "//
+ "cloud_id, published, acceptance, mark_deleted "//
+ "FROM data_set_assignments_by_revision_id "//
+ "WHERE provider_id = ? AND dataset_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? LIMIT ?;");
+ "FROM data_set_assignments_by_revision_id_v1 "//
+ "WHERE provider_id = ? AND dataset_id = ? AND bucket_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? LIMIT ?;");
getDataSetsRevision
.setConsistencyLevel(connectionProvider.getConsistencyLevel());

Expand Down Expand Up @@ -397,10 +400,10 @@ private void prepareStatements() {
* cloud id and schema of the representation, may also contain version (if a
* certain version is in a data set).
*
* @param providerId data set owner's (provider's) id
* @param dataSetId data set id
* @param nextToken next token containing information about paging state and bucket id
* @param limit maximum size of returned list
* @param providerId data set owner's (provider's) id
* @param dataSetId data set id
* @param nextToken next token containing information about paging state and bucket id
* @param limit maximum size of returned list
* @return
*/
public List<Properties> listDataSet(String providerId, String dataSetId, String nextToken, int limit)
Expand All @@ -425,7 +428,7 @@ public List<Properties> listDataSet(String providerId, String dataSetId, String
// first element is the paging state
state = getPagingState(parts[0]);
// second element is bucket id
bucket = getAssignmentBucketId(parts[1], state, providerDataSetId);
bucket = getAssignmentBucketId(DATA_SET_ASSIGNMENTS_BY_DATA_SET_BUCKETS, parts[1], state, providerDataSetId);
}

// if the bucket is null it means we reached the end of data
Expand Down Expand Up @@ -834,32 +837,77 @@ private CompoundDataSetId createCompoundDataSetId(String providerDataSetId) {
}

/**
 * Adds a revision assignment row to the bucketed
 * data_set_assignments_by_revision_id_v1 table.
 * <p>
 * The row is written into the current bucket for the provider/dataset pair;
 * when there is no bucket yet, or the current bucket has reached
 * MAX_DATASET_ASSIGNMENTS_BY_REVISION_ID_BUCKET_COUNT rows, a fresh bucket is
 * started so partitions stay bounded. The bucket's row counter is increased
 * before the insert.
 *
 * @param providerId         data set owner's (provider's) id
 * @param datasetId          data set id
 * @param revision           revision whose provider, name, timestamp and flags are stored
 * @param representationName representation id the revision belongs to
 * @param cloudId            cloud id of the record
 */
public void addDataSetsRevision(String providerId, String datasetId, Revision revision, String representationName, String cloudId) {
    String providerDataSetId = createProviderDataSetId(providerId, datasetId);
    // locate the bucket currently being filled for this provider/dataset pair
    Bucket bucket = bucketsHandler.getCurrentBucket(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, providerDataSetId);
    // when there is no bucket or bucket rows count is max we should add another bucket
    if (bucket == null || bucket.getRowsCount() >= MAX_DATASET_ASSIGNMENTS_BY_REVISION_ID_BUCKET_COUNT) {
        bucket = new Bucket(providerDataSetId, createBucket(), 0);
    }
    bucketsHandler.increaseBucketCount(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, bucket);

    BoundStatement boundStatement = addDataSetsRevision.bind(
            providerId, datasetId, UUID.fromString(bucket.getBucketId()),
            revision.getRevisionProviderId(), revision.getRevisionName(), revision.getCreationTimeStamp(),
            representationName, cloudId,
            revision.isPublished(), revision.isAcceptance(), revision.isDeleted());
    ResultSet rs = connectionProvider.getSession().execute(boundStatement);
    QueryTracer.logConsistencyLevel(boundStatement, rs);
}

/**
 * Removes a revision assignment row from the bucketed
 * data_set_assignments_by_revision_id_v1 table.
 * <p>
 * The bucket holding the row is not known up front, so every bucket of the
 * provider/dataset pair is tried in turn with a conditional (IF EXISTS)
 * delete. As soon as one delete is applied, the matching bucket's row
 * counter is decreased and the method returns; remaining buckets are not
 * touched. If no bucket contains the row the method returns silently.
 *
 * @param providerId         data set owner's (provider's) id
 * @param datasetId          data set id
 * @param revision           revision identifying the row (provider, name, timestamp)
 * @param representationName representation id the revision belongs to
 * @param cloudId            cloud id of the record
 */
public void removeDataSetsRevision(String providerId, String datasetId, Revision revision, String
        representationName, String cloudId) {
    List<Bucket> availableBuckets = bucketsHandler.getAllBuckets(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS,
            createProviderDataSetId(providerId, datasetId));

    for (Bucket bucket : availableBuckets) {
        BoundStatement boundStatement = removeDataSetsRevision.bind(
                providerId, datasetId, UUID.fromString(bucket.getBucketId()),
                revision.getRevisionProviderId(), revision.getRevisionName(), revision.getCreationTimeStamp(),
                representationName, cloudId);
        ResultSet rs = connectionProvider.getSession().execute(boundStatement);
        QueryTracer.logConsistencyLevel(boundStatement, rs);
        // wasApplied() reflects the IF EXISTS condition of the delete statement
        if (rs.wasApplied()) {
            bucketsHandler.decreaseBucketCount(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, bucket);
            return;
        }
    }
}

public List<Properties> getDataSetsRevisions(String providerId, String dataSetId, String revisionProviderId, String revisionName, Date revisionTimestamp, String representationName, String nextToken, int limit) {
String providerDataSetId = createProviderDataSetId(providerId, dataSetId);
List<Properties> result = new ArrayList<>(limit);

Bucket bucket = null;
PagingState state;

if (nextToken == null) {
// there is no next token so do not set paging state, take the first bucket for provider's dataset
bucket = bucketsHandler.getNextBucket(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, providerDataSetId);
state = null;
} else {
// next token is set, parse it to retrieve paging state and bucket id (token is concatenation of paging state and bucket id using _ character
String[] parts = nextToken.split("_");
if (parts.length != 2) {
throw new IllegalArgumentException("nextToken format is wrong. nextToken = " + nextToken);
}

// first element is the paging state
state = getPagingState(parts[0]);
// second element is bucket id
bucket = getAssignmentBucketId(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, parts[1], state, providerDataSetId);
}

// if the bucket is null it means we reached the end of data
if (bucket == null) {
return result;
}

// bind parameters, set limit to max int value
BoundStatement boundStatement = getDataSetsRevision.bind(providerId, dataSetId, revisionProviderId, revisionName, revisionTimestamp, representationName, Integer.MAX_VALUE);
BoundStatement boundStatement = getDataSetsRevision.bind(providerId, dataSetId, UUID.fromString(bucket.getBucketId()), revisionProviderId, revisionName, revisionTimestamp, representationName, Integer.MAX_VALUE);
// limit page to "limit" number of results
boundStatement.setFetchSize(limit);
// when this is not a first page call set paging state in the statement
if (nextToken != null) {
boundStatement.setPagingState(PagingState.fromString(nextToken));
boundStatement.setPagingState(state);
}
// execute query
ResultSet rs = connectionProvider.getSession().execute(boundStatement);
PagingState ps = rs.getExecutionInfo().getPagingState();
QueryTracer.logConsistencyLevel(boundStatement, rs);

// get available results
Expand All @@ -874,13 +922,19 @@ public List<Properties> getDataSetsRevisions(String providerId, String dataSetId
result.add(properties);
}

if (result.size() == limit) {
PagingState pagingState = rs.getExecutionInfo().getPagingState();
// whole page has been retrieved so add paging state for the next call at the end of the results list
if (pagingState != null && !rs.isExhausted()) {
Properties properties = new Properties();
properties.put("nextSlice", pagingState.toString());
result.add(properties);
if (result.size() == limit && !rs.isExhausted()) {
// we reached the page limit, prepare the next slice string to be used for the next page
Properties properties = new Properties();
properties.put("nextSlice", ps.toString() + "_" + bucket.getBucketId());
result.add(properties);

} else {
// we reached the end of bucket but number of results is less than the page size - in this case if there are more buckets we should retrieve number of results that will feed the page
if (bucketsHandler.getNextBucket(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, providerDataSetId, bucket) != null) {
String nextSlice = "_" + bucket.getBucketId();
result.addAll(
getDataSetsRevisions(providerId, dataSetId, revisionProviderId, revisionName, revisionTimestamp,
representationName, nextSlice, limit - result.size()));
}
}

Expand Down Expand Up @@ -1046,19 +1100,20 @@ private String getNextSlice(PagingState pagingState, String bucketId, String pro
* Get bucket id from part of token considering paging state which was retrieved from the same token. This is used for data assignment table where
* provider id and dataset id are concatenated to one string
*
* @param tokenPart part of token containing bucket id
* @param state paging state from the same token as the bucket id
* @param bucketsTableName table name used for buckets
* @param tokenPart part of token containing bucket id
* @param state paging state from the same token as the bucket id
* @param providerDataSetId provider id and dataset id to retrieve next bucket id
* @return bucket id to be used for the query
*/
private Bucket getAssignmentBucketId(String tokenPart, PagingState state, String providerDataSetId) {
private Bucket getAssignmentBucketId(String bucketsTableName, String tokenPart, PagingState state, String providerDataSetId) {
if (tokenPart != null && !tokenPart.isEmpty()) {
// when the state passed in the next token is not null we have to use the same bucket id as the paging state is associated with the query having certain parameter values
if (state != null) {
return new Bucket(providerDataSetId, tokenPart, 0);
}
// the state part is empty which means we reached the end of the bucket passed in the next token, therefore we need to get the next bucket
return bucketsHandler.getNextBucket(DATA_SET_ASSIGNMENTS_BY_DATA_SET_BUCKETS, providerDataSetId, new Bucket(providerDataSetId, tokenPart, 0));
return bucketsHandler.getNextBucket(bucketsTableName, providerDataSetId, new Bucket(providerDataSetId, tokenPart, 0));
}
return null;
}
Expand Down Expand Up @@ -1137,8 +1192,7 @@ public void insertProviderDatasetRepresentationInfo(String dataSetId, String dat
// when there is no bucket or bucket rows count is max we should add another bucket
if (bucketCount == null || bucketCount.getRowsCount() >= MAX_PROVIDER_DATASET_BUCKET_COUNT) {
bucketId = createBucket();
}
else
} else
bucketId = bucketCount.getBucketId();
increaseBucketCount(dataSetProviderId, dataSetId, bucketId);

Expand Down Expand Up @@ -1229,7 +1283,7 @@ public void insertLatestProviderDatasetRepresentationInfo(String dataSetId, Stri
throws NoHostAvailableException, QueryExecutionException {

//
BoundStatement deleteStatement = deleteLatestProviderDatasetRepresentationInfo.bind(
BoundStatement deleteStatement = deleteLatestProviderDatasetRepresentationInfo.bind(
dataSetProviderId,
dataSetId,
schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,10 @@ CREATE TABLE representation_revisions (
PRIMARY KEY ((cloud_id, representation_id), revision_provider_id, revision_name, revision_timestamp, version_id)
)WITH CLUSTERING ORDER BY (revision_provider_id ASC, revision_name ASC, revision_timestamp DESC, version_id ASC) AND comment='Retrieve version_id based on the rest known fields of the primary key';

CREATE TABLE data_set_assignments_by_revision_id (
CREATE TABLE data_set_assignments_by_revision_id_v1 (
provider_id varchar,
dataset_id varchar,
bucket_id timeuuid,
revision_provider_id varchar,
revision_name varchar,
revision_timestamp timestamp,
Expand All @@ -85,9 +86,17 @@ CREATE TABLE data_set_assignments_by_revision_id (
published boolean,
acceptance boolean,
mark_deleted boolean,
PRIMARY KEY ((provider_id, dataset_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
)WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';

-- Bucket bookkeeping for data_set_assignments_by_revision_id_v1.
-- One counter row per (object_id, bucket_id) pair; rows_count is incremented
-- when an assignment row is added to that bucket and decremented on removal.
-- object_id is presumably the concatenated provider_id/dataset_id key
-- (see the table comment) -- confirm against the DAO that writes it.
CREATE TABLE data_set_assignments_by_revision_id_buckets (
object_id varchar,
bucket_id timeuuid,
rows_count counter,
PRIMARY KEY (object_id, bucket_id)
) WITH comment='Keep track of number of rows in a bucket for provider and dataset id assignments.';


CREATE TABLE datasets_buckets (
provider_id varchar,
dataset_id varchar,
Expand Down
12 changes: 2 additions & 10 deletions tools/ecloud-db-migrator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@
<dependency>
<groupId>eu.europeana.cloud</groupId>
<artifactId>ecloud-common</artifactId>
<version>0.8-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>eu.europeana.cloud</groupId>
<artifactId>ecloud-service-commons</artifactId>
<version>0.8-SNAPSHOT</version>
<version>1.2.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand All @@ -55,14 +55,6 @@
<version>2.1.9.2</version>
<scope>test</scope>
</dependency>


<dependency>
<groupId>eu.europeana.cloud</groupId>
<artifactId>ecloud-service-commons</artifactId>
<version>0.8-SNAPSHOT</version>
</dependency>

</dependencies>
<build>
<plugins>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package migrations.service.mcs.V13;

import com.contrastsecurity.cassandra.migration.api.JavaMigration;
import com.datastax.driver.core.Session;

/**
 * Schema migration: creates the bucket bookkeeping table used by the
 * bucketed data_set_assignments_by_revision_id_v1 assignments table.
 * One counter row per (object_id, bucket_id) pair.
 */
public class V13_1__create_table_data_set_assignments_by_revision_id_buckets implements JavaMigration {

    /**
     * Executes the CREATE TABLE statement on the given session.
     * IF NOT EXISTS makes the DDL idempotent, so re-running the migration
     * against a keyspace that already has the table does not fail.
     *
     * @param session active Cassandra session bound to the target keyspace
     * @throws Exception propagated from the driver on execution failure
     */
    @Override
    public void migrate(Session session) throws Exception {
        session.execute(
                "CREATE TABLE IF NOT EXISTS data_set_assignments_by_revision_id_buckets (\n" +
                        "    object_id varchar,\n" +
                        "    bucket_id timeuuid,\n" +
                        "    rows_count counter,\n" +
                        "    PRIMARY KEY (object_id, bucket_id)\n" +
                        ") WITH comment='Keep track of number of rows in a bucket for provider and dataset id assignments.';\n");
    }
}
Loading