Skip to content

Commit

Permalink
Merge pull request #12 from europeana/ECL-944
Browse files Browse the repository at this point in the history
Ecl 944
  • Loading branch information
tarekkh authored Jul 20, 2016
2 parents 15c0852 + a352761 commit c515ad6
Show file tree
Hide file tree
Showing 15 changed files with 121 additions and 137 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -84,15 +84,15 @@ public boolean equals(Object o) {
if (state != taskInfo.state) return false;
if (startDate == null)
if (taskInfo.startDate != null) return false;
if (startDate != null)
if (startDate != null && taskInfo.startDate != null)
if (startDate.getTime() != taskInfo.startDate.getTime()) return false;
if (sentDate == null)
if (taskInfo.sentDate != null) return false;
if (sentDate != null)
if (sentDate != null && taskInfo.sentDate != null)
if (sentDate.getTime() != taskInfo.sentDate.getTime()) return false;
if (finishDate == null)
if (taskInfo.finishDate != null) return false;
if (finishDate != null)
if (finishDate != null && taskInfo.finishDate != null)
if (finishDate.getTime() != taskInfo.finishDate.getTime()) return false;

return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ public class NotificationBolt extends BaseRichBolt {
private final String keyspaceName;
private final String userName;
private final String password;
private final Boolean grouping;

private Map<Long, NotificationCache> cache;
private String topologyName;

private CassandraTaskInfoDAO taskInfoDAO;
private CassandraSubTaskInfoDAO subTaskInfoDAO;

Expand All @@ -57,30 +55,11 @@ public class NotificationBolt extends BaseRichBolt {
*/
public NotificationBolt(String hosts, int port, String keyspaceName,
String userName, String password) {
this(hosts, port, keyspaceName, userName, password, false);
}

/**
* Constructor of notification bolt.
*
* @param hosts Cassandra hosts separated by comma (e.g.
* localhost,192.168.47.129)
* @param port Cassandra port
* @param keyspaceName Cassandra keyspace name
* @param userName Cassandra username
* @param password Cassandra password
* @param grouping this bolt is connected to topology by fields grouping If true:
* keep number of notifications in memory and emit notification
* when task is completed.
*/
public NotificationBolt(String hosts, int port, String keyspaceName,
String userName, String password, Boolean grouping) {
this.hosts = hosts;
this.port = port;
this.keyspaceName = keyspaceName;
this.userName = userName;
this.password = password;
this.grouping = grouping;
}

@Override
Expand All @@ -89,21 +68,9 @@ public void execute(Tuple tuple) {
NotificationTuple notificationTuple = NotificationTuple
.fromStormTuple(tuple);

switch (notificationTuple.getInformationType()) {
case UPDATE_TASK:
updateTask(notificationTuple.getTaskId(),
notificationTuple.getParameters());
break;
case END_TASK:
endTask(notificationTuple.getTaskId(),
notificationTuple.getParameters());
break;
case NOTIFICATION:
storeNotification(notificationTuple.getTaskId(),
notificationTuple.getParameters());
storeFinishState(notificationTuple.getTaskId());
break;
}
NotificationCache nCache = getNotificationCache(notificationTuple);
storeTaskDetails(notificationTuple, nCache);

} catch (NoHostAvailableException | QueryExecutionException ex) {
LOGGER.error("Cannot store notification to Cassandra because: {}",
ex.getMessage());
Expand All @@ -117,6 +84,44 @@ public void execute(Tuple tuple) {
}
}


private NotificationCache getNotificationCache(NotificationTuple notificationTuple) throws TaskInfoDoesNotExistException {
NotificationCache nCache = cache.get(notificationTuple.getTaskId());
if (nCache == null) {
nCache = new NotificationCache();
long taskId = notificationTuple.getTaskId();
nCache.setTotalSize(getExpectedSize(taskId));
cache.put(taskId, nCache);

}
return nCache;
}

private void storeTaskDetails(NotificationTuple notificationTuple, NotificationCache nCache) throws TaskInfoDoesNotExistException, DatabaseConnectionException {
long taskId = notificationTuple.getTaskId();
switch (notificationTuple.getInformationType()) {
case UPDATE_TASK:
updateTask(taskId,
notificationTuple.getParameters());
break;
case END_TASK:
endTask(taskId,
notificationTuple.getParameters());
break;
case NOTIFICATION:
storeNotification(taskId,
notificationTuple.getParameters());
if (nCache != null) {
nCache.inc();
if (nCache.isComplete()) {
storeFinishState(taskId);
cache.remove(taskId);
}
}
break;
}
}

@Override
public void prepare(Map stormConf, TopologyContext tc, OutputCollector oc) {
CassandraConnectionProvider cassandra = new CassandraConnectionProvider(hosts, port, keyspaceName,
Expand All @@ -127,6 +132,8 @@ public void prepare(Map stormConf, TopologyContext tc, OutputCollector oc) {
this.stormConfig = stormConf;
this.topologyContext = tc;
this.outputCollector = oc;
cache = new HashMap<>();

}

@Override
Expand Down Expand Up @@ -160,14 +167,7 @@ private static Date prepareDate(Object dateObject) {
}

private void storeFinishState(long taskId) throws TaskInfoDoesNotExistException, DatabaseConnectionException {
List<TaskInfo> task = taskInfoDAO.searchById(taskId);
if (task != null && task.get(0) != null) {
long count = taskInfoDAO.getProcessedCount(taskId);
int expectedSize = task.get(0).getContainsElements();
if (count == expectedSize) {
taskInfoDAO.endTask(taskId, "Completely processed", String.valueOf(TaskState.PROCESSED), new Date());
}
}
taskInfoDAO.endTask(taskId, "Completely processed", String.valueOf(TaskState.PROCESSED), new Date());
}

private void storeNotification(long taskId, Map<String, Object> parameters) throws DatabaseConnectionException {
Expand All @@ -179,4 +179,31 @@ private void storeNotification(long taskId, Map<String, Object> parameters) thro
String resultResource = String.valueOf(parameters.get(NotificationParameterKeys.RESULT_RESOURCE));
subTaskInfoDAO.insert(taskId, topologyName, resource, state, infoText, additionalInfo, resultResource);
}

private class NotificationCache {
int totalSize = -1;
int processed = 0;

public void setTotalSize(int totalSize) {
this.totalSize = totalSize;
}

public void inc() {
processed++;
}

public Boolean isComplete() {
return totalSize != -1 ? processed >= totalSize : false;
}
}


private int getExpectedSize(long taskId) throws TaskInfoDoesNotExistException {
TaskInfo task = taskInfoDAO.searchById(taskId);
return task.getContainsElements();

}

}


Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException;
import eu.europeana.cloud.service.dps.service.cassandra.CassandraTablesAndColumnsNames;


import java.util.ArrayList;
import java.util.Date;
import java.util.List;

Expand All @@ -25,7 +23,6 @@
public class CassandraTaskInfoDAO extends CassandraDAO {
private PreparedStatement taskSearchStatement;
private PreparedStatement taskInsertStatement;
private PreparedStatement notificationResourcesCountStatement;
private CassandraSubTaskInfoDAO cassandraSubTaskInfoDAO;
private PreparedStatement updateTask;
private PreparedStatement endTask;
Expand All @@ -41,8 +38,6 @@ public CassandraTaskInfoDAO(CassandraConnectionProvider dbService) {

@Override
void prepareStatements() {

notificationResourcesCountStatement = dbService.getSession().prepare("SELECT count(*) FROM " + CassandraTablesAndColumnsNames.NOTIFICATIONS_TABLE + " WHERE " + CassandraTablesAndColumnsNames.NOTIFICATION_TASK_ID + " = ?");
taskSearchStatement = dbService.getSession().prepare(
"SELECT * FROM " + CassandraTablesAndColumnsNames.BASIC_INFO_TABLE + " WHERE " + CassandraTablesAndColumnsNames.BASIC_TASK_ID + " = ?");
taskSearchStatement.setConsistencyLevel(dbService.getConsistencyLevel());
Expand All @@ -63,19 +58,16 @@ void prepareStatements() {
taskInsertStatement.setConsistencyLevel(dbService.getConsistencyLevel());
}

public List<TaskInfo> searchById(long taskId)
public TaskInfo searchById(long taskId)
throws NoHostAvailableException, QueryExecutionException, TaskInfoDoesNotExistException {
ResultSet rs = dbService.getSession().execute(taskSearchStatement.bind(taskId));
if (!rs.iterator().hasNext()) {
throw new TaskInfoDoesNotExistException();
}
List<TaskInfo> result = new ArrayList<>();
for (Row row : rs.all()) {
TaskInfo task = new TaskInfo(row.getLong(CassandraTablesAndColumnsNames.BASIC_TASK_ID), row.getString(CassandraTablesAndColumnsNames.BASIC_TOPOLOGY_NAME), TaskState.valueOf(row.getString(CassandraTablesAndColumnsNames.STATE)), row.getString(CassandraTablesAndColumnsNames.INFO), row.getDate(CassandraTablesAndColumnsNames.SENT_TIME), row.getDate(CassandraTablesAndColumnsNames.START_TIME), row.getDate(CassandraTablesAndColumnsNames.FINISH_TIME));
task.setContainsElements(row.getInt(CassandraTablesAndColumnsNames.BASIC_EXPECTED_SIZE));
result.add(task);
}
return result;
Row row = rs.one();
TaskInfo task = new TaskInfo(row.getLong(CassandraTablesAndColumnsNames.BASIC_TASK_ID), row.getString(CassandraTablesAndColumnsNames.BASIC_TOPOLOGY_NAME), TaskState.valueOf(row.getString(CassandraTablesAndColumnsNames.STATE)), row.getString(CassandraTablesAndColumnsNames.INFO), row.getDate(CassandraTablesAndColumnsNames.SENT_TIME), row.getDate(CassandraTablesAndColumnsNames.START_TIME), row.getDate(CassandraTablesAndColumnsNames.FINISH_TIME));
task.setContainsElements(row.getInt(CassandraTablesAndColumnsNames.BASIC_EXPECTED_SIZE));
return task;
}


Expand All @@ -99,25 +91,13 @@ public void insert(long taskId, String topologyName, int expectedSize, String st
insert(taskId, topologyName, expectedSize, state, info, sentTime, null, null);
}

public List<TaskInfo> searchByIdWithSubtasks(long taskId)
public TaskInfo searchByIdWithSubtasks(long taskId)
throws NoHostAvailableException, QueryExecutionException, TaskInfoDoesNotExistException {
List<TaskInfo> result = searchById(taskId);
for (TaskInfo taskInfo : result) {
List<SubTaskInfo> subTasks = cassandraSubTaskInfoDAO.searchById(taskId);
for (SubTaskInfo subTask : subTasks) {
taskInfo.addSubtask(subTask);
}
TaskInfo result = searchById(taskId);
List<SubTaskInfo> subTasks = cassandraSubTaskInfoDAO.searchById(taskId);
for (SubTaskInfo subTask : subTasks) {
result.addSubtask(subTask);
}
return result;
}

public long getProcessedCount(long taskId) throws TaskInfoDoesNotExistException {
ResultSet rs = dbService.getSession().execute(notificationResourcesCountStatement.bind(taskId));
if (!rs.iterator().hasNext()) {
throw new TaskInfoDoesNotExistException();
}
Row row = rs.one();
return row.getLong("count");

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO;
import eu.europeana.cloud.service.dps.storm.utils.CassandraTestBase;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;

Expand Down Expand Up @@ -54,15 +53,14 @@ public void testUpdateBasicInfoStateWithStartDateAndInfo() throws Exception {
String taskInfo = "";
Date startTime = new Date();
TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, taskState, taskInfo, null, startTime, null);
taskInfoDAO.insert(taskId, topologyName, containsElements, taskState.toString(), taskInfo, null, startTime, null);
final Tuple tuple = createTestTuple(NotificationTuple.prepareUpdateTask(taskId, taskInfo, taskState, startTime));
//when
testedBolt.execute(tuple);
//then
List<TaskInfo> result = taskInfoDAO.searchById(taskId);
TaskInfo result = taskInfoDAO.searchById(taskId);
assertThat(result, notNullValue());
assertThat(result.size(), is(equalTo(1)));
TaskInfo info = result.get(0);
assertThat(info, is(expectedTaskInfo));
assertThat(result, is(expectedTaskInfo));
}

@Test
Expand All @@ -75,15 +73,14 @@ public void testUpdateBasicInfoStateWithFinishDateAndInfo() throws Exception {
String taskInfo = "";
Date finishDate = new Date();
TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, taskState, taskInfo, null, null, finishDate);
taskInfoDAO.insert(taskId, topologyName, containsElements, taskState.toString(), taskInfo, null, null, finishDate);
final Tuple tuple = createTestTuple(NotificationTuple.prepareEndTask(taskId, taskInfo, taskState, finishDate));
//when
testedBolt.execute(tuple);
//then
List<TaskInfo> result = taskInfoDAO.searchById(taskId);
TaskInfo result = taskInfoDAO.searchById(taskId);
assertThat(result, notNullValue());
assertThat(result.size(), is(equalTo(1)));
TaskInfo info = result.get(0);
assertThat(info, is(expectedTaskInfo));
assertThat(result, is(expectedTaskInfo));
}

@Test
Expand All @@ -94,25 +91,26 @@ public void testSuccessfulNotificationTuple() throws Exception {
String topologyName = null;
TaskState taskState = TaskState.CURRENTLY_PROCESSING;
String taskInfo = "";
TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, taskState, taskInfo, null, null, null);
TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, TaskState.PROCESSED, taskInfo, null, null, null);
taskInfoDAO.insert(taskId, topologyName, containsElements, taskState.toString(), taskInfo, null, null, null);
String resource = "resource";
States state = States.SUCCESS;
String text = "text";
String additionalInformation = "additionalInformations";
String resultResource = "";
expectedTaskInfo.addSubtask(new SubTaskInfo(resource, state, text, additionalInformation, resultResource));

final Tuple setUpTuple = createTestTuple(NotificationTuple.prepareUpdateTask(taskId, taskInfo, taskState, null));
testedBolt.execute(setUpTuple);

final Tuple tuple = createTestTuple(NotificationTuple.prepareNotification(taskId, resource, state, text, additionalInformation, resultResource));
//when
testedBolt.execute(tuple);
//then
List<TaskInfo> result = taskInfoDAO.searchByIdWithSubtasks(taskId);
TaskInfo result = taskInfoDAO.searchByIdWithSubtasks(taskId);
assertThat(result, notNullValue());
assertThat(result.size(), is(equalTo(1)));
TaskInfo info = result.get(0);
info.setContainsElements(info.getSubtasks().size());
assertThat(info, is(expectedTaskInfo));
result.setContainsElements(result.getSubtasks().size());
assertThat(result, is(expectedTaskInfo));
}

private TaskInfo createTaskInfo(long taskId, int containElement, String topologyName, TaskState state, String info, Date sentTime, Date startTime, Date finishTime) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception {
"imageConversionBolt");
builder.setBolt("notificationBolt", new NotificationBolt("iks-kbase.synat.pcss.pl",
9042, "ecloud_dps",
"cassandra", "cassandra", true),
"cassandra", "cassandra"),
1)
.fieldsGrouping("retrieveFileBolt", AbstractDpsBolt.NOTIFICATION_STREAM_NAME, new Fields(NotificationTuple.taskIdFieldName))
.fieldsGrouping("imageConversionBolt", AbstractDpsBolt.NOTIFICATION_STREAM_NAME, new Fields(NotificationTuple.taskIdFieldName))
Expand Down
Loading

0 comments on commit c515ad6

Please sign in to comment.