Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

solr reindexing through events #873

Open
wants to merge 12 commits into
base: dtq-dev
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@
import org.dspace.authorize.dao.ResourcePolicyDAO;
import org.dspace.authorize.service.ResourcePolicyService;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.content.factory.ContentServiceFactory;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.core.ProvenanceService;
import org.dspace.eperson.EPerson;
import org.dspace.eperson.Group;
import org.dspace.eperson.service.GroupService;
import org.dspace.event.Event;
import org.springframework.beans.factory.annotation.Autowired;

/**
Expand Down Expand Up @@ -55,8 +57,6 @@ public class ResourcePolicyServiceImpl implements ResourcePolicyService {
@Autowired
ProvenanceService provenanceService;

@Autowired
ResourcePolicyService resourcePolicyService;

/**
 * Non-public no-argument constructor; it performs no initialisation of its own.
 * NOTE(review): presumably the instance is created by the Spring container, given the
 * {@code @Autowired} fields of this class -- confirm against the bean configuration.
 */
protected ResourcePolicyServiceImpl() {
}
Expand Down Expand Up @@ -154,6 +154,7 @@ public List<ResourcePolicy> findByTypeGroupActionExceptId(Context context, DSpac
public void delete(Context context, ResourcePolicy resourcePolicy) throws SQLException, AuthorizeException {
// FIXME: authorizations
// Remove ourself
DSpaceObject dso = resourcePolicy.getdSpaceObject();
resourcePolicyDAO.delete(context, resourcePolicy);

context.turnOffAuthorisationSystem();
Expand All @@ -163,6 +164,7 @@ public void delete(Context context, ResourcePolicy resourcePolicy) throws SQLExc
.updateLastModified(context, resourcePolicy.getdSpaceObject());
}
context.restoreAuthSystemState();
addEventModify(context, dso);
}


Expand Down Expand Up @@ -226,6 +228,7 @@ public ResourcePolicy clone(Context context, ResourcePolicy resourcePolicy)
clone.setRpType((String) ObjectUtils.clone(resourcePolicy.getRpType()));
clone.setRpDescription((String) ObjectUtils.clone(resourcePolicy.getRpDescription()));
update(context, clone);
DSpaceObject dso = resourcePolicy.getdSpaceObject();
return clone;
}

Expand All @@ -235,6 +238,7 @@ public void removeAllPolicies(Context c, DSpaceObject o) throws SQLException, Au
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
addEventModify(c, o);
}

@Override
Expand All @@ -243,20 +247,20 @@ public void removePolicies(Context c, DSpaceObject o, String type) throws SQLExc
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
addEventModify(c, o);
}

/**
 * Removes the policies of the given type and action from a DSpace object, refreshes the
 * object's last-modified timestamp, records the removal through the provenance service and
 * queues a MODIFY event for the object.
 *
 * @param c      the DSpace context
 * @param o      the object whose policies are removed
 * @param type   the policy type passed to the DAO delete and to the preceding lookup
 * @param action the action id passed to the DAO delete
 * @throws SQLException       if a database error occurs
 * @throws AuthorizeException if an authorisation error occurs
 */
@Override
public void removePolicies(Context c, DSpaceObject o, String type, int action)
        throws SQLException, AuthorizeException {
    // Look the policies up before the delete so they can still be handed to the provenance service.
    // Call find() on this instance directly; the service does not need an injected reference to itself.
    List<ResourcePolicy> resPolicies = find(c, o, type);
    resourcePolicyDAO.deleteByDsoAndTypeAndAction(c, o, type, action);

    c.turnOffAuthorisationSystem();
    try {
        contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
    } finally {
        // Always restore the previous state, otherwise a failure would leave authorisation disabled.
        c.restoreAuthSystemState();
    }

    provenanceService.removeReadPolicies(c, o, resPolicies);
    addEventModify(c, o);
}

@Override
Expand All @@ -266,6 +270,7 @@ public void removeDsoGroupPolicies(Context context, DSpaceObject dso, Group grou
context.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(dso).updateLastModified(context, dso);
context.restoreAuthSystemState();
addEventModify(context, dso);
}

@Override
Expand All @@ -285,6 +290,10 @@ public void removeAllEPersonPolicies(Context context, EPerson ePerson) throws SQ

/**
 * Deletes every resource policy that refers to the given group. A MODIFY event is queued
 * (via {@link #addEventModify}) for the object of each affected policy before the delete runs.
 *
 * @param c     the DSpace context
 * @param group the group whose policies are removed
 * @throws SQLException if a database error occurs
 */
@Override
public void removeGroupPolicies(Context c, Group group) throws SQLException {
    // The policies have to be read before the bulk delete, afterwards they can no longer be found.
    find(c, group).forEach(policy -> addEventModify(c, policy.getdSpaceObject()));
    resourcePolicyDAO.deleteByGroup(c, group);
}

Expand All @@ -297,6 +306,7 @@ public void removePolicies(Context c, DSpaceObject o, int actionId) throws SQLEx
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
addEventModify(c, o);
}
}

Expand All @@ -307,6 +317,7 @@ public void removeDsoAndTypeNotEqualsToPolicies(Context c, DSpaceObject o, Strin
c.turnOffAuthorisationSystem();
contentServiceFactory.getDSpaceObjectService(o).updateLastModified(c, o);
c.restoreAuthSystemState();
addEventModify(c, o);
}


Expand Down Expand Up @@ -338,6 +349,7 @@ public void update(Context context, List<ResourcePolicy> resourcePolicies) throw

// FIXME: Check authorisation
resourcePolicyDAO.save(context, resourcePolicy);
addEventModify(context, resourcePolicy.getdSpaceObject());
}

//Update the last modified timestamp of all related DSpace Objects
Expand Down Expand Up @@ -436,4 +448,12 @@ public boolean isMyResourcePolicy(Context context, EPerson eperson, Integer id)
}
return isMy;
}

/**
 * Queues a MODIFY event on the context when the given object is an Item; any other
 * object (including {@code null}) is ignored.
 * The event is built with subject type {@code -1} and no subject id; the Item is carried
 * as the event's object (object type {@link Constants#ITEM}, object id = the Item's id).
 *
 * @param context the DSpace context that receives the event
 * @param dso     the object whose policies changed
 */
public void addEventModify(Context context, DSpaceObject dso) {
    if (!(dso instanceof Item)) {
        return;
    }
    context.addEvent(
        new Event(Event.MODIFY, -1, null, Constants.ITEM, ((Item) dso).getID(), ""));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -311,4 +311,11 @@ public List<ResourcePolicy> findByGroupAndResourceUuid(Context context, Group gr
*/
public boolean isMyResourcePolicy(Context context, EPerson eperson, Integer id) throws SQLException;

/**
 * Add a MODIFY event to the context when the provided DSpaceObject is an Item.
 * Objects of any other type are ignored, so callers may pass whatever object
 * a resource policy is attached to without checking its type first.
 *
 * @param context The DSpace context that receives the event.
 * @param dso     The DSpaceObject whose resource policies were changed.
 */
public void addEventModify(Context context, DSpaceObject dso);
}
168 changes: 168 additions & 0 deletions dspace-oai/src/main/java/org/dspace/event/OAIIndexEventConsumer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/**
* The contents of this file are subject to the license and copyright
* detailed in the LICENSE and NOTICE files at the root of the source
* tree and available online at
*
* http://www.dspace.org/license/
*/
package org.dspace.event;

import java.sql.SQLException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Set;

import org.apache.log4j.Logger;
import org.dspace.content.Bitstream;
import org.dspace.content.Bundle;
import org.dspace.content.Collection;
import org.dspace.content.Community;
import org.dspace.content.DSpaceObject;
import org.dspace.content.Item;
import org.dspace.content.factory.ContentServiceFactory;
import org.dspace.content.service.ItemService;
import org.dspace.core.Constants;
import org.dspace.core.Context;
import org.dspace.xoai.app.BasicConfiguration;
import org.dspace.xoai.app.XOAI;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;

/**
 * Event consumer that determines which items must be (re)indexed in the OAI index, based on
 * the type of each event and on its subject and object.
 * It reacts to changes of items, collections, communities, bundles and bitstreams, collects
 * the affected items while events are consumed, and hands them to the XOAI indexer once the
 * batch of events ends.
 */
public class OAIIndexEventConsumer implements Consumer {
    /**
     * log4j logger
     */
    private static final Logger log = Logger.getLogger(OAIIndexEventConsumer.class);

    ItemService itemService = ContentServiceFactory.getInstance().getItemService();

    // Items collected during the current batch of events; null while no batch is pending.
    private Set<Item> itemsToUpdate = null;

    public void initialize() throws Exception {
        // No-op
    }

    /**
     * Consume a content event -- only records the items that have to be reindexed;
     * the indexing itself happens in {@link #end(Context)}.
     *
     * @param ctx   DSpace context
     * @param event Content event
     * @throws Exception if the subject or object of the event cannot be resolved
     */
    public void consume(Context ctx, Event event) throws Exception {

        if (Objects.isNull(itemsToUpdate)) {
            itemsToUpdate = new HashSet<Item>();
        }

        // An event that carries an Item as its object always means "reindex that Item".
        // This has to be decided before the subject type is validated: some events (e.g. the
        // MODIFY events created for resource policy changes, which use subject type -1) have
        // no usable subject and would otherwise be discarded by the check below.
        if (event.getObjectType() == Constants.ITEM) {
            DSpaceObject object = event.getObject(ctx);
            if (Objects.nonNull(object)) {
                itemsToUpdate.add((Item) object);
                return;
            }
        }

        int st = event.getSubjectType();
        if (!(st == Constants.ITEM || st == Constants.BUNDLE
            || st == Constants.COLLECTION || st == Constants.COMMUNITY || st == Constants.BITSTREAM)) {
            log.warn("OAIIndexEventConsumer should not have been given this kind of Subject in an event, "
                + "skipping: " + event.toString());
            return;
        }

        DSpaceObject subject = event.getSubject(ctx);
        if (Objects.isNull(subject)) {
            // Nothing can be derived from a subject that cannot be loaded.
            return;
        }

        int et = event.getEventType();

        if (st == Constants.COLLECTION || st == Constants.COMMUNITY) {
            if (et == Event.MODIFY || et == Event.MODIFY_METADATA || et == Event.REMOVE || et == Event.DELETE) {
                // A change of the container affects every item inside it.
                if (subject.getType() == Constants.COMMUNITY) {
                    for (Collection col : ((Community) subject).getCollections()) {
                        addAll(ctx, col);
                    }
                } else {
                    addAll(ctx, (Collection) subject);
                }
            }
        } else if (st == Constants.BITSTREAM || st == Constants.BUNDLE) {
            // The owning items are reindexed regardless of the event type.
            if (subject.getType() == Constants.BITSTREAM) {
                for (Bundle bun : ((Bitstream) subject).getBundles()) {
                    itemsToUpdate.addAll(bun.getItems());
                }
            } else {
                itemsToUpdate.addAll(((Bundle) subject).getItems());
            }
        } else {
            // Only Constants.ITEM is left: any event on an item reindexes that item.
            itemsToUpdate.add((Item) subject);
        }
    }

    /**
     * Adds every item that {@code itemService.findByCollection} returns for the collection
     * to the set of items to reindex.
     *
     * @param context DSpace context
     * @param col     the collection whose items are added
     * @throws SQLException if the items cannot be read
     */
    private void addAll(Context context, Collection col) throws SQLException {
        Iterator<Item> i = itemService.findByCollection(context, col);
        while (i.hasNext()) {
            itemsToUpdate.add(i.next());
        }
    }

    /**
     * Reindexes the items collected by {@link #consume(Context, Event)}. Items without a
     * handle are skipped. The pending set is always cleared, also when indexing fails.
     *
     * @param ctx DSpace context of the event batch (the indexer runs with its own new context)
     * @throws Exception if the indexer fails
     */
    public void end(Context ctx) throws Exception {

        if (Objects.isNull(itemsToUpdate)) {
            return;
        }

        // Take ownership of the pending items and reset the field right away, so that no
        // failure below can carry stale items over into the next batch of events.
        Set<Item> pending = itemsToUpdate;
        itemsToUpdate = null;

        Set<Item> filtered = new HashSet<Item>(pending.size());
        for (Item item : pending) {
            if (Objects.isNull(item.getHandle())) {
                // probably submission item, skip
                continue;
            }
            filtered.add(item);
        }

        Context anonymousContext = new Context();
        // try-with-resources closes the Spring context even when the indexer throws.
        try (AnnotationConfigApplicationContext applicationContext =
                 new AnnotationConfigApplicationContext(BasicConfiguration.class)) {
            XOAI indexer = new XOAI(anonymousContext, false, false);
            applicationContext.getAutowireCapableBeanFactory().autowireBean(indexer);
            indexer.indexItems(filtered);
        } finally {
            anonymousContext.complete();
        }
    }

    public void finish(Context ctx) throws Exception {
        // No-op
    }
}
34 changes: 34 additions & 0 deletions dspace-oai/src/main/java/org/dspace/xoai/app/XOAI.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@
public class XOAI {
private static Logger log = LogManager.getLogger(XOAI.class);

private final XOAICacheService cacheService;
private final XOAIItemCacheService itemCacheService;

// needed because the solr query only returns 10 rows by default
private final Context context;
private final boolean verbose;
Expand All @@ -105,6 +108,13 @@ public class XOAI {

private List<XOAIExtensionItemCompilePlugin> extensionPlugins;

// Instance initializer: builds a Spring application context from BasicConfiguration and
// looks up the two cache services that are stored in the final fields cacheService and
// itemCacheService.
// NOTE(review): the application context created here is not closed in this block, and a new
// one is built for every XOAI instance -- confirm that this is intended.
{
AnnotationConfigApplicationContext applicationContext =
new AnnotationConfigApplicationContext(BasicConfiguration.class);
cacheService = applicationContext.getBean(XOAICacheService.class);
itemCacheService = applicationContext.getBean(XOAIItemCacheService.class);
}

private List<String> getFileFormats(Item item) {
List<String> formats = new ArrayList<>();
try {
Expand Down Expand Up @@ -719,4 +729,28 @@ private static void usage() {
}
}

/**
 * Removes the documents matching {@code item.id:<item id>} from the Solr server returned by
 * the resolver and commits the deletion immediately.
 *
 * @param item the item whose index documents are deleted
 * @throws SolrServerException if Solr reports an error
 * @throws IOException         if communication with Solr fails
 */
private void deleteItemByQuery(Item item) throws SolrServerException, IOException {
    final SolrClient client = solrServerResolver.getServer();
    final String query = "item.id:" + item.getID().toString();
    client.deleteByQuery(query);
    client.commit();
}

/**
 * Reindexes the given items: for each item the existing Solr documents are deleted and a
 * freshly built document is added. After all items are processed the Solr changes are
 * committed and both OAI caches are emptied.
 * The first item that fails aborts the run; the commit and the cache clean-up are skipped then.
 *
 * @param items the items to reindex
 * @throws RuntimeException if an item cannot be reindexed; the original exception is its cause
 * @throws Exception        if the final commit or the cache clean-up fails
 */
public void indexItems(java.util.Collection<Item> items) throws Exception {
    for (Item item : items) {
        try {
            deleteItemByQuery(item);
            solrServerResolver.getServer().add(this.index(item));
        } catch (IOException | XMLStreamException | SQLException | WritingXmlException | SolrServerException e) {
            String message = "Cannot reindex the item with ID: " + item.getID() + " because: "
                + e.getMessage();
            // Keep the original exception in the log and as the cause, otherwise the stack
            // trace of the real failure is lost.
            log.error(message, e);
            throw new RuntimeException(message, e);
        }
    }
    solrServerResolver.getServer().commit();
    cacheService.deleteAll();
    itemCacheService.deleteAll();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
import org.dspace.app.rest.model.DSpaceObjectRest;
import org.dspace.app.rest.model.patch.Patch;
import org.dspace.app.rest.repository.patch.ResourcePatch;
import org.dspace.app.rest.utils.SolrOAIReindexer;
import org.dspace.authorize.AuthorizeException;
import org.dspace.content.DSpaceObject;
import org.dspace.content.service.DSpaceObjectService;
Expand All @@ -39,9 +38,6 @@ public abstract class DSpaceObjectRestRepository<M extends DSpaceObject, R exten
@Autowired
MetadataConverter metadataConverter;

@Autowired
private SolrOAIReindexer solrOAIReindexer;

/**
 * Creates the repository and stores the service it uses for its DSpace objects
 * (for example, patchDSpaceObject calls {@code dsoService.update}).
 *
 * @param dsoService the DSpaceObjectService kept in the {@code dsoService} field
 */
DSpaceObjectRestRepository(DSpaceObjectService<M> dsoService) {
this.dsoService = dsoService;
}
Expand All @@ -67,7 +63,6 @@ protected void patchDSpaceObject(String apiCategory, String model, UUID id, Patc
}
resourcePatch.patch(obtainContext(), dso, patch.getOperations());
dsoService.update(obtainContext(), dso);
solrOAIReindexer.reindexItem(dso);
}

@Override
Expand Down
Loading