Skip to content

Commit

Permalink
Add connectTimeout configuration option OtlpGrpc{Signal}Exporters (#6079
Browse files Browse the repository at this point in the history
)
  • Loading branch information
jack-berg authored Feb 24, 2024
1 parent 96fe54f commit 1d4a2f4
Show file tree
Hide file tree
Showing 15 changed files with 132 additions and 6 deletions.
13 changes: 12 additions & 1 deletion docs/apidiffs/current_vs_latest/opentelemetry-exporter-otlp.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,13 @@
Comparing source compatibility of against
No changes.
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.logs.OtlpGrpcLogRecordExporterBuilder setConnectTimeout(java.time.Duration)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.metrics.OtlpGrpcMetricExporterBuilder setConnectTimeout(java.time.Duration)
*** MODIFIED CLASS: PUBLIC FINAL io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder (not serializable)
=== CLASS FILE FORMAT VERSION: 52.0 <- 52.0
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(long, java.util.concurrent.TimeUnit)
+++ NEW METHOD: PUBLIC(+) io.opentelemetry.exporter.otlp.trace.OtlpGrpcSpanExporterBuilder setConnectTimeout(java.time.Duration)
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@
@SuppressWarnings("JavadocMethod")
public class GrpcExporterBuilder<T extends Marshaler> {

public static final long DEFAULT_CONNECT_TIMEOUT_SECS = 10;

private static final Logger LOGGER = Logger.getLogger(GrpcExporterBuilder.class.getName());

private final String exporterName;
Expand All @@ -52,6 +54,7 @@ public class GrpcExporterBuilder<T extends Marshaler> {
grpcStubFactory;

private long timeoutNanos;
private long connectTimeoutNanos = TimeUnit.SECONDS.toNanos(DEFAULT_CONNECT_TIMEOUT_SECS);
private URI endpoint;
@Nullable private Compressor compressor;
private final Map<String, String> constantHeaders = new HashMap<>();
Expand Down Expand Up @@ -92,6 +95,11 @@ public GrpcExporterBuilder<T> setTimeout(Duration timeout) {
return setTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

public GrpcExporterBuilder<T> setConnectTimeout(long timeout, TimeUnit unit) {
connectTimeoutNanos = unit.toNanos(timeout);
return this;
}

public GrpcExporterBuilder<T> setEndpoint(String endpoint) {
this.endpoint = ExporterBuilderUtil.validateEndpoint(endpoint);
return this;
Expand Down Expand Up @@ -151,6 +159,7 @@ public GrpcExporterBuilder<T> copy() {
grpcEndpointPath);

copy.timeoutNanos = timeoutNanos;
copy.connectTimeoutNanos = connectTimeoutNanos;
copy.endpoint = endpoint;
copy.compressor = compressor;
copy.constantHeaders.putAll(constantHeaders);
Expand Down Expand Up @@ -193,6 +202,7 @@ public GrpcExporter<T> build() {
grpcEndpointPath,
compressor,
timeoutNanos,
connectTimeoutNanos,
headerSupplier,
grpcChannel,
grpcStubFactory,
Expand All @@ -214,6 +224,7 @@ public String toString(boolean includePrefixAndSuffix) {
joiner.add("endpoint=" + endpoint.toString());
joiner.add("endpointPath=" + grpcEndpointPath);
joiner.add("timeoutNanos=" + timeoutNanos);
joiner.add("connectTimeoutNanos=" + connectTimeoutNanos);
joiner.add(
"compressorEncoding="
+ Optional.ofNullable(compressor).map(Compressor::getEncoding).orElse(null));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ <T extends Marshaler> GrpcSender<T> createSender(
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public void setUp() {
.toString(),
null,
10,
10,
Collections::emptyMap,
null,
null,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,26 @@ public OtlpGrpcLogRecordExporterBuilder setTimeout(Duration timeout) {
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
delegate.setConnectTimeout(timeout, unit);
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcLogRecordExporterBuilder setConnectTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

/**
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
* endpoint must start with either http:// or https://.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,26 @@ public OtlpGrpcMetricExporterBuilder setTimeout(Duration timeout) {
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcMetricExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
delegate.setConnectTimeout(timeout, unit);
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcMetricExporterBuilder setConnectTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

/**
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
* endpoint must start with either http:// or https://.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,26 @@ public OtlpGrpcSpanExporterBuilder setTimeout(Duration timeout) {
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcSpanExporterBuilder setConnectTimeout(long timeout, TimeUnit unit) {
requireNonNull(unit, "unit");
checkArgument(timeout >= 0, "timeout must be non-negative");
delegate.setConnectTimeout(timeout, unit);
return this;
}

/**
* Sets the maximum time to wait for new connections to be established. If unset, defaults to
* {@value GrpcExporterBuilder#DEFAULT_CONNECT_TIMEOUT_SECS}s.
*/
public OtlpGrpcSpanExporterBuilder setConnectTimeout(Duration timeout) {
requireNonNull(timeout, "timeout");
return setConnectTimeout(timeout.toNanos(), TimeUnit.NANOSECONDS);
}

/**
* Sets the OTLP endpoint to connect to. If unset, defaults to {@value DEFAULT_ENDPOINT_URL}. The
* endpoint must start with either http:// or https://.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,33 @@ public Stream<? extends Arguments> provideArguments(ExtensionContext context) th
}
}

@Test
@SuppressLogger(GrpcExporter.class)
void connectTimeout() {
// UpstreamGrpcSender doesn't support connectTimeout, so we skip the test
assumeThat(exporter.unwrap())
.extracting("delegate.grpcSender")
.matches(sender -> sender.getClass().getSimpleName().equals("OkHttpGrpcSender"));

TelemetryExporter<T> exporter =
exporterBuilder()
// Connecting to a non-routable IP address to trigger connection error
.setEndpoint("http://10.255.255.1")
.setConnectTimeout(Duration.ofMillis(1))
.build();
try {
long startTimeMillis = System.currentTimeMillis();
CompletableResultCode result =
exporter.export(Collections.singletonList(generateFakeTelemetry()));
assertThat(result.join(10, TimeUnit.SECONDS).isSuccess()).isFalse();
// Assert that the export request fails well before the default connect timeout of 10s
assertThat(System.currentTimeMillis() - startTimeMillis)
.isLessThan(TimeUnit.SECONDS.toMillis(1));
} finally {
exporter.shutdown();
}
}

@Test
void deadlineSetPerExport() throws InterruptedException {
TelemetryExporter<T> exporter =
Expand Down Expand Up @@ -840,6 +867,9 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ "timeoutNanos="
+ TimeUnit.SECONDS.toNanos(10)
+ ", "
+ "connectTimeoutNanos="
+ TimeUnit.SECONDS.toNanos(10)
+ ", "
+ "compressorEncoding=null, "
+ "headers=Headers\\{User-Agent=OBFUSCATED\\}"
+ ".*" // Maybe additional grpcChannel field
Expand All @@ -851,6 +881,7 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
telemetryExporter =
exporterBuilder()
.setTimeout(Duration.ofSeconds(5))
.setConnectTimeout(Duration.ofSeconds(4))
.setEndpoint("http://example:4317")
.setCompression("gzip")
.addHeader("foo", "bar")
Expand All @@ -877,6 +908,9 @@ void stringRepresentation() throws IOException, CertificateEncodingException {
+ "timeoutNanos="
+ TimeUnit.SECONDS.toNanos(5)
+ ", "
+ "connectTimeoutNanos="
+ TimeUnit.SECONDS.toNanos(4)
+ ", "
+ "compressorEncoding=gzip, "
+ "headers=Headers\\{.*foo=OBFUSCATED.*\\}, "
+ "retryPolicy=RetryPolicy\\{maxAttempts=2, initialBackoff=PT0\\.05S, maxBackoff=PT3S, backoffMultiplier=1\\.3\\}"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public TelemetryExporterBuilder<LogRecordData> setConnectTimeout(long timeout, T

@Override
public TelemetryExporterBuilder<LogRecordData> setConnectTimeout(Duration timeout) {
throw new UnsupportedOperationException();
builder.setConnectTimeout(timeout);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ public TelemetryExporterBuilder<MetricData> setConnectTimeout(long timeout, Time

@Override
public TelemetryExporterBuilder<MetricData> setConnectTimeout(Duration timeout) {
throw new UnsupportedOperationException();
builder.setConnectTimeout(timeout);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ public TelemetryExporterBuilder<SpanData> setConnectTimeout(long timeout, TimeUn

@Override
public TelemetryExporterBuilder<SpanData> setConnectTimeout(Duration timeout) {
throw new UnsupportedOperationException();
builder.setConnectTimeout(timeout);
return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,14 +75,16 @@ public OkHttpGrpcSender(
String endpoint,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable RetryPolicy retryPolicy,
@Nullable SSLContext sslContext,
@Nullable X509TrustManager trustManager) {
OkHttpClient.Builder clientBuilder =
new OkHttpClient.Builder()
.dispatcher(OkHttpUtil.newDispatcher())
.callTimeout(Duration.ofNanos(timeoutNanos));
.callTimeout(Duration.ofNanos(timeoutNanos))
.connectTimeout(Duration.ofNanos(connectTimeoutNanos));
if (retryPolicy != null) {
clientBuilder.addInterceptor(
new RetryInterceptor(retryPolicy, OkHttpGrpcSender::isRetryable));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
String endpointPath,
@Nullable Compressor compressor,
long timeoutNanos,
long connectTimeoutNanos,
Supplier<Map<String, List<String>>> headersSupplier,
@Nullable Object managedChannel,
Supplier<BiFunction<Channel, String, MarshalerServiceStub<T, ?, ?>>> stubFactory,
Expand All @@ -45,6 +46,7 @@ public <T extends Marshaler> GrpcSender<T> createSender(
endpoint.resolve(endpointPath).toString(),
compressor,
timeoutNanos,
connectTimeoutNanos,
headersSupplier,
retryPolicy,
sslContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ void send(OkHttpGrpcSender<DummyMarshaler> sender, Runnable onSuccess, Runnable
@Override
OkHttpGrpcSender<DummyMarshaler> createSender(String endpoint) {
return new OkHttpGrpcSender<>(
"https://localhost", null, 10L, Collections::emptyMap, null, null, null);
"https://localhost", null, 10L, 10L, Collections::emptyMap, null, null, null);
}

protected static class DummyMarshaler extends MarshalerWithSize {
Expand Down

0 comments on commit 1d4a2f4

Please sign in to comment.