Skip to content

Commit

Permalink
MET-2239 file metadata (mimeType and size) will be sent to s3 also
Browse files Browse the repository at this point in the history
  • Loading branch information
pWoz committed Oct 9, 2019
1 parent 1ae3c54 commit 9bdf72d
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.ObjectMetadata;
import com.google.gson.Gson;
import eu.europeana.cloud.service.dps.PluginParameterKeys;
import eu.europeana.cloud.service.dps.storm.AbstractDpsBolt;
Expand All @@ -17,6 +18,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.util.Date;
import java.util.List;
Expand Down Expand Up @@ -66,10 +68,9 @@ public void execute(StormTaskTuple stormTaskTuple) {
if (taskStatusChecker.hasKillFlag(stormTaskTuple.getTaskId()))
break;
try (InputStream stream = thumbnail.getContentStream()) {
amazonClient.putObject(awsBucket, thumbnail.getTargetName(), stream, null);
LOGGER.info("The thumbnail {} was uploaded successfully to S3 in Bluemix", thumbnail.getTargetName());
amazonClient.putObject(awsBucket, thumbnail.getTargetName(), stream, prepareObjectMetadata(thumbnail));
} catch (Exception e) {
String errorMessage = "Error while uploading " + thumbnail.getTargetName() + " to S3 in Bluemix. The full error message is: " + e.getMessage()+" because of: "+e.getCause();
String errorMessage = "Error while uploading " + thumbnail.getTargetName() + " to S3 in Bluemix. The full error message is: " + e.getMessage() + " because of: " + e.getCause();
LOGGER.error(errorMessage);
buildErrorMessage(exception, errorMessage);

Expand All @@ -81,7 +82,7 @@ public void execute(StormTaskTuple stormTaskTuple) {
}
} catch (Exception e) {
LOGGER.error("Exception while processing the resource {}. The full error is:{} ", stormTaskTuple.getParameter(PluginParameterKeys.RESOURCE_URL), ExceptionUtils.getStackTrace(e));
buildErrorMessage(exception, "Exception while processing the resource: " + stormTaskTuple.getParameter(PluginParameterKeys.RESOURCE_URL) + ". The full error is: " + e.getMessage()+" because of: "+e.getCause());
buildErrorMessage(exception, "Exception while processing the resource: " + stormTaskTuple.getParameter(PluginParameterKeys.RESOURCE_URL) + ". The full error is: " + e.getMessage() + " because of: " + e.getCause());
} finally {
stormTaskTuple.getParameters().remove(PluginParameterKeys.RESOURCE_LINK_KEY);
if (exception.length() > 0) {
Expand Down Expand Up @@ -126,6 +127,13 @@ private void initAmazonClient() {
}
}

private ObjectMetadata prepareObjectMetadata(Thumbnail thumbnail) throws IOException {
final ObjectMetadata objectMetadata = new ObjectMetadata();
objectMetadata.setContentType(thumbnail.getMimeType());
objectMetadata.setContentLength(thumbnail.getContentSize());
return objectMetadata;
}

private void buildErrorMessage(StringBuilder message, String newMessage) {
LOGGER.error("Error while processing {}", newMessage);
if (message.toString().isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void shouldSuccessfullyProcessTheResource() throws Exception {
when(taskStatusChecker.hasKillFlag(eq(TASK_ID))).thenReturn(false);
resourceProcessingBolt.execute(stormTaskTuple);

verify(amazonClient, Mockito.times(thumbnailCount)).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), isNull(ObjectMetadata.class));
verify(amazonClient, Mockito.times(thumbnailCount)).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), any(ObjectMetadata.class));
verify(outputCollector, Mockito.times(1)).emit(captor.capture());
Values value = captor.getValue();
Map<String, String> parameters = (Map) value.get(4);
Expand Down Expand Up @@ -134,7 +134,7 @@ public void shouldDropTheTaskAndStopProcessing() throws Exception {
when(taskStatusChecker.hasKillFlag(eq(TASK_ID))).thenReturn(false).thenReturn(true);

resourceProcessingBolt.execute(stormTaskTuple);
verify(amazonClient, Mockito.times(1)).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), isNull(ObjectMetadata.class));
verify(amazonClient, Mockito.times(1)).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), any(ObjectMetadata.class));
}


Expand All @@ -152,10 +152,10 @@ public void shouldFormulateTheAggregateExceptionsWhenSavingToAmazonFails() throw
String errorMessage = "The error was thrown because of something";

when(mediaExtractor.performMediaExtraction(any(RdfResourceEntry.class))).thenReturn(resourceExtractionResult);
doThrow(new AmazonServiceException(errorMessage)).when(amazonClient).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), isNull(ObjectMetadata.class));
doThrow(new AmazonServiceException(errorMessage)).when(amazonClient).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), any(ObjectMetadata.class));
resourceProcessingBolt.execute(stormTaskTuple);

verify(amazonClient, Mockito.times(3)).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), isNull(ObjectMetadata.class));
verify(amazonClient, Mockito.times(3)).putObject(eq(AWS_BUCKET), anyString(), any(InputStream.class), any(ObjectMetadata.class));
verify(outputCollector, Mockito.times(1)).emit(captor.capture());
Values value = captor.getValue();
Map<String, String> parameters = (Map) value.get(4);
Expand Down

0 comments on commit 9bdf72d

Please sign in to comment.