Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solr reindexing through events #873

Open
wants to merge 12 commits into
base: dtq-dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.dspace.authorize.dao.ResourcePolicyDAO;
import org.dspace.authorize.service.ResourcePolicyService;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.content.factory.ContentServiceFactory;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.core.ProvenanceService;
import org.dspace.eperson.EPerson;
import org.dspace.eperson.Group;
import org.dspace.eperson.service.GroupService;
import org.dspace.event.Event;
import org.springframework.beans.factory.annotation.Autowired;

/**
Expand Down Expand Up @@ -55,8 +57,6 @@ public class ResourcePolicyServiceImpl implements ResourcePolicyService {
@Autowired
ProvenanceService provenanceService;

@Autowired
ResourcePolicyService resourcePolicyService;

protected ResourcePolicyServiceImpl() {
}
Expand Down Expand Up @@ -146,6 +146,7 @@ public List<ResourcePolicy> findByTypeGroupActionExceptId(Context context, DSpac
public void delete(Context context, ResourcePolicy resourcePolicy) throws SQLException, AuthorizeException {
// FIXME: authorizations
// Remove ourself
DSpaceObject dso = resourcePolicy.getdSpaceObject();
resourcePolicyDAO.delete(context, resourcePolicy);

context.turnOffAuthorisationSystem();
Expand All @@ -155,6 +156,10 @@ public void delete(Context context, ResourcePolicy resourcePolicy) throws SQLExc
.updateLastModified(context, resourcePolicy.getdSpaceObject());
}
context.restoreAuthSystemState();
if (dso instanceof Item) {
Item item = (Item) dso;
context.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}


Expand Down Expand Up @@ -220,6 +225,11 @@ public ResourcePolicy clone(Context context, ResourcePolicy resourcePolicy)
clone.setRpType((String) ObjectUtils.clone(resourcePolicy.getRpType()));
clone.setRpDescription((String) ObjectUtils.clone(resourcePolicy.getRpDescription()));
update(context, clone);
DSpaceObject dso = resourcePolicy.getdSpaceObject();
if (dso instanceof Item) {
Item item = (Item) dso;
context.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
return clone;
}

Expand All @@ -229,6 +239,10 @@ public void removeAllPolicies(Context c, DSpaceObject o) throws SQLException, Au
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
if (o instanceof Item) {
Item item = (Item) o;
c.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}

@Override
Expand All @@ -237,20 +251,26 @@ public void removePolicies(Context c, DSpaceObject o, String type) throws SQLExc
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
if (o instanceof Item) {
Item item = (Item) o;
c.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}

@Override
public void removePolicies(Context c, DSpaceObject o, String type, int action)
throws SQLException, AuthorizeException {
// Get all read policies of the dso before removing them
List<ResourcePolicy> resPolicies = resourcePolicyService.find(c, o, type);

List<ResourcePolicy> resPolicies = find(c, o, type);
resourcePolicyDAO.deleteByDsoAndTypeAndAction(c, o, type, action);
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();

provenanceService.removeReadPolicies(c, o, resPolicies);
if (o instanceof Item) {
Item item = (Item) o;
c.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}

@Override
Expand All @@ -260,6 +280,10 @@ public void removeDsoGroupPolicies(Context context, DSpaceObject dso, Group grou
context.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(dso).updateLastModified(context, dso);
context.restoreAuthSystemState();
if (dso instanceof Item) {
Item item = (Item) dso;
context.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}

@Override
Expand All @@ -279,6 +303,13 @@ public void removeAllEPersonPolicies(Context context, EPerson ePerson) throws SQ

@Override
public void removeGroupPolicies(Context c, Group group) throws SQLException {
List<ResourcePolicy> resourcePolicies = find(c, group);
for (ResourcePolicy r : resourcePolicies) {
if (r.getdSpaceObject() instanceof Item) {
Item item = (Item) r.getdSpaceObject();
c.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}
resourcePolicyDAO.deleteByGroup(c, group);
}

Expand All @@ -291,6 +322,10 @@ public void removePolicies(Context c, DSpaceObject o, int actionId) throws SQLEx
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
if (o instanceof Item) {
Item item = (Item) o;
c.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}
}

Expand All @@ -301,6 +336,10 @@ public void removeDsoAndTypeNotEqualsToPolicies(Context c, DSpaceObject o, Strin
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
if (o instanceof Item) {
Item item = (Item) o;
c.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}


Expand Down Expand Up @@ -332,6 +371,10 @@ public void update(Context context, List<ResourcePolicy> resourcePolicies) throw

// FIXME: Check authorisation
resourcePolicyDAO.save(context, resourcePolicy);
if (resourcePolicy.getdSpaceObject() instanceof Item) {
Item item = (Item) resourcePolicy.getdSpaceObject();
context.addEvent(new Event(Event.MODIFY, -1, null, Constants.ITEM, item.getID(), ""));
}
}

//Update the last modified timestamp of all related DSpace Objects
Expand Down
158 changes: 158 additions & 0 deletions dspace-oai/src/main/java/org/dspace/event/OAIIndexEventConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
/**
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://www.dspace.org/license/
*/
package org.dspace.event;

import java.sql.SQLException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.log4j.Logger;
import org.dspace.content.Bitstream;
import org.dspace.content.Bundle;
import org.dspace.content.Collection;
import org.dspace.content.Community;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.content.factory.ContentServiceFactory;
import org.dspace.content.service.ItemService;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.xoai.app.BasicConfiguration;
import org.dspace.xoai.app.XOAI;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

public class OAIIndexEventConsumer implements Consumer {
/**
* log4j logger
*/
private static Logger log = Logger.getLogger(OAIIndexEventConsumer.class);

ItemService itemService = ContentServiceFactory.getInstance().getItemService();
// collect Items, Collections, Communities that need indexing
private Set<Item> itemsToUpdate = null;

public void initialize() throws Exception {
// No-op
}

/**
* Consume a content event -- just build the sets of objects to add (new) to
* the index, update, and delete.
*
* @param ctx DSpace context
* @param event Content event
*/
public void consume(Context ctx, Event event) throws Exception {

if (itemsToUpdate == null) {
itemsToUpdate = new HashSet<Item>();
}

int st = event.getSubjectType();
if (!(st == Constants.ITEM || st == Constants.BUNDLE
|| st == Constants.COLLECTION || st == Constants.COMMUNITY || st == Constants.BITSTREAM)) {
log
.warn("IndexConsumer should not have been given this kind of Subject in an event, skipping: "
+ event.toString());
return;
}

DSpaceObject subject = event.getSubject(ctx);
DSpaceObject object = event.getObject(ctx);


int et = event.getEventType();

if (object != null && event.getObjectType() == Constants.ITEM) {
//update just object
itemsToUpdate.add((Item)object);
return;
}

if (subject != null) {
if (event.getSubjectType() == Constants.COLLECTION || event.getSubjectType() == Constants.COMMUNITY) {
if (et == Event.MODIFY || et == Event.MODIFY_METADATA || et == Event.REMOVE || et == Event.DELETE) {
//must update all the items
if (subject.getType() == Constants.COMMUNITY) {
for (Collection col : ((Community)subject).getCollections()) {
addAll(ctx, col);
}
} else {
addAll(ctx, (Collection)subject);
}
}
} else if (event.getSubjectType() == Constants.BITSTREAM || event.getSubjectType() == Constants.BUNDLE) {
//must update owning items regardless the event
if (subject.getType() == Constants.BITSTREAM) {
for (Bundle bun : ((Bitstream)subject).getBundles()) {
itemsToUpdate.addAll(bun.getItems());
}
} else {
itemsToUpdate.addAll(((Bundle)subject).getItems());
}
} else if (event.getSubjectType() == Constants.ITEM) {
//any event reindex this item
itemsToUpdate.add((Item)subject);
}
}
}

private void addAll(Context context, Collection col) throws SQLException {
Iterator<Item> i = itemService.findByCollection(context, col);
while (i.hasNext()) {
itemsToUpdate.add(i.next());
}
}

/**
* Process sets of objects to add, update, and delete in index. Correct for
* interactions between the sets -- e.g. objects which were deleted do not
* need to be added or updated, new objects don't also need an update, etc.
*/
public void end(Context ctx) throws Exception {

Context anonymousContext = null;
try {
if (itemsToUpdate != null) {

Set<Item> filtered = new HashSet<Item>(itemsToUpdate.size());
for (Item item : itemsToUpdate) {
if (item.getHandle() == null) {
// probably submission item, skip
continue;
}
filtered.add(item);
}

// "free" the resources
itemsToUpdate = null;

anonymousContext = new Context();
XOAI indexer = new XOAI(anonymousContext, false, false);
AnnotationConfigApplicationContext applicationContext = new AnnotationConfigApplicationContext(
new Class[] { BasicConfiguration.class });
applicationContext.getAutowireCapableBeanFactory()
.autowireBean(indexer);
indexer.indexItems(filtered);
applicationContext.close();
}
} catch (Exception e) {
itemsToUpdate = null;
throw e;
} finally {
if (anonymousContext != null) {
anonymousContext.complete();
}
}
}

public void finish(Context ctx) throws Exception {
// No-op
}
}
53 changes: 53 additions & 0 deletions dspace-oai/src/main/java/org/dspace/xoai/app/XOAI.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.solr.client.solrj.SolrClient;
Expand Down Expand Up @@ -83,6 +84,9 @@
public class XOAI {
private static Logger log = LogManager.getLogger(XOAI.class);

private final XOAICacheService cacheService;
private final XOAIItemCacheService itemCacheService;

// needed because the solr query only returns 10 rows by default
private final Context context;
private final boolean verbose;
Expand All @@ -105,6 +109,14 @@ public class XOAI {

private List<XOAIExtensionItemCompilePlugin> extensionPlugins;

{
AnnotationConfigApplicationContext applicationContext =
new AnnotationConfigApplicationContext(BasicConfiguration.class);
cacheService = applicationContext.getBean(XOAICacheService.class);
itemCacheService = applicationContext.getBean(XOAIItemCacheService.class);

}

private List<String> getFileFormats(Item item) {
List<String> formats = new ArrayList<>();
try {
Expand Down Expand Up @@ -719,4 +731,45 @@ private static void usage() {
}
}

private boolean isTest() {
try {
if (StringUtils.equals("jdbc:h2:mem:test", this.context.getDBConfig().getDatabaseUrl())) {
return true;
}
} catch (SQLException exception) {
return false;
}

return false;
}

/**
* Delete the item from Solr by the ID of the item
*/
private void deleteItemByQuery(Item item) throws SolrServerException, IOException {
SolrClient solrClient = solrServerResolver.getServer();
solrClient.deleteByQuery("item.id:" + item.getID().toString());
solrClient.commit();
}

public void indexItems(java.util.Collection<Item> items) throws Exception {
for (Item item : items) {
try {
deleteItemByQuery(item);
solrServerResolver.getServer().add(this.index(item));
} catch (IOException | XMLStreamException | SQLException | WritingXmlException | SolrServerException e) {
// Do not throw RuntimeException in tests
if (this.isTest()) {
log.error("Cannot reindex the item with ID: " + item.getID() + " because: " + e.getMessage());
} else {
log.error("Cannot reindex the item with ID: " + item.getID() + " because: " + e.getMessage());
throw new RuntimeException("Cannot reindex the item with ID: " + item.getID() + " because: "
+ e.getMessage());
}
}
}
solrServerResolver.getServer().commit();
cacheService.deleteAll();
itemCacheService.deleteAll();
}
}
Loading
Loading