Skip to content

Commit

Permalink
[Pull-based Ingestion] Offset management, support rewind by offset or…
Browse files Browse the repository at this point in the history
… timestamp (#17354)

* initial commit

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

* add tests

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

* resolve comments

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

* support optional auto offset config

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

* Update DefaultStreamPollerTests with countDownLatch

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

* use long as timestamp type

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

* add change log

Signed-off-by: xuxiong1 <xiongxug@outlook.com>

---------

Signed-off-by: xuxiong1 <xiongxug@outlook.com>
  • Loading branch information
xuxiong1 authored Feb 24, 2025
1 parent 4648c3f commit 0714a1b
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 40 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
- Add execution_hint to cardinality aggregator request ([#17312](https://github.com/opensearch-project/OpenSearch/pull/17312))
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,69 @@ public void testKafkaIngestion() {
}
}

/**
 * Creates a Kafka-backed index whose initial pointer is reset with {@code rewind_by_timestamp}
 * and waits (up to 10 seconds) until exactly one document matches an {@code age >= 0} range query.
 */
public void testKafkaIngestion_RewindByTimeStamp() {
    final String indexName = "test_rewind_by_timestamp";
    final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";
    try {
        setupKafka();

        final Settings indexSettings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
            // 1739459500000 is the timestamp of the first message
            // 1739459800000 is the timestamp of the second message
            // by resetting to 1739459600000, only the second message will be ingested
            .put("ingestion_source.pointer.init.reset.value", "1739459600000")
            .put("ingestion_source.param.topic", "test")
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            .put("ingestion_source.param.auto.offset.reset", "latest")
            .build();

        // create an index with ingestion source from kafka
        createIndex(indexName, indexSettings, mapping);

        final RangeQueryBuilder ageFilter = new RangeQueryBuilder("age").gte(0);
        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            // refresh on every attempt so newly ingested documents become searchable
            refresh(indexName);
            SearchResponse searchResult = client().prepareSearch(indexName).setQuery(ageFilter).get();
            assertThat(searchResult.getHits().getTotalHits().value(), is(1L));
        });
    } finally {
        // always tear the container down, even when the assertion times out
        stopKafka();
    }
}

/**
 * Creates a Kafka-backed index whose initial pointer is reset with {@code rewind_by_offset}
 * (reset value {@code "1"}) and waits (up to 10 seconds) until exactly one document matches an
 * {@code age >= 0} range query.
 */
public void testKafkaIngestion_RewindByOffset() {
    final String indexName = "test_rewind_by_offset";
    final String mapping = "{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}";
    try {
        setupKafka();

        final Settings indexSettings = Settings.builder()
            .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
            .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
            .put("ingestion_source.type", "kafka")
            .put("ingestion_source.pointer.init.reset", "rewind_by_offset")
            .put("ingestion_source.pointer.init.reset.value", "1")
            .put("ingestion_source.param.topic", "test")
            .put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
            .put("ingestion_source.param.auto.offset.reset", "latest")
            .build();

        // create an index with ingestion source from kafka
        createIndex(indexName, indexSettings, mapping);

        final RangeQueryBuilder ageFilter = new RangeQueryBuilder("age").gte(0);
        await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
            // refresh on every attempt so newly ingested documents become searchable
            refresh(indexName);
            SearchResponse searchResult = client().prepareSearch(indexName).setQuery(ageFilter).get();
            assertThat(searchResult.getHits().getTotalHits().value(), is(1L));
        });
    } finally {
        // always tear the container down, even when the assertion times out
        stopKafka();
    }
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
Expand All @@ -122,10 +185,14 @@ private void prepareKafkaData() {
Properties props = new Properties();
props.put("bootstrap.servers", kafka.getBootstrapServers());
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"));
producer.send(
new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")
);
producer.send(
new ProducerRecord<>(
topicName,
null,
1739459800000L,
"null",
"{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
package org.opensearch.plugin.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand All @@ -27,6 +30,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

Expand All @@ -47,6 +51,7 @@ public class KafkaPartitionConsumer implements IngestionShardConsumer<KafkaOffse
private long lastFetchedOffset = -1;
final String clientId;
final TopicPartition topicPartition;
final KafkaSourceConfig config;

/**
* Constructor
Expand All @@ -68,6 +73,7 @@ public KafkaPartitionConsumer(String clientId, KafkaSourceConfig config, int par
protected KafkaPartitionConsumer(String clientId, KafkaSourceConfig config, int partitionId, Consumer<byte[], byte[]> consumer) {
this.clientId = clientId;
this.consumer = consumer;
this.config = config;
String topic = config.getTopic();
List<PartitionInfo> partitionInfos = AccessController.doPrivileged(
(PrivilegedAction<List<PartitionInfo>>) () -> consumer.partitionsFor(topic, Duration.ofMillis(timeoutMillis))
Expand All @@ -93,6 +99,9 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
Properties consumerProp = new Properties();
consumerProp.put("bootstrap.servers", config.getBootstrapServers());
consumerProp.put("client.id", clientId);
if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) {
consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig());
}
// TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
// consumerProp.put("key.deserializer",
// "org.apache.kafka.common.serialization.StringDeserializer");
Expand Down Expand Up @@ -140,6 +149,43 @@ public IngestionShardPointer latestPointer() {
return new KafkaOffset(endOffset);
}

/**
 * Resolves the pointer for this consumer's topic partition from a timestamp.
 * <p>
 * The offset is looked up via {@code consumer.offsetsForTimes}. When the lookup yields no entry,
 * the configured {@code auto.offset.reset} value decides the fallback: {@code earliest} returns
 * {@link #earliestPointer()}, {@code latest} returns {@link #latestPointer()}.
 *
 * @param timestampMillis the timestamp, in milliseconds, to look up
 * @return the pointer wrapping the resolved offset, or the fallback pointer
 * @throws IllegalArgumentException if no offset is found and the auto.offset.reset value is
 *                                  neither earliest nor latest
 */
@Override
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
    final PrivilegedAction<Long> lookup = () -> {
        Map<TopicPartition, OffsetAndTimestamp> offsetsByPartition = consumer.offsetsForTimes(
            Collections.singletonMap(topicPartition, timestampMillis)
        );
        if (offsetsByPartition != null && !offsetsByPartition.isEmpty()) {
            OffsetAndTimestamp match = offsetsByPartition.values().iterator().next();
            if (match != null) {
                return match.offset();
            }
        }
        // -1 marks "nothing found" for the caller below
        return -1L;
    };
    final long resolvedOffset = AccessController.doPrivileged(lookup);

    if (resolvedOffset >= 0) {
        return new KafkaOffset(resolvedOffset);
    }

    logger.warn("No message found for timestamp {}, fall back to auto.offset.reset policy", timestampMillis);
    final String resetPolicy = config.getAutoOffsetResetConfig();
    if (OffsetResetStrategy.EARLIEST.toString().equals(resetPolicy)) {
        logger.warn("The auto.offset.reset is set to earliest, seek to earliest pointer");
        return earliestPointer();
    }
    if (OffsetResetStrategy.LATEST.toString().equals(resetPolicy)) {
        logger.warn("The auto.offset.reset is set to latest, seek to latest pointer");
        return latestPointer();
    }
    throw new IllegalArgumentException("No message found for timestamp " + timestampMillis);
}

/**
 * Builds a pointer from the string form of an offset.
 *
 * @param offset the offset, parsed with {@link Long#parseLong(String)}
 * @return a {@code KafkaOffset} wrapping the parsed value
 * @throws NumberFormatException if {@code offset} is not a parsable long
 */
@Override
public IngestionShardPointer pointerFromOffset(String offset) {
    return new KafkaOffset(Long.parseLong(offset));
}

private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, long maxMessages, int timeoutMillis) {
if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) {
logger.info("Seeking to offset {}", startOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
public class KafkaSourceConfig {
private final String PROP_TOPIC = "topic";
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
// TODO: support pass any generic kafka configs
private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";

private final String topic;
private final String bootstrapServers;
private final String autoOffsetResetConfig;

/**
* Constructor
Expand All @@ -29,6 +32,7 @@ public class KafkaSourceConfig {
public KafkaSourceConfig(Map<String, Object> params) {
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
}

/**
Expand All @@ -47,4 +51,13 @@ public String getTopic() {
public String getBootstrapServers() {
return bootstrapServers;
}

/**
 * Returns the {@code auto.offset.reset} value read from the source parameters.
 *
 * @return the configured value as stored by the constructor; it is read as an optional
 *         property, so callers should be prepared for it to be absent
 */
public String getAutoOffsetResetConfig() {
    return this.autoOffsetResetConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ public void testLatestPointer() {
assertEquals(10L, offset.getOffset());
}

/**
 * Stubs the mock consumer's {@code offsetsForTimes} to return offset 5 for timestamp 1000 and
 * checks that {@code pointerFromTimestampMillis} returns a pointer carrying that offset.
 */
public void testPointerFromTimestampMillis() {
    final long timestampMillis = 1000L;
    final TopicPartition partition = new TopicPartition("test-topic", 0);
    final org.apache.kafka.clients.consumer.OffsetAndTimestamp stubbedResult =
        new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, timestampMillis);

    when(mockConsumer.offsetsForTimes(Collections.singletonMap(partition, timestampMillis))).thenReturn(
        Collections.singletonMap(partition, stubbedResult)
    );

    final KafkaOffset resolved = (KafkaOffset) consumer.pointerFromTimestampMillis(timestampMillis);

    assertEquals(5L, resolved.getOffset());
}

/**
 * Verifies that {@code pointerFromOffset} parses the string offset and returns a
 * {@link KafkaOffset} carrying the parsed value.
 */
public void testPointerFromOffset() {
    // Go through the consumer so the method under test is actually exercised,
    // rather than constructing the KafkaOffset directly.
    KafkaOffset offset = (KafkaOffset) consumer.pointerFromOffset("5");
    assertEquals(5L, offset.getOffset());
}

public void testTopicDoesNotExist() {
Map<String, Object> params = new HashMap<>();
params.put("topic", "non-existent-topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,19 +716,58 @@ public void validate(final String value, final Map<Setting<?>, Object> settings)

@Override
public void validate(final String value) {
if (!(value.equalsIgnoreCase(StreamPoller.ResetState.LATEST.name())
|| value.equalsIgnoreCase(StreamPoller.ResetState.EARLIEST.name()))) {
if (!isValidResetState(value)) {
throw new IllegalArgumentException(
"Invalid value for " + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET + " [" + value + "]"
);
}
}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {}
// Cross-setting validation: a rewind reset state is only acceptable when the companion
// reset-value setting carries a non-empty value.
public void validate(final String value, final Map<Setting<?>, Object> settings) {
    if (!isRewindState(value)) {
        // LATEST / EARLIEST need no companion value
        return;
    }
    final String rewindTarget = (String) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING);
    final boolean missingTarget = rewindTarget == null || rewindTarget.isEmpty();
    if (missingTarget) {
        throw new IllegalArgumentException(
            "Setting "
                + INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey()
                + " should be set when REWIND_BY_OFFSET or REWIND_BY_TIMESTAMP"
        );
    }
}

// True when value names (case-insensitively) LATEST, EARLIEST or one of the rewind states.
private boolean isValidResetState(String value) {
    final boolean isLatest = StreamPoller.ResetState.LATEST.name().equalsIgnoreCase(value);
    final boolean isEarliest = StreamPoller.ResetState.EARLIEST.name().equalsIgnoreCase(value);
    return isLatest || isEarliest || isRewindState(value);
}

// True when value names (case-insensitively) REWIND_BY_OFFSET or REWIND_BY_TIMESTAMP.
private boolean isRewindState(String value) {
    if (StreamPoller.ResetState.REWIND_BY_OFFSET.name().equalsIgnoreCase(value)) {
        return true;
    }
    return StreamPoller.ResetState.REWIND_BY_TIMESTAMP.name().equalsIgnoreCase(value);
}

// Declares the reset-value setting as the one other setting this validator reads.
@Override
public Iterator<Setting<?>> settings() {
    return Collections.<Setting<?>>singletonList(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING).iterator();
}
},
Property.IndexScope,
Property.Dynamic
Property.Final
);

/**
* Defines the setting for the value to be used when resetting by offset or timestamp.
*/
public static final String SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE = "index.ingestion_source.pointer.init.reset.value";
public static final Setting<String> INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.simpleString(
SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE,
"",
Property.IndexScope,
Property.Final
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
Expand Down Expand Up @@ -954,7 +993,14 @@ public Version getCreationVersion() {
public IngestionSource getIngestionSource() {
final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings);
if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) {
final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings);
final StreamPoller.ResetState pointerInitResetType = StreamPoller.ResetState.valueOf(
INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings).toUpperCase(Locale.ROOT)
);
final String pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings);
IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset(
pointerInitResetType,
pointerInitResetValue
);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.cluster.metadata;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.indices.pollingingest.StreamPoller;

import java.util.Map;
import java.util.Objects;
Expand All @@ -19,10 +20,10 @@
@ExperimentalApi
public class IngestionSource {
private String type;
private String pointerInitReset;
private PointerInitReset pointerInitReset;
private Map<String, Object> params;

public IngestionSource(String type, String pointerInitReset, Map<String, Object> params) {
public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
Expand All @@ -32,7 +33,7 @@ public String getType() {
return type;
}

public String getPointerInitReset() {
public PointerInitReset getPointerInitReset() {
return pointerInitReset;
}

Expand All @@ -59,4 +60,44 @@ public int hashCode() {
public String toString() {
    // Rendered as: IngestionSource{type='<type>',pointer_init_reset='<reset>', params=<params>}
    final StringBuilder text = new StringBuilder("IngestionSource{");
    text.append("type='").append(type).append('\'');
    text.append(",pointer_init_reset='").append(pointerInitReset).append('\'');
    text.append(", params=").append(params);
    text.append('}');
    return text.toString();
}

/**
 * Immutable pair describing how the ingestion pointer is initialized: the reset state and the
 * accompanying reset value string.
 */
@ExperimentalApi
public static class PointerInitReset {
    private final StreamPoller.ResetState type;
    private final String value;

    /**
     * @param type  the reset state
     * @param value the value that accompanies the reset state
     */
    public PointerInitReset(StreamPoller.ResetState type, String value) {
        this.type = type;
        this.value = value;
    }

    /** @return the reset state */
    public StreamPoller.ResetState getType() {
        return this.type;
    }

    /** @return the value that accompanies the reset state */
    public String getValue() {
        return this.value;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final PointerInitReset that = (PointerInitReset) o;
        if (!Objects.equals(this.type, that.type)) {
            return false;
        }
        return Objects.equals(this.value, that.value);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, value);
    }

    @Override
    public String toString() {
        // Rendered as: PointerInitReset{type='<type>', value=<value>}
        final StringBuilder text = new StringBuilder("PointerInitReset{");
        text.append("type='").append(type).append('\'');
        text.append(", value=").append(value);
        text.append('}');
        return text.toString();
    }
}
}
Loading

0 comments on commit 0714a1b

Please sign in to comment.