Merged
Changes from 1 commit
Commits
32 commits
408f82e
Add integration tests with RetrySettings enabled.
Oct 13, 2023
ab069b5
Fix formatting
Oct 13, 2023
51f1f51
Add quota and non-quota e2e tests.
Oct 30, 2023
8c82567
Cleanup
Oct 30, 2023
d904fa6
Merge branch 'googleapis:main' into retry-tests
egreco12 Oct 31, 2023
9abfa88
Revert sample changes to keep this branch only for e2e tests
Oct 31, 2023
3bea4df
Remove additional retry-specific sample code
Oct 31, 2023
97614b1
Remove backoff-related retry settings for non quota tests
Oct 31, 2023
52cb39e
Update kokoro build to ignore retry tests until retry-specific Kokoro…
Oct 31, 2023
9e8506e
Run format
Oct 31, 2023
872acff
Wrap -Dtest args
Oct 31, 2023
610f032
Fix integration command
Oct 31, 2023
18f442c
Fix integration command
Oct 31, 2023
0bbc3ff
Remove -Dtest arg
Nov 1, 2023
f39a19d
Fix integration test to ignore retry tests
Nov 3, 2023
5401143
Use list instead of regex for ignoring retry tests when integration t…
Nov 3, 2023
5984f9f
Fix typo in integration test command
Nov 3, 2023
8f11c10
Merge branch 'googleapis:main' into retry-tests
egreco12 Nov 6, 2023
5e86a16
Merge branch 'googleapis:main' into retry-tests
egreco12 Nov 6, 2023
8a44d41
Fix ignore retry test settings
Nov 8, 2023
8432f80
Add debug logs to see why test fails in github
Nov 8, 2023
385d3ea
Remove log
Nov 8, 2023
1de97ce
Add additional retry-based logging
Nov 8, 2023
3e2ed81
Remove unused profile
Nov 8, 2023
a46d327
Add more debugging logs for connection worker test
Nov 8, 2023
4b0d1e1
rearrange builder order
Nov 8, 2023
fdf20d6
Remove debug log
Nov 8, 2023
d083ac0
Directly add streamwriter to list in connection worker pool test
Nov 8, 2023
b341de5
Merge branch 'googleapis:main' into retry-tests
egreco12 Nov 8, 2023
7e29b8e
Refactor retry tests into helper class
Nov 9, 2023
4803de7
Fix file headers
Nov 9, 2023
057cf8c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 13, 2023
Refactor retry tests into helper class
Evan Greco committed Nov 9, 2023
commit 7e29b8eb9625c3fac1d869766810defd32b0206d
File: ITBigQueryWriteNonQuotaRetryTest.java
@@ -16,52 +16,33 @@

package com.google.cloud.bigquery.storage.v1.it;

import static org.junit.Assert.assertFalse;

import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteNonQuotaRetryTest {
private static final Logger LOG =
Logger.getLogger(ITBigQueryWriteNonQuotaRetryTest.class.getName());
private static final Logger LOG = Logger.getLogger(ITBigQueryWriteQuotaRetryTest.class.getName());
private static final String DATASET = RemoteBigQueryHelper.generateDatasetName();
private static final String TABLE = "testtable";
private static final String DESCRIPTION = "BigQuery Write Java manual client test dataset";
// This project is configured on the server to inject INTERNAL in-stream errors every 10 messages.
// This is done to verify in-stream message retries.
// This project is configured on the server to inject INTERNAL in-stream errors every
// 10 messages. This is done to verify in-stream message retries.
private static final String NON_QUOTA_RETRY_PROJECT_ID = "bq-write-api-java-retry-test";

private static BigQueryWriteClient client;
private static BigQuery bigquery;

@@ -102,109 +83,25 @@ public static void afterClass() {
@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings = RetrySettings.newBuilder().setMaxAttempts(5).build();
String tableName = "CommittedRetry";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema schema = Schema.of(col1);
TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
Assert.assertEquals(
allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch);
} catch (ExecutionException ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runExclusiveRetryTest(
bigquery,
client,
DATASET,
NON_QUOTA_RETRY_PROJECT_ID,
WriteStream.Type.COMMITTED,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

@Test
public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
String tableName = "JsonTableDefaultStream";
TableFieldSchema TEST_STRING =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_str")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build())))
.build();

bigquery.create(tableInfo);
TableName parent = TableName.of(NON_QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
assertFalse(allResponses.get(i).get().hasError());
} catch (Exception ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runDefaultRetryTest(
bigquery,
client,
DATASET,
NON_QUOTA_RETRY_PROJECT_ID,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}
}
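Both refactored tests above now delegate to a WriteRetryTestUtil helper that is not included in this excerpt. A minimal sketch of what its signatures might look like, inferred only from the two call sites in this diff (the package, visibility, and empty bodies are assumptions, not part of the PR):

```java
// Hypothetical sketch inferred purely from the call sites above; the actual
// WriteRetryTestUtil added by the PR may differ in package, signatures, and behavior.
package com.google.cloud.bigquery.storage.v1.it;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.WriteStream;

public class WriteRetryTestUtil {

  // Creates an exclusive write stream of the given type in projectId/dataset,
  // appends requestCount batches of rowBatchSize rows, and asserts that every
  // expected offset eventually comes back despite server-injected errors.
  public static void runExclusiveRetryTest(
      BigQuery bigquery,
      BigQueryWriteClient client,
      String dataset,
      String projectId,
      WriteStream.Type streamType,
      int requestCount,
      int rowBatchSize) {
    // Implementation lives in the PR's WriteRetryTestUtil and is not part of this excerpt.
  }

  // Same flow against the default stream, where appends carry no explicit
  // offset and the test only asserts the absence of per-response errors.
  public static void runDefaultRetryTest(
      BigQuery bigquery,
      BigQueryWriteClient client,
      String dataset,
      String projectId,
      int requestCount,
      int rowBatchSize) {
    // Implementation lives in the PR's WriteRetryTestUtil and is not part of this excerpt.
  }
}
```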
File: ITBigQueryWriteQuotaRetryTest.java
@@ -16,40 +16,23 @@

package com.google.cloud.bigquery.storage.v1.it;

import static org.junit.Assert.assertFalse;

import com.google.api.core.ApiFuture;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableFieldSchema;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.cloud.bigquery.testing.RemoteBigQueryHelper;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.ExecutionException;
import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.threeten.bp.Duration;

/** Integration tests for BigQuery Write API. */
public class ITBigQueryWriteQuotaRetryTest {
@@ -98,117 +81,27 @@ public static void afterClass() {
}

@Test
public void testJsonStreamWriterCommittedStreamWithNonQuotaRetry()
public void testJsonStreamWriterCommittedStreamWithQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
String tableName = "CommittedRetry";
TableId tableId = TableId.of(DATASET, tableName);
Field col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build();
Schema schema = Schema.of(col1);
TableInfo tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(schema)).build();
bigquery.create(tableInfo);
TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(parent.toString())
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr, k * rowBatch));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
Assert.assertEquals(
allResponses.get(i).get().getAppendResult().getOffset().getValue(), i * rowBatch);
} catch (ExecutionException ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runExclusiveRetryTest(
bigquery,
client,
DATASET,
QUOTA_RETRY_PROJECT_ID,
WriteStream.Type.COMMITTED,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}

@Test
public void testJsonStreamWriterDefaultStreamWithNonQuotaRetry()
public void testJsonStreamWriterDefaultStreamWithQuotaRetry()
throws IOException, InterruptedException, DescriptorValidationException {
RetrySettings retrySettings =
RetrySettings.newBuilder()
.setInitialRetryDelay(Duration.ofMillis(500))
.setRetryDelayMultiplier(1.1)
.setMaxAttempts(5)
.setMaxRetryDelay(Duration.ofMinutes(1))
.build();
String tableName = "JsonTableDefaultStream";
TableFieldSchema TEST_STRING =
TableFieldSchema.newBuilder()
.setType(TableFieldSchema.Type.STRING)
.setMode(TableFieldSchema.Mode.NULLABLE)
.setName("test_str")
.build();
TableSchema tableSchema = TableSchema.newBuilder().addFields(0, TEST_STRING).build();
TableInfo tableInfo =
TableInfo.newBuilder(
TableId.of(DATASET, tableName),
StandardTableDefinition.of(
Schema.of(Field.newBuilder("test_str", StandardSQLTypeName.STRING).build())))
.build();

bigquery.create(tableInfo);
TableName parent = TableName.of(QUOTA_RETRY_PROJECT_ID, DATASET, tableName);

int totalRequest = 901;
int rowBatch = 1;
ArrayList<ApiFuture<AppendRowsResponse>> allResponses = new ArrayList<>(totalRequest);
try (JsonStreamWriter jsonStreamWriter =
JsonStreamWriter.newBuilder(parent.toString(), tableSchema)
.setIgnoreUnknownFields(true)
.setRetrySettings(retrySettings)
.build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("test_str", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
JSONArray jsonArr = new JSONArray();
// 3MB batch.
for (int j = 0; j < rowBatch; j++) {
jsonArr.put(row);
}
LOG.info("Appending: " + k + "/" + totalRequest);
allResponses.add(jsonStreamWriter.append(jsonArr));
}
LOG.info("Waiting for all responses to come back");
for (int i = 0; i < totalRequest; i++) {
LOG.info("Waiting for request " + i);
try {
assertFalse(allResponses.get(i).get().hasError());
} catch (Exception ex) {
Assert.fail("Unexpected error " + ex);
}
}
}
WriteRetryTestUtil.runDefaultRetryTest(
bigquery,
client,
DATASET,
QUOTA_RETRY_PROJECT_ID,
/* requestCount=*/ 901,
/* rowBatchSize=*/ 1);
}
}
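For reference, the inline RetrySettings deleted from the default-stream tests in both files (initial delay 500 ms, multiplier 1.1, 5 attempts, 1-minute cap) can be reconstructed as a standalone snippet; whether the new helper reuses exactly these values is an assumption:

```java
// Reconstructed from the inline settings deleted in the diffs above; the
// helper class introduced by this PR may configure retries differently.
import com.google.api.gax.retrying.RetrySettings;
import org.threeten.bp.Duration;

final class RetrySettingsSketch {
  static RetrySettings defaultStreamRetrySettings() {
    return RetrySettings.newBuilder()
        .setInitialRetryDelay(Duration.ofMillis(500)) // first backoff after a retryable failure
        .setRetryDelayMultiplier(1.1) // gentle exponential growth between attempts
        .setMaxAttempts(5) // stop retrying after five attempts
        .setMaxRetryDelay(Duration.ofMinutes(1)) // cap on the per-attempt backoff
        .build();
  }
}
```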