Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Pull-based Ingestion] Offset management, support rewind by offset or timestamp #17354

Merged
merged 7 commits into from
Feb 24, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG-3.0.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added ConfigurationUtils to core for the ease of configuration parsing [#17223](https://github.com/opensearch-project/OpenSearch/pull/17223)
- Add execution_hint to cardinality aggregator request (#[17312](https://github.com/opensearch-project/OpenSearch/pull/17312))
- Arrow Flight RPC plugin with Flight server bootstrap logic and client for internode communication ([#16962](https://github.com/opensearch-project/OpenSearch/pull/16962))
- Added offset management for the pull-based Ingestion ([#17354](https://github.com/opensearch-project/OpenSearch/pull/17354))

### Dependencies
- Update Apache Lucene to 10.1.0 ([#16366](https://github.com/opensearch-project/OpenSearch/pull/16366))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,69 @@ public void testKafkaIngestion() {
}
}

public void testKafkaIngestion_RewindByTimeStamp() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_timestamp",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_timestamp")
// 1739459500000 is the timestamp of the first message
// 1739459800000 is the timestamp of the second message
// by resetting to 1739459600000, only the second message will be ingested
.put("ingestion_source.pointer.init.reset.value", "1739459600000")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_timestamp");
SearchResponse response = client().prepareSearch("test_rewind_by_timestamp").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
}

public void testKafkaIngestion_RewindByOffset() {
try {
setupKafka();
// create an index with ingestion source from kafka
createIndex(
"test_rewind_by_offset",
Settings.builder()
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("ingestion_source.type", "kafka")
.put("ingestion_source.pointer.init.reset", "rewind_by_offset")
.put("ingestion_source.pointer.init.reset.value", "1")
.put("ingestion_source.param.topic", "test")
.put("ingestion_source.param.bootstrap_servers", kafka.getBootstrapServers())
.put("ingestion_source.param.auto.offset.reset", "latest")
.build(),
"{\"properties\":{\"name\":{\"type\": \"text\"},\"age\":{\"type\": \"integer\"}}}}"
);

RangeQueryBuilder query = new RangeQueryBuilder("age").gte(0);
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
refresh("test_rewind_by_offset");
SearchResponse response = client().prepareSearch("test_rewind_by_offset").setQuery(query).get();
assertThat(response.getHits().getTotalHits().value(), is(1L));
});
} finally {
stopKafka();
}
}

private void setupKafka() {
kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
// disable topic auto creation
Expand All @@ -122,10 +185,14 @@ private void prepareKafkaData() {
Properties props = new Properties();
props.put("bootstrap.servers", kafka.getBootstrapServers());
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
producer.send(new ProducerRecord<>(topicName, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}"));
producer.send(
new ProducerRecord<>(topicName, null, 1739459500000L, "null", "{\"_id\":\"1\",\"_source\":{\"name\":\"bob\", \"age\": 24}}")
);
producer.send(
new ProducerRecord<>(
topicName,
null,
1739459800000L,
"null",
"{\"_id\":\"2\", \"_op_type:\":\"index\",\"_source\":{\"name\":\"alice\", \"age\": 20}}"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@
package org.opensearch.plugin.kafka;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
Expand All @@ -27,6 +30,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

Expand All @@ -47,6 +51,7 @@
private long lastFetchedOffset = -1;
final String clientId;
final TopicPartition topicPartition;
final KafkaSourceConfig config;

/**
* Constructor
Expand All @@ -68,6 +73,7 @@
protected KafkaPartitionConsumer(String clientId, KafkaSourceConfig config, int partitionId, Consumer<byte[], byte[]> consumer) {
this.clientId = clientId;
this.consumer = consumer;
this.config = config;
String topic = config.getTopic();
List<PartitionInfo> partitionInfos = AccessController.doPrivileged(
(PrivilegedAction<List<PartitionInfo>>) () -> consumer.partitionsFor(topic, Duration.ofMillis(timeoutMillis))
Expand All @@ -93,6 +99,9 @@
Properties consumerProp = new Properties();
consumerProp.put("bootstrap.servers", config.getBootstrapServers());
consumerProp.put("client.id", clientId);
if (config.getAutoOffsetResetConfig() != null && !config.getAutoOffsetResetConfig().isEmpty()) {
consumerProp.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, config.getAutoOffsetResetConfig());

Check warning on line 103 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L103

Added line #L103 was not covered by tests
}
// TODO: why Class org.apache.kafka.common.serialization.StringDeserializer could not be found if set the deserializer as prop?
// consumerProp.put("key.deserializer",
// "org.apache.kafka.common.serialization.StringDeserializer");
Expand Down Expand Up @@ -140,6 +149,43 @@
return new KafkaOffset(endOffset);
}

@Override
public IngestionShardPointer pointerFromTimestampMillis(long timestampMillis) {
long offset = AccessController.doPrivileged((PrivilegedAction<Long>) () -> {
Map<TopicPartition, OffsetAndTimestamp> position = consumer.offsetsForTimes(
Collections.singletonMap(topicPartition, timestampMillis)
);
if (position == null || position.isEmpty()) {
return -1L;

Check warning on line 159 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L159

Added line #L159 was not covered by tests
}
OffsetAndTimestamp offsetAndTimestamp = position.values().iterator().next();
if (offsetAndTimestamp == null) {
return -1L;

Check warning on line 163 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L163

Added line #L163 was not covered by tests
}
return offsetAndTimestamp.offset();
});
if (offset < 0) {
logger.warn("No message found for timestamp {}, fall back to auto.offset.reset policy", timestampMillis);
String autoOffsetResetConfig = config.getAutoOffsetResetConfig();

Check warning on line 169 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L168-L169

Added lines #L168 - L169 were not covered by tests
if (OffsetResetStrategy.EARLIEST.toString().equals(autoOffsetResetConfig)) {
logger.warn("The auto.offset.reset is set to earliest, seek to earliest pointer");
return earliestPointer();

Check warning on line 172 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L171-L172

Added lines #L171 - L172 were not covered by tests
} else if (OffsetResetStrategy.LATEST.toString().equals(autoOffsetResetConfig)) {
logger.warn("The auto.offset.reset is set to latest, seek to latest pointer");
return latestPointer();

Check warning on line 175 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L174-L175

Added lines #L174 - L175 were not covered by tests
} else {
throw new IllegalArgumentException("No message found for timestamp " + timestampMillis);

Check warning on line 177 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L177

Added line #L177 was not covered by tests
}
}
return new KafkaOffset(offset);
}

@Override
public IngestionShardPointer pointerFromOffset(String offset) {
long offsetValue = Long.parseLong(offset);
return new KafkaOffset(offsetValue);

Check warning on line 186 in plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

View check run for this annotation

Codecov / codecov/patch

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java#L185-L186

Added lines #L185 - L186 were not covered by tests
}

private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, long maxMessages, int timeoutMillis) {
if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) {
logger.info("Seeking to offset {}", startOffset);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@
public class KafkaSourceConfig {
private final String PROP_TOPIC = "topic";
private final String PROP_BOOTSTRAP_SERVERS = "bootstrap_servers";
// TODO: support pass any generic kafka configs
private final String PROP_AUTO_OFFSET_RESET = "auto.offset.reset";

private final String topic;
private final String bootstrapServers;
private final String autoOffsetResetConfig;

/**
* Constructor
Expand All @@ -29,6 +32,7 @@ public class KafkaSourceConfig {
public KafkaSourceConfig(Map<String, Object> params) {
this.topic = ConfigurationUtils.readStringProperty(params, PROP_TOPIC);
this.bootstrapServers = ConfigurationUtils.readStringProperty(params, PROP_BOOTSTRAP_SERVERS);
this.autoOffsetResetConfig = ConfigurationUtils.readOptionalStringProperty(params, PROP_AUTO_OFFSET_RESET);
}

/**
Expand All @@ -47,4 +51,13 @@ public String getTopic() {
public String getBootstrapServers() {
return bootstrapServers;
}

/**
* Get the auto offset reset configuration
*
* @return the auto offset reset configuration
*/
public String getAutoOffsetResetConfig() {
return autoOffsetResetConfig;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,22 @@ public void testLatestPointer() {
assertEquals(10L, offset.getOffset());
}

public void testPointerFromTimestampMillis() {
TopicPartition topicPartition = new TopicPartition("test-topic", 0);
when(mockConsumer.offsetsForTimes(Collections.singletonMap(topicPartition, 1000L))).thenReturn(
Collections.singletonMap(topicPartition, new org.apache.kafka.clients.consumer.OffsetAndTimestamp(5L, 1000L))
);

KafkaOffset offset = (KafkaOffset) consumer.pointerFromTimestampMillis(1000);

assertEquals(5L, offset.getOffset());
}

public void testPointerFromOffset() {
KafkaOffset offset = new KafkaOffset(5L);
assertEquals(5L, offset.getOffset());
}

public void testTopicDoesNotExist() {
Map<String, Object> params = new HashMap<>();
params.put("topic", "non-existent-topic");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -716,19 +716,58 @@

@Override
public void validate(final String value) {
if (!(value.equalsIgnoreCase(StreamPoller.ResetState.LATEST.name())
|| value.equalsIgnoreCase(StreamPoller.ResetState.EARLIEST.name()))) {
if (!isValidResetState(value)) {
throw new IllegalArgumentException(
"Invalid value for " + SETTING_INGESTION_SOURCE_POINTER_INIT_RESET + " [" + value + "]"
);
}
}

@Override
public void validate(final String value, final Map<Setting<?>, Object> settings) {}
public void validate(final String value, final Map<Setting<?>, Object> settings) {
if (isRewindState(value)) {
// Ensure the reset value setting is provided when rewinding.
final String resetValue = (String) settings.get(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING);

Check warning on line 730 in server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java#L730

Added line #L730 was not covered by tests
if (resetValue == null || resetValue.isEmpty()) {
throw new IllegalArgumentException(

Check warning on line 732 in server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java#L732

Added line #L732 was not covered by tests
"Setting "
+ INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.getKey()

Check warning on line 734 in server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/IndexMetadata.java#L734

Added line #L734 was not covered by tests
+ " should be set when REWIND_BY_OFFSET or REWIND_BY_TIMESTAMP"
);
}
}
}

private boolean isValidResetState(String value) {
return StreamPoller.ResetState.LATEST.name().equalsIgnoreCase(value)
|| StreamPoller.ResetState.EARLIEST.name().equalsIgnoreCase(value)
|| isRewindState(value);
}

private boolean isRewindState(String value) {
return StreamPoller.ResetState.REWIND_BY_OFFSET.name().equalsIgnoreCase(value)
|| StreamPoller.ResetState.REWIND_BY_TIMESTAMP.name().equalsIgnoreCase(value);
}

@Override
public Iterator<Setting<?>> settings() {
final List<Setting<?>> settings = Collections.singletonList(INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING);
return settings.iterator();
}
},
Property.IndexScope,
Property.Dynamic
Property.Final
);

/**
* Defines the setting for the value to be used when resetting by offset or timestamp.
*/
public static final String SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE = "index.ingestion_source.pointer.init.reset.value";
public static final Setting<String> INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING = Setting.simpleString(
SETTING_INGESTION_SOURCE_POINTER_INIT_RESET_VALUE,
"",
Property.IndexScope,
Property.Final
);

public static final Setting.AffixSetting<Object> INGESTION_SOURCE_PARAMS_SETTING = Setting.prefixKeySetting(
Expand Down Expand Up @@ -954,7 +993,14 @@
public IngestionSource getIngestionSource() {
final String ingestionSourceType = INGESTION_SOURCE_TYPE_SETTING.get(settings);
if (ingestionSourceType != null && !(NONE_INGESTION_SOURCE_TYPE.equals(ingestionSourceType))) {
final String pointerInitReset = INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings);
final StreamPoller.ResetState pointerInitResetType = StreamPoller.ResetState.valueOf(
INGESTION_SOURCE_POINTER_INIT_RESET_SETTING.get(settings).toUpperCase(Locale.ROOT)
);
final String pointerInitResetValue = INGESTION_SOURCE_POINTER_INIT_RESET_VALUE_SETTING.get(settings);
IngestionSource.PointerInitReset pointerInitReset = new IngestionSource.PointerInitReset(
pointerInitResetType,
pointerInitResetValue
);
final Map<String, Object> ingestionSourceParams = INGESTION_SOURCE_PARAMS_SETTING.getAsMap(settings);
return new IngestionSource(ingestionSourceType, pointerInitReset, ingestionSourceParams);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
package org.opensearch.cluster.metadata;

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.indices.pollingingest.StreamPoller;

import java.util.Map;
import java.util.Objects;
Expand All @@ -19,10 +20,10 @@
@ExperimentalApi
public class IngestionSource {
private String type;
private String pointerInitReset;
private PointerInitReset pointerInitReset;
private Map<String, Object> params;

public IngestionSource(String type, String pointerInitReset, Map<String, Object> params) {
public IngestionSource(String type, PointerInitReset pointerInitReset, Map<String, Object> params) {
this.type = type;
this.pointerInitReset = pointerInitReset;
this.params = params;
Expand All @@ -32,7 +33,7 @@
return type;
}

public String getPointerInitReset() {
public PointerInitReset getPointerInitReset() {
return pointerInitReset;
}

Expand All @@ -59,4 +60,44 @@
public String toString() {
return "IngestionSource{" + "type='" + type + '\'' + ",pointer_init_reset='" + pointerInitReset + '\'' + ", params=" + params + '}';
}

/**
* Class encapsulating the configuration of a pointer initialization.
*/
@ExperimentalApi
public static class PointerInitReset {
private final StreamPoller.ResetState type;
private final String value;

public PointerInitReset(StreamPoller.ResetState type, String value) {
this.type = type;
this.value = value;
}

public StreamPoller.ResetState getType() {
return type;
}

public String getValue() {
return value;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
PointerInitReset pointerInitReset = (PointerInitReset) o;

Check warning on line 89 in server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/cluster/metadata/IngestionSource.java#L89

Added line #L89 was not covered by tests
return Objects.equals(type, pointerInitReset.type) && Objects.equals(value, pointerInitReset.value);
}

@Override
public int hashCode() {
return Objects.hash(type, value);
}

@Override
public String toString() {
return "PointerInitReset{" + "type='" + type + '\'' + ", value=" + value + '}';
}
}
}
Loading
Loading