Commit 76d0ab4
Revert "Merge pull request #219 from europeana/MET-1775_Fix_for_data_…
Browse files Browse the repository at this point in the history
…set_assignments_by_revision_id_table"

This reverts commit d2b561f, reversing
changes made to ea19728.
tarekkh committed Apr 2, 2019
1 parent b32c221 commit 76d0ab4
Showing 9 changed files with 47 additions and 320 deletions.
12 changes: 2 additions & 10 deletions docker/cassandra/cqls/mcs_setup.cql
@@ -47,10 +47,9 @@ CREATE TABLE representation_versions (
     PRIMARY KEY (cloud_id, schema_id, version_id)
 );
 
-CREATE TABLE data_set_assignments_by_revision_id_v1 (
+CREATE TABLE data_set_assignments_by_revision_id (
     provider_id varchar,
     dataset_id varchar,
-    bucket_id timeuuid,
     revision_provider_id varchar,
     revision_name varchar,
     revision_timestamp timestamp,
@@ -59,16 +58,9 @@ CREATE TABLE data_set_assignments_by_revision_id_v1 (
     published boolean,
     acceptance boolean,
     mark_deleted boolean,
-    PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
+    PRIMARY KEY ((provider_id, dataset_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
 )WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';
 
-CREATE TABLE data_set_assignments_by_revision_id_buckets (
-    object_id varchar,
-    bucket_id timeuuid,
-    rows_count counter,
-    PRIMARY KEY (object_id, bucket_id)
-) WITH comment='Keep track of number of rows in a bucket for provider and dataset id assignments.';
-
 CREATE TABLE data_set_representation_names(
     provider_id varchar,
     dataset_id varchar,
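For context, the two structures deleted above implement a common Cassandra bucketing pattern: a timeuuid bucket_id widens the partition key of the assignments table, while the companion counter table records how full each bucket is so that writers know when to open a new one. The sketch below illustrates that write path; the table and column names and the 250000 cap are taken from this diff, but the class, its method and the newest-bucket query are illustrative stand-ins for the project's BucketsHandler (DataStax Java driver 3.x).

import java.util.UUID;

import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.utils.UUIDs;

// Sketch of the bucketed write path (illustrative; the real logic lives in the
// project's BucketsHandler). Table/column names and the 250000 cap come from the diff.
class BucketedAssignmentWriter {

    private static final int MAX_ROWS_PER_BUCKET = 250000;

    private final Session session;
    private final PreparedStatement newestBucket;
    private final PreparedStatement bumpCounter;

    BucketedAssignmentWriter(Session session) {
        this.session = session;
        // bucket_id is a timeuuid clustering column, so DESC gives the newest bucket
        this.newestBucket = session.prepare(
                "SELECT bucket_id, rows_count FROM data_set_assignments_by_revision_id_buckets "
                        + "WHERE object_id = ? ORDER BY bucket_id DESC LIMIT 1;");
        this.bumpCounter = session.prepare(
                "UPDATE data_set_assignments_by_revision_id_buckets "
                        + "SET rows_count = rows_count + 1 WHERE object_id = ? AND bucket_id = ?;");
    }

    /** Picks (or opens) the bucket whose id must go into the assignment row's partition key. */
    UUID nextBucketFor(String providerDataSetId) {
        Row newest = session.execute(newestBucket.bind(providerDataSetId)).one();
        UUID bucketId;
        if (newest == null || newest.getLong("rows_count") >= MAX_ROWS_PER_BUCKET) {
            // no bucket yet, or the newest one is full: open a fresh sub-partition
            bucketId = UUIDs.timeBased();
        } else {
            bucketId = newest.getUUID("bucket_id");
        }
        session.execute(bumpCounter.bind(providerDataSetId, bucketId));
        return bucketId;
    }
}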
CassandraDataSetDAO.java
@@ -33,12 +33,9 @@ public class CassandraDataSetDAO {
     private static final int MAX_PROVIDER_DATASET_BUCKET_COUNT = 210000;
 
     private static final int MAX_DATASET_ASSIGNMENTS_BUCKET_COUNT = 100000;
-    private static final int MAX_DATASET_ASSIGNMENTS_BY_REVISION_ID_BUCKET_COUNT = 250000;
 
     private static final String DATA_SET_ASSIGNMENTS_BY_DATA_SET_BUCKETS = "data_set_assignments_by_data_set_buckets";
 
-    private static final String DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS = "data_set_assignments_by_revision_id_buckets";
-
     @Autowired
     @Qualifier("dbService")
     private CassandraConnectionProvider connectionProvider;
@@ -263,25 +260,25 @@ private void prepareStatements() {
                 .getSession()
                 .prepare( //
                         "INSERT INTO " //
-                                + "data_set_assignments_by_revision_id_v1 (provider_id, dataset_id, bucket_id, revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, published, acceptance, mark_deleted) " //
-                                + "VALUES (?,?,?,?,?,?,?,?,?,?,?);");
+                                + "data_set_assignments_by_revision_id (provider_id, dataset_id, revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id, published, acceptance, mark_deleted) " //
+                                + "VALUES (?,?,?,?,?,?,?,?,?,?);");
         addDataSetsRevision.setConsistencyLevel(connectionProvider
                 .getConsistencyLevel());
 
         removeDataSetsRevision
                 = connectionProvider.getSession().prepare(//
                         "DELETE "//
-                                + "FROM data_set_assignments_by_revision_id_v1 "//
-                                + "WHERE provider_id = ? AND dataset_id = ? AND bucket_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? " +
-                                "AND cloud_id = ? IF EXISTS;");
+                                + "FROM data_set_assignments_by_revision_id "//
+                                + "WHERE provider_id = ? AND dataset_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? " +
+                                "AND cloud_id = ?;");
         removeDataSetsRevision
                 .setConsistencyLevel(connectionProvider.getConsistencyLevel());
 
         getDataSetsRevision = connectionProvider.getSession().prepare(//
                 "SELECT "//
                         + "cloud_id, published, acceptance, mark_deleted "//
-                        + "FROM data_set_assignments_by_revision_id_v1 "//
-                        + "WHERE provider_id = ? AND dataset_id = ? AND bucket_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? LIMIT ?;");
+                        + "FROM data_set_assignments_by_revision_id "//
+                        + "WHERE provider_id = ? AND dataset_id = ? AND revision_provider_id = ? AND revision_name = ? AND revision_timestamp = ? AND representation_id = ? LIMIT ?;");
         getDataSetsRevision
                 .setConsistencyLevel(connectionProvider.getConsistencyLevel());
 
@@ -400,10 +397,10 @@ private void prepareStatements() {
      * cloud id and schema of the representation, may also contain version (if a
      * certain version is in a data set).
      *
-     * @param providerId data set owner's (provider's) id
-     * @param dataSetId data set id
-     * @param nextToken next token containing information about paging state and bucket id
-     * @param limit maximum size of returned list
+     * @param providerId data set owner's (provider's) id
+     * @param dataSetId  data set id
+     * @param nextToken  next token containing information about paging state and bucket id
+     * @param limit      maximum size of returned list
      * @return
      */
     public List<Properties> listDataSet(String providerId, String dataSetId, String nextToken, int limit)
@@ -428,7 +425,7 @@ public List<Properties> listDataSet(String providerId, String dataSetId, String
             // first element is the paging state
             state = getPagingState(parts[0]);
             // second element is bucket id
-            bucket = getAssignmentBucketId(DATA_SET_ASSIGNMENTS_BY_DATA_SET_BUCKETS, parts[1], state, providerDataSetId);
+            bucket = getAssignmentBucketId(parts[1], state, providerDataSetId);
         }
 
         // if the bucket is null it means we reached the end of data
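The nextToken format that listDataSet parses above (and that the bucketed getDataSetsRevisions below shares) is the driver's serialized PagingState and a bucket id joined with an underscore, where an empty state part means the previous bucket was exhausted. A minimal sketch of that encoding, assuming the driver 3.x PagingState API; the helper class and method names are invented for the example.

import com.datastax.driver.core.PagingState;

// Illustrative helpers for the "<pagingState>_<bucketId>" token described in the
// javadoc above; the class and method names are invented for the example.
final class SliceTokens {

    private SliceTokens() {
    }

    /** Builds the token returned to the client along with a full page of results. */
    static String encode(PagingState state, String bucketId) {
        // an empty state part means "this bucket is exhausted, continue with the next one"
        return (state == null ? "" : state.toString()) + "_" + bucketId;
    }

    /** Splits a client-supplied token back into paging-state part and bucket-id part. */
    static String[] decode(String nextToken) {
        String[] parts = nextToken.split("_");
        if (parts.length != 2) {
            throw new IllegalArgumentException("nextToken format is wrong. nextToken = " + nextToken);
        }
        return parts; // parts[0]: serialized paging state (may be empty), parts[1]: bucket id
    }
}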
@@ -837,77 +834,32 @@ private CompoundDataSetId createCompoundDataSetId(String providerDataSetId) {
     }
 
     public void addDataSetsRevision(String providerId, String datasetId, Revision revision, String representationName, String cloudId) {
-        //
-        String providerDataSetId = createProviderDataSetId(providerId, datasetId);
-        Bucket bucket = bucketsHandler.getCurrentBucket(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, providerDataSetId);
-        // when there is no bucket or bucket rows count is max we should add another bucket
-        if (bucket == null || bucket.getRowsCount() >= MAX_DATASET_ASSIGNMENTS_BY_REVISION_ID_BUCKET_COUNT) {
-            bucket = new Bucket(providerDataSetId, createBucket(), 0);
-        }
-        bucketsHandler.increaseBucketCount(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, bucket);
-        //
-        BoundStatement boundStatement = addDataSetsRevision.bind(providerId, datasetId, UUID.fromString(bucket.getBucketId()), revision.getRevisionProviderId(), revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId, revision.isPublished(), revision.isAcceptance(), revision.isDeleted());
+        BoundStatement boundStatement = addDataSetsRevision.bind(providerId, datasetId, revision.getRevisionProviderId(), revision.getRevisionName(), revision.getCreationTimeStamp(), representationName, cloudId, revision.isPublished(), revision.isAcceptance(), revision.isDeleted());
         ResultSet rs = connectionProvider.getSession().execute(boundStatement);
         QueryTracer.logConsistencyLevel(boundStatement, rs);
     }
 
     public void removeDataSetsRevision(String providerId, String datasetId, Revision revision, String
             representationName, String cloudId) {
-        List<Bucket> availableBuckets = bucketsHandler.getAllBuckets(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS,
-                createProviderDataSetId(providerId, datasetId));
-
-        for (Bucket bucket : availableBuckets) {
-            BoundStatement boundStatement = removeDataSetsRevision.bind(providerId, datasetId, UUID.fromString(bucket.getBucketId()), revision.getRevisionProviderId(), revision.getRevisionName(), revision.getCreationTimeStamp(),
-                    representationName, cloudId);
-            ResultSet rs = connectionProvider.getSession().execute(boundStatement);
-            QueryTracer.logConsistencyLevel(boundStatement, rs);
-            if (rs.wasApplied()) {
-                bucketsHandler.decreaseBucketCount(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, bucket);
-                return;
-            }
-        }
+        BoundStatement boundStatement = removeDataSetsRevision.bind(providerId, datasetId, revision.getRevisionProviderId(), revision.getRevisionName(), revision.getCreationTimeStamp(),
+                representationName, cloudId);
+        ResultSet rs = connectionProvider.getSession().execute(boundStatement);
+        QueryTracer.logConsistencyLevel(boundStatement, rs);
     }
 
     public List<Properties> getDataSetsRevisions(String providerId, String dataSetId, String revisionProviderId, String revisionName, Date revisionTimestamp, String representationName, String nextToken, int limit) {
-        String providerDataSetId = createProviderDataSetId(providerId, dataSetId);
         List<Properties> result = new ArrayList<>(limit);
 
-        Bucket bucket = null;
-        PagingState state;
-
-        if (nextToken == null) {
-            // there is no next token so do not set paging state, take the first bucket for provider's dataset
-            bucket = bucketsHandler.getNextBucket(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, providerDataSetId);
-            state = null;
-        } else {
-            // next token is set, parse it to retrieve paging state and bucket id (token is concatenation of paging state and bucket id using _ character
-            String[] parts = nextToken.split("_");
-            if (parts.length != 2) {
-                throw new IllegalArgumentException("nextToken format is wrong. nextToken = " + nextToken);
-            }
-
-            // first element is the paging state
-            state = getPagingState(parts[0]);
-            // second element is bucket id
-            bucket = getAssignmentBucketId(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, parts[1], state, providerDataSetId);
-        }
-
-        // if the bucket is null it means we reached the end of data
-        if (bucket == null) {
-            return result;
-        }
-
         // bind parameters, set limit to max int value
-        BoundStatement boundStatement = getDataSetsRevision.bind(providerId, dataSetId, UUID.fromString(bucket.getBucketId()), revisionProviderId, revisionName, revisionTimestamp, representationName, Integer.MAX_VALUE);
+        BoundStatement boundStatement = getDataSetsRevision.bind(providerId, dataSetId, revisionProviderId, revisionName, revisionTimestamp, representationName, Integer.MAX_VALUE);
         // limit page to "limit" number of results
         boundStatement.setFetchSize(limit);
         // when this is not a first page call set paging state in the statement
         if (nextToken != null) {
-            boundStatement.setPagingState(state);
+            boundStatement.setPagingState(PagingState.fromString(nextToken));
        }
         // execute query
         ResultSet rs = connectionProvider.getSession().execute(boundStatement);
-        PagingState ps = rs.getExecutionInfo().getPagingState();
         QueryTracer.logConsistencyLevel(boundStatement, rs);
 
         // get available results
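A side effect of bucketing shows up in the removeDataSetsRevision just removed: with bucket_id folded into the partition key, the deleter cannot know which bucket holds an assignment, so it probes every bucket with the conditional DELETE ... IF EXISTS prepared earlier and lets ResultSet.wasApplied(), the LWT [applied] column, reveal the hit so that only that bucket's counter is decremented. A condensed sketch of that probe loop; the two statement builders are assumptions standing in for the DAO's prepared statements.

import java.util.List;
import java.util.UUID;
import java.util.function.Function;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Session;

// Condensed form of the probe loop removed above. The two statement builders are
// assumptions standing in for the DAO's prepared statements.
class BucketProbingDelete {

    static void delete(Session session,
                       Function<UUID, BoundStatement> deleteInBucket,         // binds full key incl. bucket_id
                       Function<UUID, BoundStatement> decrementBucketCounter, // binds the counter update
                       List<UUID> candidateBuckets) {
        for (UUID bucketId : candidateBuckets) {
            ResultSet rs = session.execute(deleteInBucket.apply(bucketId));
            if (rs.wasApplied()) {
                // LWT [applied] column was true, so this bucket held the row:
                // fix its counter and stop probing
                session.execute(decrementBucketCounter.apply(bucketId));
                return;
            }
        }
    }
}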
@@ -922,19 +874,13 @@ public List<Properties> getDataSetsRevisions(String providerId, String dataSetId
             result.add(properties);
         }
 
-        if (result.size() == limit && !rs.isExhausted()) {
-            // we reached the page limit, prepare the next slice string to be used for the next page
-            Properties properties = new Properties();
-            properties.put("nextSlice", ps.toString() + "_" + bucket.getBucketId());
-            result.add(properties);
-
-        } else {
-            // we reached the end of bucket but number of results is less than the page size - in this case if there are more buckets we should retrieve number of results that will feed the page
-            if (bucketsHandler.getNextBucket(DATA_SET_ASSIGNMENTS_BY_REVISION_ID_BUCKETS, providerDataSetId, bucket) != null) {
-                String nextSlice = "_" + bucket.getBucketId();
-                result.addAll(
-                        getDataSetsRevisions(providerId, dataSetId, revisionProviderId, revisionName, revisionTimestamp,
-                                representationName, nextSlice, limit - result.size()));
+        if (result.size() == limit) {
+            PagingState pagingState = rs.getExecutionInfo().getPagingState();
+            // whole page has been retrieved so add paging state for the next call at the end of the results list
+            if (pagingState != null && !rs.isExhausted()) {
+                Properties properties = new Properties();
+                properties.put("nextSlice", pagingState.toString());
+                result.add(properties);
             }
         }
 
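The restored getDataSetsRevisions leans entirely on driver-native paging: setFetchSize bounds the page, and the opaque PagingState is serialized into the nextSlice token and fed back on the next call. A minimal sketch of that round trip under the DataStax driver 3.x API; PageResult and the method name are invented for the example.

import java.util.ArrayList;
import java.util.List;

import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.PagingState;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;

// Minimal round trip of the driver-native paging restored above (DataStax driver 3.x).
// PageResult and the method name are invented for the example.
class PagedQuery {

    static final class PageResult {
        final List<Row> rows = new ArrayList<>();
        String nextToken; // null when there is no further page
    }

    static PageResult fetchPage(Session session, BoundStatement stmt, String token, int pageSize) {
        stmt.setFetchSize(pageSize);                            // page size, not a result LIMIT
        if (token != null) {
            stmt.setPagingState(PagingState.fromString(token)); // resume where the last page ended
        }
        ResultSet rs = session.execute(stmt);

        PageResult page = new PageResult();
        // drain only the rows already fetched; iterating past them would
        // synchronously pull the next page from Cassandra
        for (int i = rs.getAvailableWithoutFetching(); i > 0; i--) {
            page.rows.add(rs.one());
        }
        PagingState state = rs.getExecutionInfo().getPagingState();
        if (state != null && !rs.isExhausted()) {
            page.nextToken = state.toString();                  // becomes the "nextSlice" token
        }
        return page;
    }
}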

Expand Down Expand Up @@ -1100,20 +1046,19 @@ private String getNextSlice(PagingState pagingState, String bucketId, String pro
* Get bucket id from part of token considering paging state which was retrieved from the same token. This is used for data assignment table where
* provider id and dataset id are concatenated to one string
*
* @param bucketsTableName table name used for buckets
* @param tokenPart part of token containing bucket id
* @param state paging state from the same token as the bucket id
* @param tokenPart part of token containing bucket id
* @param state paging state from the same token as the bucket id
* @param providerDataSetId provider id and dataset id to retrieve next bucket id
* @return bucket id to be used for the query
*/
private Bucket getAssignmentBucketId(String bucketsTableName, String tokenPart, PagingState state, String providerDataSetId) {
private Bucket getAssignmentBucketId(String tokenPart, PagingState state, String providerDataSetId) {
if (tokenPart != null && !tokenPart.isEmpty()) {
// when the state passed in the next token is not null we have to use the same bucket id as the paging state is associated with the query having certain parameter values
if (state != null) {
return new Bucket(providerDataSetId, tokenPart, 0);
}
// the state part is empty which means we reached the end of the bucket passed in the next token, therefore we need to get the next bucket
return bucketsHandler.getNextBucket(bucketsTableName, providerDataSetId, new Bucket(providerDataSetId, tokenPart, 0));
return bucketsHandler.getNextBucket(DATA_SET_ASSIGNMENTS_BY_DATA_SET_BUCKETS, providerDataSetId, new Bucket(providerDataSetId, tokenPart, 0));
}
return null;
}
@@ -1192,7 +1137,8 @@ public void insertProviderDatasetRepresentationInfo(String dataSetId, String dat
         // when there is no bucket or bucket rows count is max we should add another bucket
         if (bucketCount == null || bucketCount.getRowsCount() >= MAX_PROVIDER_DATASET_BUCKET_COUNT) {
             bucketId = createBucket();
-        } else
+        }
+        else
             bucketId = bucketCount.getBucketId();
         increaseBucketCount(dataSetProviderId, dataSetId, bucketId);
 
@@ -1283,7 +1229,7 @@ public void insertLatestProviderDatasetRepresentationInfo(String dataSetId, Stri
             throws NoHostAvailableException, QueryExecutionException {
 
         //
-            BoundStatement deleteStatement = deleteLatestProviderDatasetRepresentationInfo.bind(
+        BoundStatement deleteStatement = deleteLatestProviderDatasetRepresentationInfo.bind(
                 dataSetProviderId,
                 dataSetId,
                 schema,
@@ -74,10 +74,9 @@ CREATE TABLE representation_revisions (
     PRIMARY KEY ((cloud_id, representation_id), revision_provider_id, revision_name, revision_timestamp, version_id)
 )WITH CLUSTERING ORDER BY (revision_provider_id ASC, revision_name ASC, revision_timestamp DESC, version_id ASC) AND comment='Retrieve version_id based on the rest known fields of the primary key';
 
-CREATE TABLE data_set_assignments_by_revision_id_v1 (
+CREATE TABLE data_set_assignments_by_revision_id (
    provider_id varchar,
     dataset_id varchar,
-    bucket_id timeuuid,
     revision_provider_id varchar,
     revision_name varchar,
     revision_timestamp timestamp,
@@ -86,17 +85,9 @@ CREATE TABLE data_set_assignments_by_revision_id_v1 (
     published boolean,
     acceptance boolean,
     mark_deleted boolean,
-    PRIMARY KEY ((provider_id, dataset_id, bucket_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
+    PRIMARY KEY ((provider_id, dataset_id), revision_provider_id, revision_name, revision_timestamp, representation_id, cloud_id)
 )WITH comment='Retrieve cloud Ids based on a known provider_id, dataset_id, revision_id';
 
-CREATE TABLE data_set_assignments_by_revision_id_buckets (
-    object_id varchar,
-    bucket_id timeuuid,
-    rows_count counter,
-    PRIMARY KEY (object_id, bucket_id)
-) WITH comment='Keep track of number of rows in a bucket for provider and dataset id assignments.';
-
-
 CREATE TABLE datasets_buckets (
     provider_id varchar,
     dataset_id varchar,
12 changes: 10 additions & 2 deletions tools/ecloud-db-migrator/pom.xml
@@ -36,12 +36,12 @@
         <dependency>
             <groupId>eu.europeana.cloud</groupId>
             <artifactId>ecloud-common</artifactId>
-            <version>1.2.0-SNAPSHOT</version>
+            <version>0.8-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>eu.europeana.cloud</groupId>
             <artifactId>ecloud-service-commons</artifactId>
-            <version>1.2.0-SNAPSHOT</version>
+            <version>0.8-SNAPSHOT</version>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
@@ -55,6 +55,14 @@
             <version>2.1.9.2</version>
             <scope>test</scope>
         </dependency>
+
+
+        <dependency>
+            <groupId>eu.europeana.cloud</groupId>
+            <artifactId>ecloud-service-commons</artifactId>
+            <version>0.8-SNAPSHOT</version>
+        </dependency>
+
     </dependencies>
     <build>
         <plugins>

This file was deleted.

