README.md (2 changes: 1 addition & 1 deletion)
@@ -50,7 +50,7 @@ If you are using Maven without the BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.27.0')
+implementation platform('com.google.cloud:libraries-bom:26.29.0')

implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
@@ -970,7 +970,7 @@ private Boolean retryOnRetryableError(Code errorCode, AppendRequestAndResponse r
lock.lock();
try {
requestWrapper.retryCount++;
-if (this.retrySettings != null && errorCode == Code.RESOURCE_EXHAUSTED) {
+if (this.retrySettings != null && useBackoffForError(errorCode, streamName)) {
// Trigger exponential backoff in the append loop when a request is resent for quota errors.
// createNextAttempt correctly initializes the retry delay; createFirstAttempt does not
// include a positive delay, just 0.
@@ -1148,6 +1148,17 @@ private boolean isConnectionErrorRetriable(Code statusCode) {
|| statusCode == Code.DEADLINE_EXCEEDED;
}

private boolean useBackoffForError(Code statusCode, String streamName) {
// The default stream also uses backoff for INTERNAL, as THROTTLED errors are more likely
// with default streams. RESOURCE_EXHAUSTED triggers backoff for every stream type.
if (isDefaultStreamName(streamName) && statusCode == Code.INTERNAL) {
return true;
}
return statusCode == Code.RESOURCE_EXHAUSTED;
}
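
`isDefaultStreamName` is not shown in this hunk. As a point of reference, a minimal sketch of such a check, assuming default streams are addressed with the `_default` suffix the tests below use (`parent.toString() + "/_default"`); this is not the library's actual implementation:

```java
// Hypothetical sketch: treat a stream as the default stream when its
// resource name ends in "_default", e.g.
// "projects/p/datasets/d/tables/t/_default".
private static boolean isDefaultStreamName(String streamName) {
  return streamName != null && streamName.endsWith("_default");
}
```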

private void doneCallback(Throwable finalStatus) {
log.info(
"Received done callback. Stream: "
@@ -2047,6 +2047,44 @@ public void testExclusiveAppendQuotaErrorRetryExponentialBackoff() throws Except
}
}

@Test
public void testAppendInternalErrorRetryExponentialBackoff() throws Exception {
StreamWriter writer = getTestStreamWriterRetryEnabled();

testBigQueryWrite.addResponse(
new DummyResponseSupplierWillFailThenSucceed(
new FakeBigQueryWriteImpl.Response(createAppendResponse(0)),
/* totalFailCount= */ MAX_RETRY_NUM_ATTEMPTS + 1,
com.google.rpc.Status.newBuilder().setCode(Code.INTERNAL.ordinal()).build()));

ApiFuture<AppendRowsResponse> future =
writer.append(createProtoRows(new String[] {String.valueOf(0)}), 0);

ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
future.get();
});
assertEquals(
Status.Code.INTERNAL, ((StatusRuntimeException) ex.getCause()).getStatus().getCode());

ArrayList<Instant> instants = testBigQueryWrite.getLatestRequestReceivedInstants();
Instant previousInstant = instants.get(0);
// Includes the initial attempt.
assertEquals(MAX_RETRY_NUM_ATTEMPTS + 1, instants.size());
double minExpectedDelay = INITIAL_RETRY_MILLIS * 0.95;
for (int i = 1; i < instants.size(); i++) {
Instant currentInstant = instants.get(i);
double differenceInMillis =
java.time.Duration.between(previousInstant, currentInstant).toMillis();
assertThat(differenceInMillis).isAtLeast((double) INITIAL_RETRY_MILLIS);
assertThat(differenceInMillis).isGreaterThan(minExpectedDelay);
minExpectedDelay = minExpectedDelay * RETRY_MULTIPLIER;
previousInstant = currentInstant;
}
}
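
The writer under test comes from `getTestStreamWriterRetryEnabled()`, which is not shown in this diff. A hedged sketch of what a retry-enabled writer could look like, assuming `StreamWriter.Builder#setRetrySettings` accepts a gax `RetrySettings`, and reusing the test's `INITIAL_RETRY_MILLIS`, `RETRY_MULTIPLIER`, and `MAX_RETRY_NUM_ATTEMPTS` constants:

```java
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import java.io.IOException;
import org.threeten.bp.Duration;

// Sketch only: builds a writer whose resend delays mirror the constants
// the backoff assertions above rely on.
static StreamWriter newRetryEnabledWriter(String streamName, ProtoSchema writerSchema)
    throws IOException {
  RetrySettings retrySettings =
      RetrySettings.newBuilder()
          .setInitialRetryDelay(Duration.ofMillis(INITIAL_RETRY_MILLIS))
          .setRetryDelayMultiplier(RETRY_MULTIPLIER)
          .setMaxAttempts(MAX_RETRY_NUM_ATTEMPTS)
          .build();
  return StreamWriter.newBuilder(streamName)
      .setWriterSchema(writerSchema)
      .setRetrySettings(retrySettings)
      .build();
}
```

With these settings the n-th resend is delayed by roughly INITIAL_RETRY_MILLIS * RETRY_MULTIPLIER^(n-1), which is the lower bound the loop above asserts.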

@Test
public void testAppendSuccessAndNonRetryableError() throws Exception {
StreamWriter writer = getTestStreamWriterRetryEnabled();
@@ -1608,53 +1608,4 @@ public void testLargeRequest() throws IOException, InterruptedException, Executi
assertEquals("50", queryIter.next().get(0).getStringValue());
}
}

@Test
public void testDefaultRequestLimit()
throws IOException, InterruptedException, ExecutionException {
DatasetId datasetId =
DatasetId.of("bq-write-api-java-retry-test", RemoteBigQueryHelper.generateDatasetName());
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bigquery.create(datasetInfo);
try {
String tableName = "no_error_table";
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
try (StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(CreateProtoSchemaWithColField())
.build()) {
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
CreateProtoRows(
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
try {
response.get();
Assert.fail("Large request should fail with InvalidArgumentError");
} catch (ExecutionException ex) {
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
io.grpc.StatusRuntimeException actualError =
(io.grpc.StatusRuntimeException) ex.getCause();
// This verifies that the Beam connector can consume this custom exception's grpc
// StatusCode
// TODO(yiru): temp fix to unblock test, while final fix is being rolled out.
if (actualError.getStatus().getCode() != Code.INTERNAL) {
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertThat(actualError.getStatus().getDescription())
.contains("AppendRows request too large: 19923131 limit 10485760");
}
}
}
} finally {
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
}
}
}
@@ -16,21 +16,38 @@

package com.google.cloud.bigquery.storage.v1.it;

import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.DescriptorProtos.DescriptorProto;
import com.google.protobuf.DescriptorProtos.FieldDescriptorProto;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import io.grpc.Status.Code;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

@@ -80,6 +97,15 @@ public static void afterClass() {
}
}

ProtoRows CreateProtoRows(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
FooType foo = FooType.newBuilder().setFoo(message).build();
rows.addSerializedRows(foo.toByteString());
}
return rows.build();
}

@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
@@ -104,4 +130,88 @@ public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
/* requestCount= */ 901,
/* rowBatchSize= */ 1);
}

// Moved to ITBigQueryWriteNonQuotaRetryTest from ITBigQueryWriteManualClientTest, as it
// requires the error-injection project used by this file (bq-write-api-java-retry-test).
@Test
public void testDefaultRequestLimit()
throws IOException, InterruptedException, ExecutionException {
DatasetId datasetId =
DatasetId.of(NON_QUOTA_RETRY_PROJECT_ID, RemoteBigQueryHelper.generateDatasetName());
DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bigquery.create(datasetInfo);
try {
String tableName = "no_error_table";
TableId tableId = TableId.of(datasetId.getProject(), datasetId.getDataset(), tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema originalSchema = Schema.of(col1);
TableInfo tableInfo =
TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build();
bigquery.create(tableInfo);
ProtoSchema schema =
ProtoSchema.newBuilder()
.setProtoDescriptor(
DescriptorProto.newBuilder()
.setName("testProto")
.addField(
FieldDescriptorProto.newBuilder()
.setName("col1")
.setNumber(1)
.setType(FieldDescriptorProto.Type.TYPE_STRING)
.build())
.build())
.build();
TableName parent = TableName.of(datasetId.getProject(), datasetId.getDataset(), tableName);
try (StreamWriter streamWriter =
StreamWriter.newBuilder(parent.toString() + "/_default")
.setWriterSchema(schema)
.build()) {
ApiFuture<AppendRowsResponse> response =
streamWriter.append(
CreateProtoRows(
new String[] {new String(new char[19 * 1024 * 1024]).replace("\0", "a")}));
try {
AppendRowsResponse resp = response.get();
LOG.info(
"Message succeeded. Dataset info: "
+ datasetInfo.toString()
+ " tableinfo: "
+ tableInfo.toString()
+ " parent: "
+ parent
+ " streamWriter: "
+ streamWriter.toString()
+ " response: "
+ resp);
Assert.fail("Large request should fail with InvalidArgumentError");
} catch (ExecutionException ex) {
LOG.info(
"Message failed. Dataset info: "
+ datasetInfo.toString()
+ " tableinfo: "
+ tableInfo.toString()
+ " parent: "
+ parent
+ " streamWriter: "
+ streamWriter);
assertEquals(io.grpc.StatusRuntimeException.class, ex.getCause().getClass());
io.grpc.StatusRuntimeException actualError =
(io.grpc.StatusRuntimeException) ex.getCause();
// This verifies that the Beam connector can consume this custom exception's grpc
// StatusCode
// TODO(yiru): temp fix to unblock test, while final fix is being rolled out.
if (actualError.getStatus().getCode() != Code.INTERNAL) {
assertEquals(Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertThat(actualError.getStatus().getDescription())
.contains("AppendRows request too large: 19923131 limit 10485760");
}
}
}
} finally {
RemoteBigQueryHelper.forceDelete(bigquery, datasetId.toString());
}
}
}
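
The limit this test exercises is the wire size of a single AppendRows request (10485760 bytes, roughly 10 MB); the 19 MB row above is built precisely to exceed it. A minimal sketch of keeping appends under such a cap by batching serialized rows, assuming a caller-chosen `maxBatchBytes` set safely below the hard limit to leave headroom for request overhead:

```java
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

// Sketch only: greedily pack serialized rows into ProtoRows batches whose
// combined payload stays under maxBatchBytes. A single row larger than the
// cap still ends up alone in its own batch and would fail server-side.
static List<ProtoRows> batchRows(List<ByteString> serializedRows, long maxBatchBytes) {
  List<ProtoRows> batches = new ArrayList<>();
  ProtoRows.Builder current = ProtoRows.newBuilder();
  long currentBytes = 0;
  for (ByteString row : serializedRows) {
    if (currentBytes > 0 && currentBytes + row.size() > maxBatchBytes) {
      batches.add(current.build());
      current = ProtoRows.newBuilder();
      currentBytes = 0;
    }
    current.addSerializedRows(row);
    currentBytes += row.size();
  }
  if (currentBytes > 0) {
    batches.add(current.build());
  }
  return batches;
}
```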