Skip to content

Commit

Permalink
Allow duplicate stream titles in route_to_stream (#154)
Browse files Browse the repository at this point in the history
Cache stream ids and titles to avoid heavy database traffic during function evaluation

Fixes #101
  • Loading branch information
kroepke authored and joschi committed Jan 10, 2017
1 parent 3aff8c3 commit 7c7482a
Show file tree
Hide file tree
Showing 6 changed files with 223 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.graylog.plugins.pipelineprocessor.functions;

import com.google.inject.Binder;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;

Expand Down Expand Up @@ -60,6 +61,7 @@
import org.graylog.plugins.pipelineprocessor.functions.messages.RouteToStream;
import org.graylog.plugins.pipelineprocessor.functions.messages.SetField;
import org.graylog.plugins.pipelineprocessor.functions.messages.SetFields;
import org.graylog.plugins.pipelineprocessor.functions.messages.StreamCacheService;
import org.graylog.plugins.pipelineprocessor.functions.strings.Abbreviate;
import org.graylog.plugins.pipelineprocessor.functions.strings.Capitalize;
import org.graylog.plugins.pipelineprocessor.functions.strings.Concat;
Expand Down Expand Up @@ -100,6 +102,8 @@ protected void configure() {
addMessageProcessorFunction(CreateMessage.NAME, CreateMessage.class);
addMessageProcessorFunction(CloneMessage.NAME, CloneMessage.class);
addMessageProcessorFunction(RouteToStream.NAME, RouteToStream.class);
// helper service for route_to_stream
serviceBinder().addBinding().to(StreamCacheService.class).in(Scopes.SINGLETON);

// input related functions
addMessageProcessorFunction(FromInput.NAME, FromInput.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
*/
package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;

import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.functions.AbstractFunction;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionArgs;
import org.graylog.plugins.pipelineprocessor.ast.functions.FunctionDescriptor;
import org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor;
import org.graylog2.database.NotFoundException;
import org.graylog2.plugin.Message;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamService;

import java.util.Collection;
import java.util.Collections;

import static com.google.common.collect.ImmutableList.of;
import static org.graylog.plugins.pipelineprocessor.ast.functions.ParameterDescriptor.string;
Expand All @@ -38,50 +38,48 @@ public class RouteToStream extends AbstractFunction<Void> {
public static final String NAME = "route_to_stream";
private static final String ID_ARG = "id";
private static final String NAME_ARG = "name";
private final StreamService streamService;
private final StreamCacheService streamCacheService;
private final ParameterDescriptor<Message, Message> messageParam;
private final ParameterDescriptor<String, String> nameParam;
private final ParameterDescriptor<String, String> idParam;

@Inject
public RouteToStream(StreamService streamService) {
this.streamService = streamService;
streamService.loadAllEnabled();
public RouteToStream(StreamCacheService streamCacheService) {
this.streamCacheService = streamCacheService;

messageParam = type("message", Message.class).optional().description("The message to use, defaults to '$message'").build();
nameParam = string(NAME_ARG).optional().description("The name of the stream to route the message to, must match exactly").build();
idParam = string(ID_ARG).optional().description("The ID of the stream, this is much faster than using 'name'").build();
idParam = string(ID_ARG).optional().description("The ID of the stream").build();
}

@Override
public Void evaluate(FunctionArgs args, EvaluationContext context) {
String id = idParam.optional(args, context).orElse("");

final Stream stream;
final Collection<Stream> streams;
if ("".equals(id)) {
final String name = nameParam.optional(args, context).orElse("");
if ("".equals(name)) {
return null;
}
// TODO efficiency
final ImmutableMap<String, Stream> stringStreamImmutableMap = Maps.uniqueIndex(streamService.loadAll(),
Stream::getTitle);
stream = stringStreamImmutableMap.get(name);
if (stream == null) {
streams = streamCacheService.getByName(name);
if (streams.isEmpty()) {
// TODO signal error somehow
return null;
}
} else {
try {
stream = streamService.load(id);
} catch (NotFoundException e) {
final Stream stream = streamCacheService.getById(id);
if (stream == null) {
return null;
}
streams = Collections.singleton(stream);
}
if (!stream.isPaused()) {
final Message message = messageParam.optional(args, context).orElse(context.currentMessage());
message.addStream(stream);
}
final Message message = messageParam.optional(args, context).orElse(context.currentMessage());
streams.forEach(stream -> {
if (!stream.isPaused()) {
message.addStream(stream);
}
});
return null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
/**
* This file is part of Graylog Pipeline Processor.
*
* Graylog Pipeline Processor is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* Graylog Pipeline Processor is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Graylog Pipeline Processor. If not, see <http://www.gnu.org/licenses/>.
*/
package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Maps;
import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SortedSetMultimap;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import com.google.common.util.concurrent.AbstractIdleService;

import org.graylog2.database.NotFoundException;
import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamService;
import org.graylog2.streams.events.StreamsChangedEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collection;
import java.util.Comparator;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;
import javax.inject.Inject;
import javax.inject.Named;
import javax.inject.Singleton;

@Singleton
public class StreamCacheService extends AbstractIdleService {
private static final Logger LOG = LoggerFactory.getLogger(StreamCacheService.class);

private final EventBus eventBus;
private final StreamService streamService;
private final ScheduledExecutorService executorService;

private final SortedSetMultimap<String, Stream> nameToStream = Multimaps.synchronizedSortedSetMultimap(
MultimapBuilder.hashKeys()
.treeSetValues(Comparator.comparing(Stream::getId))
.build());
private final Map<String, Stream> idToStream = Maps.newConcurrentMap();

@Inject
public StreamCacheService(EventBus eventBus,
StreamService streamService,
@Named("daemonScheduler") ScheduledExecutorService executorService) {
this.eventBus = eventBus;
this.streamService = streamService;
this.executorService = executorService;
}

@Override
protected void startUp() throws Exception {
streamService.loadAllEnabled().forEach(this::updateCache);
eventBus.register(this);
}

@Override
protected void shutDown() throws Exception {
eventBus.unregister(this);
}

@Subscribe
public void handleStreamUpdate(StreamsChangedEvent event) {
executorService.schedule(() -> updateStreams(event.streamIds()), 0, TimeUnit.SECONDS);
}

@VisibleForTesting
public void updateStreams(Collection<String> ids) {
for (String id : ids) {
LOG.debug("Updating stream id/title cache for id {}", id);
try {
final Stream stream = streamService.load(id);
if (stream.getDisabled()) {
purgeCache(stream.getId());
} else {
updateCache(stream);
}
} catch (NotFoundException e) {
// the stream was deleted, we only have to purge the existing entries
purgeCache(id);
}
}
}

private void purgeCache(String id) {
final Stream stream = idToStream.remove(id);
LOG.debug("Purging stream id/title cache for id {}, stream {}", id, stream);
if (stream != null) {
nameToStream.remove(stream.getTitle(), stream);
}
}

private void updateCache(Stream stream) {
LOG.debug("Updating stream id/title cache for {}/'{}'", stream.getId(), stream.getTitle());
idToStream.put(stream.getId(), stream);
nameToStream.put(stream.getTitle(), stream);
}


public Collection<Stream> getByName(String name) {
return nameToStream.get(name);
}

@Nullable
public Stream getById(String id) {
return idToStream.get(id);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,15 @@
*/
package org.graylog.plugins.pipelineprocessor.functions;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.eventbus.EventBus;
import com.google.common.net.InetAddresses;

import com.fasterxml.jackson.databind.ObjectMapper;

import org.graylog.plugins.pipelineprocessor.BaseParserTest;
import org.graylog.plugins.pipelineprocessor.EvaluationContext;
import org.graylog.plugins.pipelineprocessor.ast.Rule;
Expand Down Expand Up @@ -66,6 +69,7 @@
import org.graylog.plugins.pipelineprocessor.functions.messages.RouteToStream;
import org.graylog.plugins.pipelineprocessor.functions.messages.SetField;
import org.graylog.plugins.pipelineprocessor.functions.messages.SetFields;
import org.graylog.plugins.pipelineprocessor.functions.messages.StreamCacheService;
import org.graylog.plugins.pipelineprocessor.functions.strings.Abbreviate;
import org.graylog.plugins.pipelineprocessor.functions.strings.Capitalize;
import org.graylog.plugins.pipelineprocessor.functions.strings.Concat;
Expand All @@ -86,6 +90,7 @@
import org.graylog.plugins.pipelineprocessor.functions.urls.UrlConversion;
import org.graylog.plugins.pipelineprocessor.parser.FunctionRegistry;
import org.graylog.plugins.pipelineprocessor.parser.ParseException;
import org.graylog2.database.NotFoundException;
import org.graylog2.grok.GrokPattern;
import org.graylog2.grok.GrokPatternRegistry;
import org.graylog2.grok.GrokPatternService;
Expand All @@ -102,6 +107,7 @@
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.ArgumentMatchers;

import java.util.Map;
import java.util.Set;
Expand All @@ -111,12 +117,15 @@
import static org.assertj.core.api.Assertions.fail;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class FunctionsSnippetsTest extends BaseParserTest {

public static final DateTime GRAYLOG_EPOCH = DateTime.parse("2010-07-30T16:03:25Z");
private static final EventBus eventBus = new EventBus();
private static StreamCacheService streamCacheService;

@BeforeClass
public static void registerFunctions() {
Expand Down Expand Up @@ -144,9 +153,24 @@ public static void registerFunctions() {
when(stream.isPaused()).thenReturn(false);
when(stream.getTitle()).thenReturn("some name");
when(stream.getId()).thenReturn("id");
when(streamService.loadAll()).thenReturn(Lists.newArrayList(stream));

functions.put(RouteToStream.NAME, new RouteToStream(streamService));
final Stream stream2 = mock(Stream.class);
when(stream2.isPaused()).thenReturn(false);
when(stream2.getTitle()).thenReturn("some name");
when(stream2.getId()).thenReturn("id2");

when(streamService.loadAll()).thenReturn(Lists.newArrayList(stream, stream2));
when(streamService.loadAllEnabled()).thenReturn(Lists.newArrayList(stream, stream2));
try {
when(streamService.load(anyString())).thenThrow(new NotFoundException());
when(streamService.load(ArgumentMatchers.eq("id"))).thenReturn(stream);
when(streamService.load(ArgumentMatchers.eq("id2"))).thenReturn(stream2);
} catch (NotFoundException ignored) {
// oh well, checked exceptions <3
}
streamCacheService = new StreamCacheService(eventBus, streamService, null);
streamCacheService.startAsync().awaitRunning();
functions.put(RouteToStream.NAME, new RouteToStream(streamCacheService));

// input related functions
// TODO needs mock
Expand Down Expand Up @@ -690,4 +714,20 @@ public void dateArithmetic() {
DateTimeUtils.setCurrentMillisSystem();
}
}

@Test
public void routeToStream() {
final Rule rule = parser.parseRule(ruleForTest(), true);
final Message message = evaluateRule(rule);

assertThat(message).isNotNull();
assertThat(message.getStreams()).isNotEmpty();
assertThat(message.getStreams().size()).isEqualTo(2);

streamCacheService.updateStreams(ImmutableSet.of("id"));

final Message message2 = evaluateRule(rule);
assertThat(message2).isNotNull();
assertThat(message2.getStreams().size()).isEqualTo(2);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package org.graylog.plugins.pipelineprocessor.functions.messages;

import com.google.common.eventbus.EventBus;

import org.graylog2.plugin.streams.Stream;
import org.graylog2.streams.StreamService;
import org.junit.Test;

import java.util.Collection;
import java.util.concurrent.Executors;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.Mockito.mock;

public class StreamCacheServiceTest {
@Test
public void getByName() throws Exception {
final StreamCacheService streamCacheService = new StreamCacheService(new EventBus(), mock(StreamService.class), Executors.newSingleThreadScheduledExecutor());

// make sure getByName always returns a collection
final Collection<Stream> streams = streamCacheService.getByName("nonexisting");
assertThat(streams).isNotNull().isEmpty();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
rule "stream routing"
when true
then
route_to_stream(name: "some name");
end

0 comments on commit 7c7482a

Please sign in to comment.