From acf6dfa8ffdf5ad9214f2df142d230af116d8554 Mon Sep 17 00:00:00 2001 From: Tarek Alkhaeir Date: Wed, 20 Jul 2016 09:39:52 +0200 Subject: [PATCH 1/2] ECL-927, small refactoring --- .../service/dps/storm/io/ReadDatasetBolt.java | 21 ++++++++----------- .../dps/storm/io/ReadDatasetsBolt.java | 5 ----- .../service/dps/storm/io/ReadFileBolt.java | 9 +------- .../dps/storm/io/ReadRepresentationBolt.java | 9 ++++---- .../service/dps/storm/io/WriteRecordBolt.java | 10 +++------ .../dps/storm/io/ReadDataSetBoltTest.java | 4 ++-- .../storm/io/ReadRepresentationBoltTest.java | 2 +- 7 files changed, 20 insertions(+), 40 deletions(-) diff --git a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetBolt.java b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetBolt.java index 38fce7662a..c819f07a4b 100644 --- a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetBolt.java +++ b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetBolt.java @@ -23,8 +23,6 @@ */ public class ReadDatasetBolt extends AbstractDpsBolt { - - private DataSetServiceClient datasetClient; private static final Logger LOGGER = LoggerFactory.getLogger(ReadDatasetBolt.class); private final String ecloudMcsAddress; @@ -35,37 +33,36 @@ public ReadDatasetBolt(String ecloudMcsAddress) { /** * Should be used only on tests. */ - public static ReadDatasetBolt getTestInstance(String ecloudMcsAddress, OutputCollector outputCollector, - DataSetServiceClient datasetClient) { + public static ReadDatasetBolt getTestInstance(String ecloudMcsAddress, OutputCollector outputCollector + ) { ReadDatasetBolt instance = new ReadDatasetBolt(ecloudMcsAddress); instance.outputCollector = outputCollector; - instance.datasetClient = datasetClient; return instance; } @Override public void prepare() { - } @Override public void execute(StormTaskTuple t) { - datasetClient = new DataSetServiceClient(ecloudMcsAddress); - emitSingleRepresentationFromDataSet(t); + DataSetServiceClient datasetClient = new DataSetServiceClient(ecloudMcsAddress); + final String authorizationHeader = t.getParameter(PluginParameterKeys.AUTHORIZATION_HEADER); + datasetClient.useAuthorizationHeader(authorizationHeader); + emitSingleRepresentationFromDataSet(t, datasetClient); } - public void emitSingleRepresentationFromDataSet(StormTaskTuple t) { - final String authorizationHeader = t.getParameter(PluginParameterKeys.AUTHORIZATION_HEADER); + public void emitSingleRepresentationFromDataSet(StormTaskTuple t, DataSetServiceClient dataSetServiceClient) { final String dataSetUrl = t.getParameter(PluginParameterKeys.DATASET_URL); final String representationName = t.getParameter(PluginParameterKeys.REPRESENTATION_NAME); t.getParameters().remove(PluginParameterKeys.DATASET_URL); - datasetClient.useAuthorizationHeader(authorizationHeader); + if (dataSetUrl != null) { try { final UrlParser urlParser = new UrlParser(dataSetUrl); if (urlParser.isUrlToDataset()) { - List representations = datasetClient.getDataSetRepresentations(urlParser.getPart(UrlPart.DATA_PROVIDERS), + List representations = dataSetServiceClient.getDataSetRepresentations(urlParser.getPart(UrlPart.DATA_PROVIDERS), urlParser.getPart(UrlPart.DATA_SETS)); t.getParameters().remove(PluginParameterKeys.REPRESENTATION_NAME); for (Representation representation : representations) { diff --git a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetsBolt.java b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetsBolt.java index f6baa3dda4..2f3d214242 100644 --- a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetsBolt.java +++ b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadDatasetsBolt.java @@ -1,19 +1,14 @@ package eu.europeana.cloud.service.dps.storm.io; import backtype.storm.task.OutputCollector; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import com.rits.cloning.Cloner; import eu.europeana.cloud.common.model.dps.TaskState; -import eu.europeana.cloud.mcs.driver.DataSetServiceClient; -import eu.europeana.cloud.service.dps.DpsTask; import eu.europeana.cloud.service.dps.PluginParameterKeys; import eu.europeana.cloud.service.dps.storm.AbstractDpsBolt; import eu.europeana.cloud.service.dps.storm.StormTaskTuple; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.lang.reflect.Type; import java.util.*; /** diff --git a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadFileBolt.java b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadFileBolt.java index e69c1f8b33..25c31f0168 100644 --- a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadFileBolt.java +++ b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadFileBolt.java @@ -1,16 +1,10 @@ package eu.europeana.cloud.service.dps.storm.io; -import backtype.storm.task.OutputCollector; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; import com.rits.cloning.Cloner; - import java.io.IOException; import java.io.InputStream; import java.util.*; - import eu.europeana.cloud.common.model.dps.TaskState; -import eu.europeana.cloud.mcs.driver.DataSetServiceClient; import eu.europeana.cloud.service.mcs.exception.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,7 +28,6 @@ public class ReadFileBolt extends AbstractDpsBolt { * Properties to connect to eCloud */ private final String ecloudMcsAddress; - private FileServiceClient fileClient; public ReadFileBolt(String ecloudMcsAddress) { this.ecloudMcsAddress = ecloudMcsAddress; @@ -64,7 +57,7 @@ public void execute(StormTaskTuple t) { private void emitFiles(StormTaskTuple t, List files) { StormTaskTuple tt; final String authorizationHeader = t.getParameter(PluginParameterKeys.AUTHORIZATION_HEADER); - fileClient = new FileServiceClient(ecloudMcsAddress); + FileServiceClient fileClient = new FileServiceClient(ecloudMcsAddress); fileClient.useAuthorizationHeader(authorizationHeader); for (String file : files) { tt = new Cloner().deepClone(t); //without cloning every emitted tuple will have the same object!!! diff --git a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBolt.java b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBolt.java index d1d98c16fb..970be38be7 100644 --- a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBolt.java +++ b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBolt.java @@ -21,7 +21,7 @@ public class ReadRepresentationBolt extends AbstractDpsBolt { private static final Logger LOGGER = LoggerFactory.getLogger(ReadRepresentationBolt.class); private final String ecloudMcsAddress; - private FileServiceClient fileClient; + public ReadRepresentationBolt(String ecloudMcsAddress) { this.ecloudMcsAddress = ecloudMcsAddress; @@ -30,11 +30,10 @@ public ReadRepresentationBolt(String ecloudMcsAddress) { /** * Should be used only on tests. */ - public static ReadRepresentationBolt getTestInstance(String ecloudMcsAddress, OutputCollector outputCollector, - FileServiceClient fileClient) { + public static ReadRepresentationBolt getTestInstance(String ecloudMcsAddress, OutputCollector outputCollector + ) { ReadRepresentationBolt instance = new ReadRepresentationBolt(ecloudMcsAddress); instance.outputCollector = outputCollector; - instance.fileClient = fileClient; return instance; } @@ -46,11 +45,11 @@ public void prepare() { @Override public void execute(StormTaskTuple t) { - fileClient = new FileServiceClient(ecloudMcsAddress); readRepresentationBolt(t); } private void readRepresentationBolt(StormTaskTuple t) { + FileServiceClient fileClient = new FileServiceClient(ecloudMcsAddress); final String authorizationHeader = t.getParameter(PluginParameterKeys.AUTHORIZATION_HEADER); final String jsonRepresentation = t.getParameter(PluginParameterKeys.REPRESENTATION); t.getParameters().remove(PluginParameterKeys.REPRESENTATION); diff --git a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/WriteRecordBolt.java b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/WriteRecordBolt.java index 5b1bbc7e55..290b169dc4 100644 --- a/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/WriteRecordBolt.java +++ b/service/dps/storm/io/src/main/java/eu/europeana/cloud/service/dps/storm/io/WriteRecordBolt.java @@ -1,10 +1,8 @@ package eu.europeana.cloud.service.dps.storm.io; -import com.google.gson.Gson; -import com.google.gson.reflect.TypeToken; + import eu.europeana.cloud.common.model.Representation; import eu.europeana.cloud.common.model.dps.States; -import eu.europeana.cloud.common.web.ParamConstants; import eu.europeana.cloud.mcs.driver.FileServiceClient; import eu.europeana.cloud.mcs.driver.RecordServiceClient; import eu.europeana.cloud.service.commons.urls.UrlParser; @@ -33,8 +31,6 @@ */ public class WriteRecordBolt extends AbstractDpsBolt { private String ecloudMcsAddress; - private FileServiceClient mcsClient; - private RecordServiceClient recordServiceClient; public static final Logger LOGGER = LoggerFactory.getLogger(WriteRecordBolt.class); public WriteRecordBolt(String ecloudMcsAddress) { @@ -66,8 +62,8 @@ public void execute(StormTaskTuple t) { } private URI uploadFileInNewRepresentation(StormTaskTuple stormTaskTuple) throws MalformedURLException, MCSException { - mcsClient = new FileServiceClient(ecloudMcsAddress); - recordServiceClient = new RecordServiceClient(ecloudMcsAddress); + FileServiceClient mcsClient = new FileServiceClient(ecloudMcsAddress); + RecordServiceClient recordServiceClient = new RecordServiceClient(ecloudMcsAddress); URI newFileUri = null; final UrlParser urlParser = new UrlParser(stormTaskTuple.getFileUrl()); if (urlParser.isUrlToRepresentationVersionFile()) { diff --git a/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadDataSetBoltTest.java b/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadDataSetBoltTest.java index 53fef0dccd..22db3057b1 100644 --- a/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadDataSetBoltTest.java +++ b/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadDataSetBoltTest.java @@ -46,7 +46,7 @@ public class ReadDataSetBoltTest implements TestConstantsHelper { public void init() { oc = mock(OutputCollector.class); datasetClient = mock(DataSetServiceClient.class); - instance = getTestInstance("http://localhost:8080/mcs", oc, datasetClient); + instance = getTestInstance("http://localhost:8080/mcs", oc); } @Captor @@ -63,7 +63,7 @@ public void successfulExecuteStormTuple() throws MCSException, URISyntaxExceptio //when - instance.emitSingleRepresentationFromDataSet(tuple); + instance.emitSingleRepresentationFromDataSet(tuple, datasetClient); //then Representation expectedRepresentation = representationList.get(0); diff --git a/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBoltTest.java b/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBoltTest.java index 08c909ccce..d70dd58f0f 100644 --- a/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBoltTest.java +++ b/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/service/dps/storm/io/ReadRepresentationBoltTest.java @@ -49,7 +49,7 @@ public class ReadRepresentationBoltTest implements TestConstantsHelper { public void init() { oc = mock(OutputCollector.class); fileClient = mock(FileServiceClient.class); - instance = getTestInstance("http://localhost:8080/mcs", oc, fileClient); + instance = getTestInstance("http://localhost:8080/mcs", oc); } @Captor From a352761ad8b1f76421adca1e2211ff01a98f184f Mon Sep 17 00:00:00 2001 From: Tarek Alkhaeir Date: Wed, 20 Jul 2016 14:41:38 +0200 Subject: [PATCH 2/2] ECL-944, Change the way we discover the end of a task --- .../cloud/common/model/dps/TaskInfo.java | 6 +- .../service/dps/storm/NotificationBolt.java | 117 +++++++++++------- .../dps/storm/utils/CassandraTaskInfoDAO.java | 40 ++---- .../dps/storm/NotificationBoltTest.java | 28 ++--- .../dps/ic/topology/StaticICTopology.java | 2 +- .../topologies/ic/topology/ICTopology.java | 2 +- .../eu/europeana/cloud/ICTopologyTest.java | 1 - .../storm/topologies/xslt/XSLTTopology.java | 2 +- 8 files changed, 101 insertions(+), 97 deletions(-) diff --git a/common/src/main/java/eu/europeana/cloud/common/model/dps/TaskInfo.java b/common/src/main/java/eu/europeana/cloud/common/model/dps/TaskInfo.java index 90f0974469..698829fe21 100644 --- a/common/src/main/java/eu/europeana/cloud/common/model/dps/TaskInfo.java +++ b/common/src/main/java/eu/europeana/cloud/common/model/dps/TaskInfo.java @@ -84,15 +84,15 @@ public boolean equals(Object o) { if (state != taskInfo.state) return false; if (startDate == null) if (taskInfo.startDate != null) return false; - if (startDate != null) + if (startDate != null && taskInfo.startDate != null) if (startDate.getTime() != taskInfo.startDate.getTime()) return false; if (sentDate == null) if (taskInfo.sentDate != null) return false; - if (sentDate != null) + if (sentDate != null && taskInfo.sentDate != null) if (sentDate.getTime() != taskInfo.sentDate.getTime()) return false; if (finishDate == null) if (taskInfo.finishDate != null) return false; - if (finishDate != null) + if (finishDate != null && taskInfo.finishDate != null) if (finishDate.getTime() != taskInfo.finishDate.getTime()) return false; return true; diff --git a/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/NotificationBolt.java b/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/NotificationBolt.java index 091d90e6df..efcc1534e2 100644 --- a/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/NotificationBolt.java +++ b/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/NotificationBolt.java @@ -38,10 +38,8 @@ public class NotificationBolt extends BaseRichBolt { private final String keyspaceName; private final String userName; private final String password; - private final Boolean grouping; - + private Map cache; private String topologyName; - private CassandraTaskInfoDAO taskInfoDAO; private CassandraSubTaskInfoDAO subTaskInfoDAO; @@ -57,30 +55,11 @@ public class NotificationBolt extends BaseRichBolt { */ public NotificationBolt(String hosts, int port, String keyspaceName, String userName, String password) { - this(hosts, port, keyspaceName, userName, password, false); - } - - /** - * Constructor of notification bolt. - * - * @param hosts Cassandra hosts separated by comma (e.g. - * localhost,192.168.47.129) - * @param port Cassandra port - * @param keyspaceName Cassandra keyspace name - * @param userName Cassandra username - * @param password Cassandra password - * @param grouping this bolt is connected to topology by fields grouping If true: - * keep number of notifications in memory and emit notification - * when task is completed. - */ - public NotificationBolt(String hosts, int port, String keyspaceName, - String userName, String password, Boolean grouping) { this.hosts = hosts; this.port = port; this.keyspaceName = keyspaceName; this.userName = userName; this.password = password; - this.grouping = grouping; } @Override @@ -89,21 +68,9 @@ public void execute(Tuple tuple) { NotificationTuple notificationTuple = NotificationTuple .fromStormTuple(tuple); - switch (notificationTuple.getInformationType()) { - case UPDATE_TASK: - updateTask(notificationTuple.getTaskId(), - notificationTuple.getParameters()); - break; - case END_TASK: - endTask(notificationTuple.getTaskId(), - notificationTuple.getParameters()); - break; - case NOTIFICATION: - storeNotification(notificationTuple.getTaskId(), - notificationTuple.getParameters()); - storeFinishState(notificationTuple.getTaskId()); - break; - } + NotificationCache nCache = getNotificationCache(notificationTuple); + storeTaskDetails(notificationTuple, nCache); + } catch (NoHostAvailableException | QueryExecutionException ex) { LOGGER.error("Cannot store notification to Cassandra because: {}", ex.getMessage()); @@ -117,6 +84,44 @@ public void execute(Tuple tuple) { } } + + private NotificationCache getNotificationCache(NotificationTuple notificationTuple) throws TaskInfoDoesNotExistException { + NotificationCache nCache = cache.get(notificationTuple.getTaskId()); + if (nCache == null) { + nCache = new NotificationCache(); + long taskId = notificationTuple.getTaskId(); + nCache.setTotalSize(getExpectedSize(taskId)); + cache.put(taskId, nCache); + + } + return nCache; + } + + private void storeTaskDetails(NotificationTuple notificationTuple, NotificationCache nCache) throws TaskInfoDoesNotExistException, DatabaseConnectionException { + long taskId = notificationTuple.getTaskId(); + switch (notificationTuple.getInformationType()) { + case UPDATE_TASK: + updateTask(taskId, + notificationTuple.getParameters()); + break; + case END_TASK: + endTask(taskId, + notificationTuple.getParameters()); + break; + case NOTIFICATION: + storeNotification(taskId, + notificationTuple.getParameters()); + if (nCache != null) { + nCache.inc(); + if (nCache.isComplete()) { + storeFinishState(taskId); + cache.remove(taskId); + } + } + break; + } + } + @Override public void prepare(Map stormConf, TopologyContext tc, OutputCollector oc) { CassandraConnectionProvider cassandra = new CassandraConnectionProvider(hosts, port, keyspaceName, @@ -127,6 +132,8 @@ public void prepare(Map stormConf, TopologyContext tc, OutputCollector oc) { this.stormConfig = stormConf; this.topologyContext = tc; this.outputCollector = oc; + cache = new HashMap<>(); + } @Override @@ -160,14 +167,7 @@ private static Date prepareDate(Object dateObject) { } private void storeFinishState(long taskId) throws TaskInfoDoesNotExistException, DatabaseConnectionException { - List task = taskInfoDAO.searchById(taskId); - if (task != null && task.get(0) != null) { - long count = taskInfoDAO.getProcessedCount(taskId); - int expectedSize = task.get(0).getContainsElements(); - if (count == expectedSize) { - taskInfoDAO.endTask(taskId, "Completely processed", String.valueOf(TaskState.PROCESSED), new Date()); - } - } + taskInfoDAO.endTask(taskId, "Completely processed", String.valueOf(TaskState.PROCESSED), new Date()); } private void storeNotification(long taskId, Map parameters) throws DatabaseConnectionException { @@ -179,4 +179,31 @@ private void storeNotification(long taskId, Map parameters) thro String resultResource = String.valueOf(parameters.get(NotificationParameterKeys.RESULT_RESOURCE)); subTaskInfoDAO.insert(taskId, topologyName, resource, state, infoText, additionalInfo, resultResource); } + + private class NotificationCache { + int totalSize = -1; + int processed = 0; + + public void setTotalSize(int totalSize) { + this.totalSize = totalSize; + } + + public void inc() { + processed++; + } + + public Boolean isComplete() { + return totalSize != -1 ? processed >= totalSize : false; + } + } + + + private int getExpectedSize(long taskId) throws TaskInfoDoesNotExistException { + TaskInfo task = taskInfoDAO.searchById(taskId); + return task.getContainsElements(); + + } + } + + diff --git a/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/utils/CassandraTaskInfoDAO.java b/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/utils/CassandraTaskInfoDAO.java index 13ffe89ffd..a497f45be5 100644 --- a/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/utils/CassandraTaskInfoDAO.java +++ b/service/dps/storm/common/src/main/java/eu/europeana/cloud/service/dps/storm/utils/CassandraTaskInfoDAO.java @@ -12,8 +12,6 @@ import eu.europeana.cloud.service.dps.exception.TaskInfoDoesNotExistException; import eu.europeana.cloud.service.dps.service.cassandra.CassandraTablesAndColumnsNames; - -import java.util.ArrayList; import java.util.Date; import java.util.List; @@ -25,7 +23,6 @@ public class CassandraTaskInfoDAO extends CassandraDAO { private PreparedStatement taskSearchStatement; private PreparedStatement taskInsertStatement; - private PreparedStatement notificationResourcesCountStatement; private CassandraSubTaskInfoDAO cassandraSubTaskInfoDAO; private PreparedStatement updateTask; private PreparedStatement endTask; @@ -41,8 +38,6 @@ public CassandraTaskInfoDAO(CassandraConnectionProvider dbService) { @Override void prepareStatements() { - - notificationResourcesCountStatement = dbService.getSession().prepare("SELECT count(*) FROM " + CassandraTablesAndColumnsNames.NOTIFICATIONS_TABLE + " WHERE " + CassandraTablesAndColumnsNames.NOTIFICATION_TASK_ID + " = ?"); taskSearchStatement = dbService.getSession().prepare( "SELECT * FROM " + CassandraTablesAndColumnsNames.BASIC_INFO_TABLE + " WHERE " + CassandraTablesAndColumnsNames.BASIC_TASK_ID + " = ?"); taskSearchStatement.setConsistencyLevel(dbService.getConsistencyLevel()); @@ -63,19 +58,16 @@ void prepareStatements() { taskInsertStatement.setConsistencyLevel(dbService.getConsistencyLevel()); } - public List searchById(long taskId) + public TaskInfo searchById(long taskId) throws NoHostAvailableException, QueryExecutionException, TaskInfoDoesNotExistException { ResultSet rs = dbService.getSession().execute(taskSearchStatement.bind(taskId)); if (!rs.iterator().hasNext()) { throw new TaskInfoDoesNotExistException(); } - List result = new ArrayList<>(); - for (Row row : rs.all()) { - TaskInfo task = new TaskInfo(row.getLong(CassandraTablesAndColumnsNames.BASIC_TASK_ID), row.getString(CassandraTablesAndColumnsNames.BASIC_TOPOLOGY_NAME), TaskState.valueOf(row.getString(CassandraTablesAndColumnsNames.STATE)), row.getString(CassandraTablesAndColumnsNames.INFO), row.getDate(CassandraTablesAndColumnsNames.SENT_TIME), row.getDate(CassandraTablesAndColumnsNames.START_TIME), row.getDate(CassandraTablesAndColumnsNames.FINISH_TIME)); - task.setContainsElements(row.getInt(CassandraTablesAndColumnsNames.BASIC_EXPECTED_SIZE)); - result.add(task); - } - return result; + Row row = rs.one(); + TaskInfo task = new TaskInfo(row.getLong(CassandraTablesAndColumnsNames.BASIC_TASK_ID), row.getString(CassandraTablesAndColumnsNames.BASIC_TOPOLOGY_NAME), TaskState.valueOf(row.getString(CassandraTablesAndColumnsNames.STATE)), row.getString(CassandraTablesAndColumnsNames.INFO), row.getDate(CassandraTablesAndColumnsNames.SENT_TIME), row.getDate(CassandraTablesAndColumnsNames.START_TIME), row.getDate(CassandraTablesAndColumnsNames.FINISH_TIME)); + task.setContainsElements(row.getInt(CassandraTablesAndColumnsNames.BASIC_EXPECTED_SIZE)); + return task; } @@ -99,25 +91,13 @@ public void insert(long taskId, String topologyName, int expectedSize, String st insert(taskId, topologyName, expectedSize, state, info, sentTime, null, null); } - public List searchByIdWithSubtasks(long taskId) + public TaskInfo searchByIdWithSubtasks(long taskId) throws NoHostAvailableException, QueryExecutionException, TaskInfoDoesNotExistException { - List result = searchById(taskId); - for (TaskInfo taskInfo : result) { - List subTasks = cassandraSubTaskInfoDAO.searchById(taskId); - for (SubTaskInfo subTask : subTasks) { - taskInfo.addSubtask(subTask); - } + TaskInfo result = searchById(taskId); + List subTasks = cassandraSubTaskInfoDAO.searchById(taskId); + for (SubTaskInfo subTask : subTasks) { + result.addSubtask(subTask); } return result; } - - public long getProcessedCount(long taskId) throws TaskInfoDoesNotExistException { - ResultSet rs = dbService.getSession().execute(notificationResourcesCountStatement.bind(taskId)); - if (!rs.iterator().hasNext()) { - throw new TaskInfoDoesNotExistException(); - } - Row row = rs.one(); - return row.getLong("count"); - - } } diff --git a/service/dps/storm/common/src/test/java/eu/europeana/cloud/service/dps/storm/NotificationBoltTest.java b/service/dps/storm/common/src/test/java/eu/europeana/cloud/service/dps/storm/NotificationBoltTest.java index 0ae9788a71..13ea719eaf 100644 --- a/service/dps/storm/common/src/test/java/eu/europeana/cloud/service/dps/storm/NotificationBoltTest.java +++ b/service/dps/storm/common/src/test/java/eu/europeana/cloud/service/dps/storm/NotificationBoltTest.java @@ -16,7 +16,6 @@ import eu.europeana.cloud.service.dps.storm.utils.CassandraTaskInfoDAO; import eu.europeana.cloud.service.dps.storm.utils.CassandraTestBase; import org.junit.Before; -import org.junit.Ignore; import org.junit.Test; import org.mockito.Mockito; @@ -54,15 +53,14 @@ public void testUpdateBasicInfoStateWithStartDateAndInfo() throws Exception { String taskInfo = ""; Date startTime = new Date(); TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, taskState, taskInfo, null, startTime, null); + taskInfoDAO.insert(taskId, topologyName, containsElements, taskState.toString(), taskInfo, null, startTime, null); final Tuple tuple = createTestTuple(NotificationTuple.prepareUpdateTask(taskId, taskInfo, taskState, startTime)); //when testedBolt.execute(tuple); //then - List result = taskInfoDAO.searchById(taskId); + TaskInfo result = taskInfoDAO.searchById(taskId); assertThat(result, notNullValue()); - assertThat(result.size(), is(equalTo(1))); - TaskInfo info = result.get(0); - assertThat(info, is(expectedTaskInfo)); + assertThat(result, is(expectedTaskInfo)); } @Test @@ -75,15 +73,14 @@ public void testUpdateBasicInfoStateWithFinishDateAndInfo() throws Exception { String taskInfo = ""; Date finishDate = new Date(); TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, taskState, taskInfo, null, null, finishDate); + taskInfoDAO.insert(taskId, topologyName, containsElements, taskState.toString(), taskInfo, null, null, finishDate); final Tuple tuple = createTestTuple(NotificationTuple.prepareEndTask(taskId, taskInfo, taskState, finishDate)); //when testedBolt.execute(tuple); //then - List result = taskInfoDAO.searchById(taskId); + TaskInfo result = taskInfoDAO.searchById(taskId); assertThat(result, notNullValue()); - assertThat(result.size(), is(equalTo(1))); - TaskInfo info = result.get(0); - assertThat(info, is(expectedTaskInfo)); + assertThat(result, is(expectedTaskInfo)); } @Test @@ -94,25 +91,26 @@ public void testSuccessfulNotificationTuple() throws Exception { String topologyName = null; TaskState taskState = TaskState.CURRENTLY_PROCESSING; String taskInfo = ""; - TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, taskState, taskInfo, null, null, null); + TaskInfo expectedTaskInfo = createTaskInfo(taskId, containsElements, topologyName, TaskState.PROCESSED, taskInfo, null, null, null); + taskInfoDAO.insert(taskId, topologyName, containsElements, taskState.toString(), taskInfo, null, null, null); String resource = "resource"; States state = States.SUCCESS; String text = "text"; String additionalInformation = "additionalInformations"; String resultResource = ""; expectedTaskInfo.addSubtask(new SubTaskInfo(resource, state, text, additionalInformation, resultResource)); + final Tuple setUpTuple = createTestTuple(NotificationTuple.prepareUpdateTask(taskId, taskInfo, taskState, null)); testedBolt.execute(setUpTuple); + final Tuple tuple = createTestTuple(NotificationTuple.prepareNotification(taskId, resource, state, text, additionalInformation, resultResource)); //when testedBolt.execute(tuple); //then - List result = taskInfoDAO.searchByIdWithSubtasks(taskId); + TaskInfo result = taskInfoDAO.searchByIdWithSubtasks(taskId); assertThat(result, notNullValue()); - assertThat(result.size(), is(equalTo(1))); - TaskInfo info = result.get(0); - info.setContainsElements(info.getSubtasks().size()); - assertThat(info, is(expectedTaskInfo)); + result.setContainsElements(result.getSubtasks().size()); + assertThat(result, is(expectedTaskInfo)); } private TaskInfo createTaskInfo(long taskId, int containElement, String topologyName, TaskState state, String info, Date sentTime, Date startTime, Date finishTime) { diff --git a/service/dps/storm/examples/src/main/java/eu/europeana/cloud/service/dps/ic/topology/StaticICTopology.java b/service/dps/storm/examples/src/main/java/eu/europeana/cloud/service/dps/ic/topology/StaticICTopology.java index 0e506e386f..b2ea83f7dc 100644 --- a/service/dps/storm/examples/src/main/java/eu/europeana/cloud/service/dps/ic/topology/StaticICTopology.java +++ b/service/dps/storm/examples/src/main/java/eu/europeana/cloud/service/dps/ic/topology/StaticICTopology.java @@ -62,7 +62,7 @@ public static void main(String[] args) throws Exception { "imageConversionBolt"); builder.setBolt("notificationBolt", new NotificationBolt("iks-kbase.synat.pcss.pl", 9042, "ecloud_dps", - "cassandra", "cassandra", true), + "cassandra", "cassandra"), 1) .fieldsGrouping("retrieveFileBolt", AbstractDpsBolt.NOTIFICATION_STREAM_NAME, new Fields(NotificationTuple.taskIdFieldName)) .fieldsGrouping("imageConversionBolt", AbstractDpsBolt.NOTIFICATION_STREAM_NAME, new Fields(NotificationTuple.taskIdFieldName)) diff --git a/service/dps/storm/topologies/ic/ic-topology/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/ic/topology/ICTopology.java b/service/dps/storm/topologies/ic/ic-topology/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/ic/topology/ICTopology.java index e259c2cc37..1e8fd1d2e3 100644 --- a/service/dps/storm/topologies/ic/ic-topology/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/ic/topology/ICTopology.java +++ b/service/dps/storm/topologies/ic/ic-topology/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/ic/topology/ICTopology.java @@ -121,7 +121,7 @@ public StormTopology buildTopology(String icTopic, String ecloudMcsAddress) { Integer.parseInt(topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_PORT)), topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_KEYSPACE_NAME), topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_USERNAME), - topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_PASSWORD), true), + topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_PASSWORD)), Integer.parseInt(topologyProperties.getProperty(TopologyPropertyKeys.NOTIFICATION_BOLT_PARALLEL))) .setNumTasks( ((int) Integer.parseInt(topologyProperties.getProperty(TopologyPropertyKeys.NUMBER_OF_TASKS)))) diff --git a/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/ICTopologyTest.java b/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/ICTopologyTest.java index 0bb9f044cd..ae02d5e814 100644 --- a/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/ICTopologyTest.java +++ b/service/dps/storm/topologies/ic/ic-topology/src/test/java/eu/europeana/cloud/ICTopologyTest.java @@ -49,7 +49,6 @@ @RunWith(PowerMockRunner.class) @PrepareForTest({ReadFileBolt.class, ReadDatasetsBolt.class, ReadRepresentationBolt.class, ReadDatasetBolt.class, IcBolt.class, WriteRecordBolt.class, NotificationBolt.class}) - @PowerMockIgnore({"javax.management.*", "javax.security.*"}) public class ICTopologyTest extends ICTestMocksHelper implements TestConstantsHelper { diff --git a/service/dps/storm/topologies/xslt/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/xslt/XSLTTopology.java b/service/dps/storm/topologies/xslt/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/xslt/XSLTTopology.java index 89352b4e23..defa53d596 100644 --- a/service/dps/storm/topologies/xslt/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/xslt/XSLTTopology.java +++ b/service/dps/storm/topologies/xslt/src/main/java/eu/europeana/cloud/service/dps/storm/topologies/xslt/XSLTTopology.java @@ -128,7 +128,7 @@ public StormTopology buildTopology(String xsltTopic, String ecloudMcsAddress) { Integer.parseInt(topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_PORT)), topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_KEYSPACE_NAME), topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_USERNAME), - topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_PASSWORD), true), + topologyProperties.getProperty(TopologyPropertyKeys.CASSANDRA_PASSWORD)), Integer.parseInt(topologyProperties.getProperty(TopologyPropertyKeys.NOTIFICATION_BOLT_PARALLEL))) .setNumTasks( ((int) Integer.parseInt(topologyProperties.getProperty(TopologyPropertyKeys.NUMBER_OF_TASKS))))