Skip to content

Commit

Permalink
Merge branch 'opensearch-project:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
ker2x authored Sep 9, 2023
2 parents 0662ffd + c100c0c commit 157dd0c
Show file tree
Hide file tree
Showing 163 changed files with 2,197 additions and 2,572 deletions.
10 changes: 10 additions & 0 deletions .github/workflows/check-compatibility.yml
Original file line number Diff line number Diff line change
Expand Up @@ -52,8 +52,18 @@ jobs:
with:
name: results.txt

- name: Find Comment
uses: peter-evans/find-comment@v2
id: fc
with:
issue-number: ${{ github.event.number }}
comment-author: 'github-actions[bot]'
body-includes: 'Compatibility status:'

- name: Add comment on the PR
uses: peter-evans/create-or-update-comment@v3
with:
comment-id: ${{ steps.fc.outputs.comment-id }}
issue-number: ${{ github.event.number }}
body-path: results.txt
edit-mode: replace
124 changes: 4 additions & 120 deletions CHANGELOG.md

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.common;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

import java.util.Random;
import java.util.function.Supplier;

@Fork(value = 3)
@Warmup(iterations = 3, time = 1)
@Measurement(iterations = 1, time = 1)
@BenchmarkMode(Mode.Throughput)
public class ArrayRoundingBenchmark {

@Benchmark
public void round(Blackhole bh, Options opts) {
Rounding.Prepared rounding = opts.supplier.get();
for (long key : opts.queries) {
bh.consume(rounding.round(key));
}
}

@State(Scope.Benchmark)
public static class Options {
@Param({
"1",
"2",
"3",
"4",
"5",
"6",
"7",
"8",
"9",
"10",
"12",
"14",
"16",
"18",
"20",
"22",
"24",
"26",
"29",
"32",
"37",
"41",
"45",
"49",
"54",
"60",
"64",
"74",
"83",
"90",
"98",
"108",
"118",
"128",
"144",
"159",
"171",
"187",
"204",
"229",
"256" })
public Integer size;

@Param({ "binary", "linear" })
public String type;

@Param({ "uniform", "skewed_edge", "skewed_center" })
public String distribution;

public long[] queries;
public Supplier<Rounding.Prepared> supplier;

@Setup
public void setup() {
Random random = new Random(size);
long[] values = new long[size];
for (int i = 1; i < values.length; i++) {
values[i] = values[i - 1] + 100;
}

long range = values[values.length - 1] - values[0] + 100;
long mean, stddev;
queries = new long[1000000];

switch (distribution) {
case "uniform": // all values equally likely.
for (int i = 0; i < queries.length; i++) {
queries[i] = values[0] + (nextPositiveLong(random) % range);
}
break;
case "skewed_edge": // distribution centered at p90 with ± 5% stddev.
mean = values[0] + (long) (range * 0.9);
stddev = (long) (range * 0.05);
for (int i = 0; i < queries.length; i++) {
queries[i] = Math.max(values[0], mean + (long) (random.nextGaussian() * stddev));
}
break;
case "skewed_center": // distribution centered at p50 with ± 5% stddev.
mean = values[0] + (long) (range * 0.5);
stddev = (long) (range * 0.05);
for (int i = 0; i < queries.length; i++) {
queries[i] = Math.max(values[0], mean + (long) (random.nextGaussian() * stddev));
}
break;
default:
throw new IllegalArgumentException("invalid distribution: " + distribution);
}

switch (type) {
case "binary":
supplier = () -> new Rounding.BinarySearchArrayRounding(values, size, null);
break;
case "linear":
supplier = () -> new Rounding.BidirectionalLinearSearchArrayRounding(values, size, null);
break;
default:
throw new IllegalArgumentException("invalid type: " + type);
}
}

private static long nextPositiveLong(Random random) {
return random.nextLong() & Long.MAX_VALUE;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,13 @@ protected void addDefaultAttributes(Span span) {
}

@Override
public Span startSpan(String spanName, Map<String, List<String>> headers, Attributes attributes) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> headers) {
Optional<Span> propagatedSpan = tracingTelemetry.getContextPropagator().extractFromHeaders(headers);
return startSpan(spanName, propagatedSpan.map(SpanContext::new).orElse(null), attributes);
return startSpan(
spanCreationContext.getSpanName(),
propagatedSpan.map(SpanContext::new).orElse(null),
spanCreationContext.getAttributes()
);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

import org.opensearch.common.annotation.ExperimentalApi;
import org.opensearch.telemetry.tracing.Span;
import org.opensearch.telemetry.tracing.attributes.Attributes;
import org.opensearch.telemetry.tracing.SpanCreationContext;

import java.util.List;
import java.util.Map;
Expand All @@ -28,10 +28,9 @@ public interface HttpTracer {
/**
* Start the span with propagating the tracing info from the HttpRequest header.
*
* @param spanName span name.
* @param spanCreationContext span name.
* @param header http request header.
* @param attributes span attributes.
* @return span.
*/
Span startSpan(String spanName, Map<String, List<String>> header, Attributes attributes);
Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header);
}
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void close() {
}

@Override
public Span startSpan(String spanName, Map<String, List<String>> header, Attributes attributes) {
public Span startSpan(SpanCreationContext spanCreationContext, Map<String, List<String>> header) {
return NoopSpan.INSTANCE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;

import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -252,42 +252,33 @@ public void testEndSpanByClosingSpanScopeMultiple() {
public void testSpanAcrossThreads() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
AtomicReference<Span> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefThread2 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefAfterEndThread2 = new AtomicReference<>();

AtomicReference<Span> spanRef = new AtomicReference<>();
AtomicReference<Span> spanT2Ref = new AtomicReference<>();

ThreadContextBasedTracerContextStorage spanTracerStorage = new ThreadContextBasedTracerContextStorage(
threadContext,
tracingTelemetry
);
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);

executorService.execute(() -> {
CompletableFuture<?> asyncTask = CompletableFuture.runAsync(() -> {
// create a span
Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t_1", Attributes.EMPTY));
SpanScope spanScope = defaultTracer.withSpanInScope(span);
spanRef.set(span);

executorService.execute(() -> {
CompletableFuture<?> asyncTask1 = CompletableFuture.runAsync(() -> {
Span spanT2 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT2 = defaultTracer.withSpanInScope(spanT2);
spanT2Ref.set(spanT2);

currentSpanRefThread2.set(defaultTracer.getCurrentSpan().getSpan());
assertEquals(spanT2, defaultTracer.getCurrentSpan().getSpan());

spanT2.endSpan();
spanScopeT2.close();
currentSpanRefAfterEndThread2.set(getCurrentSpanFromContext(defaultTracer));
});
spanT2.endSpan();
assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);
asyncTask1.join();
spanScope.close();
currentSpanRefThread1.set(getCurrentSpanFromContext(defaultTracer));
});
assertEquals(spanT2Ref.get(), currentSpanRefThread2.get());
assertEquals(spanRef.get(), currentSpanRefAfterEndThread2.get());
assertEquals(null, currentSpanRefThread1.get());
span.endSpan();
assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);
asyncTask.join();
}

public void testSpanCloseOnThread2() {
Expand All @@ -297,27 +288,27 @@ public void testSpanCloseOnThread2() {
threadContext,
tracingTelemetry
);
AtomicReference<SpanContext> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<SpanContext> currentSpanRefThread2 = new AtomicReference<>();
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);
final Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t1", Attributes.EMPTY));
try (SpanScope spanScope = defaultTracer.withSpanInScope(span)) {
executorService.execute(() -> async(new ActionListener<Boolean>() {
CompletableFuture<?> asyncTask = CompletableFuture.runAsync(() -> async(new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean response) {
span.endSpan();
currentSpanRefThread2.set(defaultTracer.getCurrentSpan());
try (SpanScope s = defaultTracer.withSpanInScope(span)) {
assertEquals(span, defaultTracer.getCurrentSpan().getSpan());
} finally {
span.endSpan();
}
}

@Override
public void onFailure(Exception e) {

}
}));
currentSpanRefThread1.set(defaultTracer.getCurrentSpan());
}), executorService);
assertEquals(span, defaultTracer.getCurrentSpan().getSpan());
asyncTask.join();
}
assertEquals(null, currentSpanRefThread2.get());
assertEquals(span, currentSpanRefThread1.get().getSpan());
assertEquals(null, defaultTracer.getCurrentSpan());
}

Expand All @@ -337,57 +328,45 @@ private void async(ActionListener<Boolean> actionListener) {
public void testSpanAcrossThreadsMultipleSpans() {
TracingTelemetry tracingTelemetry = new MockTracingTelemetry();
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
AtomicReference<Span> currentSpanRefThread1 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefThread2 = new AtomicReference<>();
AtomicReference<Span> currentSpanRefAfterEndThread2 = new AtomicReference<>();

AtomicReference<Span> parentSpanRef = new AtomicReference<>();
AtomicReference<Span> spanRef = new AtomicReference<>();
AtomicReference<Span> spanT2Ref = new AtomicReference<>();

ThreadContextBasedTracerContextStorage spanTracerStorage = new ThreadContextBasedTracerContextStorage(
threadContext,
tracingTelemetry
);
DefaultTracer defaultTracer = new DefaultTracer(tracingTelemetry, spanTracerStorage);

executorService.execute(() -> {
CompletableFuture<?> asyncTask = CompletableFuture.runAsync(() -> {
// create a parent span
Span parentSpan = defaultTracer.startSpan(new SpanCreationContext("p_span_name", Attributes.EMPTY));
SpanScope parentSpanScope = defaultTracer.withSpanInScope(parentSpan);
parentSpanRef.set(parentSpan);
// create a span
Span span = defaultTracer.startSpan(new SpanCreationContext("span_name_t_1", Attributes.EMPTY));
SpanScope spanScope = defaultTracer.withSpanInScope(span);
spanRef.set(span);

executorService.execute(() -> {
CompletableFuture<?> asyncTask1 = CompletableFuture.runAsync(() -> {
Span spanT2 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT2 = defaultTracer.withSpanInScope(spanT2);

Span spanT21 = defaultTracer.startSpan(new SpanCreationContext("span_name_t_2", Attributes.EMPTY));
SpanScope spanScopeT21 = defaultTracer.withSpanInScope(spanT2);
spanT2Ref.set(spanT21);
currentSpanRefThread2.set(defaultTracer.getCurrentSpan().getSpan());

spanT21.endSpan();
SpanScope spanScopeT21 = defaultTracer.withSpanInScope(spanT21);
assertEquals(spanT21, defaultTracer.getCurrentSpan().getSpan());
spanScopeT21.close();
spanT21.endSpan();

spanT2.endSpan();
spanScopeT2.close();
currentSpanRefAfterEndThread2.set(getCurrentSpanFromContext(defaultTracer));
});
spanT2.endSpan();

assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);

asyncTask1.join();

spanScope.close();
span.endSpan();
parentSpanScope.close();
currentSpanRefThread1.set(getCurrentSpanFromContext(defaultTracer));
});
assertEquals(spanT2Ref.get(), currentSpanRefThread2.get());
assertEquals(spanRef.get(), currentSpanRefAfterEndThread2.get());
assertEquals(null, currentSpanRefThread1.get());
}

private static Span getCurrentSpanFromContext(DefaultTracer defaultTracer) {
return defaultTracer.getCurrentSpan() != null ? defaultTracer.getCurrentSpan().getSpan() : null;
parentSpan.endSpan();
assertEquals(null, defaultTracer.getCurrentSpan());
}, executorService);
asyncTask.join();
}

public void testClose() throws IOException {
Expand Down
Loading

0 comments on commit 157dd0c

Please sign in to comment.