+ getMissingValueInterpretationMap() {
+ return streamWriter.getMissingValueInterpretationMap();
+ }
+
+ /** Sets all StreamWriter settings. */
+ private void setStreamWriterSettings(
+ @Nullable TransportChannelProvider channelProvider,
+ @Nullable CredentialsProvider credentialsProvider,
+ @Nullable ExecutorProvider executorProvider,
+ @Nullable String endpoint,
+ @Nullable FlowControlSettings flowControlSettings,
+ @Nullable String traceIdBase,
+ @Nullable String traceId) {
+ if (channelProvider != null) {
+ streamWriterBuilder.setChannelProvider(channelProvider);
+ }
+ if (credentialsProvider != null) {
+ streamWriterBuilder.setCredentialsProvider(credentialsProvider);
+ }
+ if (executorProvider != null) {
+ streamWriterBuilder.setExecutorProvider(executorProvider);
+ }
+ if (endpoint != null) {
+ streamWriterBuilder.setEndpoint(endpoint);
+ }
+ if (traceIdBase != null) {
+ if (traceId != null) {
+ streamWriterBuilder.setTraceId(traceIdBase + "_" + traceId);
+ } else {
+ streamWriterBuilder.setTraceId(traceIdBase + ":null");
+ }
+ } else {
+ if (traceId != null) {
+ streamWriterBuilder.setTraceId("SchemaAwareStreamWriter_" + traceId);
+ } else {
+ streamWriterBuilder.setTraceId("SchemaAwareStreamWriter:null");
+ }
+ }
+ if (flowControlSettings != null) {
+ if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
+ streamWriterBuilder.setMaxInflightBytes(
+ flowControlSettings.getMaxOutstandingRequestBytes());
+ }
+ if (flowControlSettings.getMaxOutstandingElementCount() != null) {
+ streamWriterBuilder.setMaxInflightRequests(
+ flowControlSettings.getMaxOutstandingElementCount());
+ }
+ if (flowControlSettings.getLimitExceededBehavior() != null) {
+ streamWriterBuilder.setLimitExceededBehavior(
+ flowControlSettings.getLimitExceededBehavior());
+ }
+ }
+ }
+
+ /**
+ * newBuilder that constructs a SchemaAwareStreamWriter builder with BigQuery client being
+ * initialized by StreamWriter by default.
+ *
+ * The table schema passed in will be updated automatically when there is a schema update
+ * event. When used for Writer creation, it should be the latest schema. So when you are trying to
+ * reuse a stream, you should use Builder newBuilder( String streamOrTableName,
+ * BigQueryWriteClient client) instead, so the created Writer will be based on a fresh schema.
+ *
+ * @param streamOrTableName name of the stream that must follow
+ * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or table name
+ * "projects/[^/]+/datasets/[^/]+/tables/[^/]+"
+ * @param tableSchema The schema of the table when the stream was created, which is passed back
+ * through {@code WriteStream}
+ * @return Builder
+ */
+ public static Builder newBuilder(
+ String streamOrTableName, TableSchema tableSchema, ToProtoConverter toProtoConverter) {
+ Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
+ Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
+ Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null.");
+ return new Builder<>(streamOrTableName, tableSchema, null, toProtoConverter);
+ }
+
+ /**
+ * newBuilder that constructs a SchemaAwareStreamWriter builder.
+ *
+ * The table schema passed in will be updated automatically when there is a schema update
+ * event. When used for Writer creation, it should be the latest schema. So when you are trying to
+ * reuse a stream, you should use Builder newBuilder( String streamOrTableName,
+ * BigQueryWriteClient client) instead, so the created Writer will be based on a fresh schema.
+ *
+ * @param streamOrTableName name of the stream that must follow
+ * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
+ * @param tableSchema The schema of the table when the stream was created, which is passed back
+ * through {@code WriteStream}
+ * @param client
+ * @return Builder
+ */
+ public static Builder newBuilder(
+ String streamOrTableName,
+ TableSchema tableSchema,
+ BigQueryWriteClient client,
+ ToProtoConverter toProtoConverter) {
+ Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
+ Preconditions.checkNotNull(tableSchema, "TableSchema is null.");
+ Preconditions.checkNotNull(client, "BigQuery client is null.");
+ Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null.");
+ return new Builder<>(streamOrTableName, tableSchema, client, toProtoConverter);
+ }
+
+ /**
+ * newBuilder that constructs a SchemaAwareStreamWriter builder with TableSchema being initialized
+ * by StreamWriter by default.
+ *
+ * @param streamOrTableName name of the stream that must follow
+ * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+"
+ * @param client BigQueryWriteClient
+ * @return Builder
+ */
+ public static Builder newBuilder(
+ String streamOrTableName, BigQueryWriteClient client, ToProtoConverter toProtoConverter) {
+ Preconditions.checkNotNull(streamOrTableName, "StreamOrTableName is null.");
+ Preconditions.checkNotNull(client, "BigQuery client is null.");
+ Preconditions.checkNotNull(toProtoConverter, "ToProtoConverter is null.");
+ return new Builder<>(streamOrTableName, null, client, toProtoConverter);
+ }
+
+ /** Closes the underlying StreamWriter. */
+ @Override
+ public void close() {
+ this.streamWriter.close();
+ }
+
+ /**
+ * @return if a writer can no longer be used for writing. It is due to either the
+ * SchemaAwareStreamWriter is explicitly closed or the underlying connection is broken when
+ * connection pool is not used. Client should recreate SchemaAwareStreamWriter in this case.
+ */
+ public boolean isClosed() {
+ return this.streamWriter.isClosed();
+ }
+
+ /** @return if user explicitly closed the writer. */
+ public boolean isUserClosed() {
+ return this.streamWriter.isUserClosed();
+ }
+
+ public static final class Builder {
+ private final String streamName;
+ private final BigQueryWriteClient client;
+ private final TableSchema tableSchema;
+
+ private final ToProtoConverter toProtoConverter;
+ private TransportChannelProvider channelProvider;
+ private CredentialsProvider credentialsProvider;
+ private ExecutorProvider executorProvider;
+ private FlowControlSettings flowControlSettings;
+ private String endpoint;
+ private String traceIdBase;
+ private String traceId;
+ private boolean ignoreUnknownFields = false;
+ // Indicates whether multiplexing mode is enabled.
+ private boolean enableConnectionPool = false;
+ private String location;
+
+ private static final String streamPatternString =
+ "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
+ private static final String tablePatternString = "(projects/[^/]+/datasets/[^/]+/tables/[^/]+)";
+
+ private static final Pattern streamPattern = Pattern.compile(streamPatternString);
+ private static final Pattern tablePattern = Pattern.compile(tablePatternString);
+
+ /**
+ * Constructor for SchemaAwareStreamWriter's Builder
+ *
+ * @param streamOrTableName name of the stream that must follow
+ * "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+" or
+ * "projects/[^/]+/datasets/[^/]+/tables/[^/]+"
+ * @param tableSchema schema used to convert items to proto messages.
+ * @param client
+ * @param toProtoConverter converter used to convert items to proto messages
+ */
+ private Builder(
+ String streamOrTableName,
+ TableSchema tableSchema,
+ BigQueryWriteClient client,
+ ToProtoConverter toProtoConverter) {
+ Matcher streamMatcher = streamPattern.matcher(streamOrTableName);
+ if (!streamMatcher.matches()) {
+ Matcher tableMatcher = tablePattern.matcher(streamOrTableName);
+ if (!tableMatcher.matches()) {
+ throw new IllegalArgumentException("Invalid name: " + streamOrTableName);
+ } else {
+ this.streamName = streamOrTableName + "/_default";
+ }
+ } else {
+ this.streamName = streamOrTableName;
+ }
+ this.client = client;
+ if (tableSchema == null) {
+ GetWriteStreamRequest writeStreamRequest =
+ GetWriteStreamRequest.newBuilder()
+ .setName(this.getStreamName())
+ .setView(WriteStreamView.FULL)
+ .build();
+
+ WriteStream writeStream = this.client.getWriteStream(writeStreamRequest);
+
+ this.tableSchema = writeStream.getTableSchema();
+ this.location = writeStream.getLocation();
+ } else {
+ this.tableSchema = tableSchema;
+ }
+ this.toProtoConverter = toProtoConverter;
+ }
+
+ /**
+ * Setter for the underlying StreamWriter's TransportChannelProvider.
+ *
+ * @param channelProvider
+ * @return Builder
+ */
+ public Builder setChannelProvider(TransportChannelProvider channelProvider) {
+ this.channelProvider =
+ Preconditions.checkNotNull(channelProvider, "ChannelProvider is null.");
+ return this;
+ }
+
+ /**
+ * Setter for the underlying StreamWriter's CredentialsProvider.
+ *
+ * @param credentialsProvider
+ * @return Builder
+ */
+ public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
+ this.credentialsProvider =
+ Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null.");
+ return this;
+ }
+
+ /**
+ * Setter for the underlying StreamWriter's ExecutorProvider.
+ *
+ * @param executorProvider
+ * @return
+ */
+ public Builder setExecutorProvider(ExecutorProvider executorProvider) {
+ this.executorProvider =
+ Preconditions.checkNotNull(executorProvider, "ExecutorProvider is null.");
+ return this;
+ }
+
+ /**
+ * Setter for the underlying StreamWriter's FlowControlSettings.
+ *
+ * @param flowControlSettings
+ * @return Builder
+ */
+ public Builder setFlowControlSettings(FlowControlSettings flowControlSettings) {
+ this.flowControlSettings =
+ Preconditions.checkNotNull(flowControlSettings, "FlowControlSettings is null.");
+ return this;
+ }
+
+ /**
+ * Stream name on the builder.
+ *
+ * @return Builder
+ */
+ public String getStreamName() {
+ return streamName;
+ }
+
+ /**
+ * Setter for the underlying StreamWriter's Endpoint.
+ *
+ * @param endpoint
+ * @return Builder
+ */
+ public Builder setEndpoint(String endpoint) {
+ this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null.");
+ return this;
+ }
+
+ /**
+ * Setter for a traceId to help identify traffic origin.
+ *
+ * @param traceId
+ * @return Builder
+ */
+ public Builder setTraceId(String traceId) {
+ this.traceId = Preconditions.checkNotNull(traceId, "TraceId is null.");
+ return this;
+ }
+
+ /**
+ * Setter for a traceIdBase to help identify traffic origin.
+ *
+ * @param traceIdBase
+ * @return Builder
+ */
+ public Builder setTraceIdBase(String traceIdBase) {
+ this.traceIdBase = Preconditions.checkNotNull(traceIdBase, "TraceIdBase is null.");
+ return this;
+ }
+
+ /**
+ * Setter for a ignoreUnknownFields, if true, unknown fields to BigQuery will be ignored instead
+ * of error out.
+ *
+ * @param ignoreUnknownFields
+ * @return Builder
+ */
+ public Builder setIgnoreUnknownFields(boolean ignoreUnknownFields) {
+ this.ignoreUnknownFields = ignoreUnknownFields;
+ return this;
+ }
+
+ /**
+ * Enable multiplexing for this writer. In multiplexing mode tables will share the same
+ * connection if possible until the connection is overwhelmed. This feature is still under
+ * development, please contact write api team before using.
+ *
+ * @param enableConnectionPool
+ * @return Builder
+ */
+ public Builder setEnableConnectionPool(boolean enableConnectionPool) {
+ this.enableConnectionPool = enableConnectionPool;
+ return this;
+ }
+
+ /**
+ * Location of the table this stream writer is targeting. Connection pools are shared by
+ * location.
+ *
+ * @param location
+ * @return Builder
+ */
+ public Builder setLocation(String location) {
+ if (this.location != null && !this.location.equals(location)) {
+ throw new IllegalArgumentException(
+ "Specified location " + location + " does not match the system value " + this.location);
+ }
+ this.location = location;
+ return this;
+ }
+
+ /**
+ * Builds SchemaAwareStreamWriter
+ *
+ * @return SchemaAwareStreamWriter
+ */
+ public SchemaAwareStreamWriter build()
+ throws DescriptorValidationException, IllegalArgumentException, IOException,
+ InterruptedException {
+ return new SchemaAwareStreamWriter<>(this);
+ }
+ }
+}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
index b21a52a63d..bfa30c6141 100644
--- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -518,6 +518,16 @@ public synchronized TableSchema getUpdatedSchema() {
: null;
}
+ /**
+ * Sets the maximum time a request is allowed to be waiting in request waiting queue. Under very
+ * low chance, it's possible for append request to be waiting indefintely for request callback
+ * when Google networking SDK does not detect the networking breakage. The default timeout is 15
+ * minutes. We are investigating the root cause for callback not triggered by networking SDK.
+ */
+ public static void setMaxRequestCallbackWaitTime(Duration waitTime) {
+ ConnectionWorker.MAXIMUM_REQUEST_CALLBACK_WAIT_TIME = waitTime;
+ }
+
long getCreationTimestamp() {
return creationTimestamp;
}
diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java
new file mode 100644
index 0000000000..ca17ed11e7
--- /dev/null
+++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ToProtoConverter.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2021 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigquery.storage.v1;
+
+import com.google.protobuf.Descriptors;
+import com.google.protobuf.DynamicMessage;
+
+public interface ToProtoConverter {
+ DynamicMessage convertToProtoMessage(
+ Descriptors.Descriptor protoSchema,
+ TableSchema tableSchema,
+ T inputObject,
+ boolean ignoreUnknownFields);
+}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
index 50532b1e0a..5b01c5a0a2 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriterTest.java
@@ -23,11 +23,9 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController;
-import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.testing.LocalChannelProvider;
-import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.SchemaTest;
@@ -52,7 +50,6 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
-import java.util.logging.Logger;
import org.json.JSONArray;
import org.json.JSONObject;
import org.junit.After;
@@ -67,13 +64,10 @@
@RunWith(JUnit4.class)
public class JsonStreamWriterTest {
- private static final Logger LOG = Logger.getLogger(JsonStreamWriterTest.class.getName());
- private static int NUMERIC_SCALE = 9;
+ private static final int NUMERIC_SCALE = 9;
private static final String TEST_STREAM = "projects/p/datasets/d/tables/t/streams/_default";
private static final String TEST_STREAM_2 = "projects/p/datasets/d2/tables/t2/streams/_default";
private static final String TEST_TABLE = "projects/p/datasets/d/tables/t";
- private static final ExecutorProvider SINGLE_THREAD_EXECUTOR =
- InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build();
private static LocalChannelProvider channelProvider;
private FakeScheduledExecutorService fakeExecutor;
private FakeBigQueryWrite testBigQueryWrite;
@@ -136,8 +130,7 @@ public JsonStreamWriterTest() throws DescriptorValidationException {}
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
serviceHelper =
- new MockServiceHelper(
- UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
+ new MockServiceHelper(UUID.randomUUID().toString(), Arrays.asList(testBigQueryWrite));
serviceHelper.start();
channelProvider = serviceHelper.createChannelProvider();
fakeExecutor = new FakeScheduledExecutorService();
@@ -638,7 +631,7 @@ public void testCreateDefaultStream_withNoClientPassedIn() throws Exception {
}
@Test
- public void testCreateDefaultStreamWrongLocation() throws Exception {
+ public void testCreateDefaultStreamWrongLocation() {
TableSchema tableSchema =
TableSchema.newBuilder().addFields(0, TEST_INT).addFields(1, TEST_STRING).build();
testBigQueryWrite.addResponse(
@@ -1098,7 +1091,7 @@ public void testWithoutIgnoreUnknownFieldsUpdateFail() throws Exception {
Assert.fail("expected ExecutionException");
} catch (AppendSerializationError ex) {
assertEquals(
- "JSONObject has fields unknown to BigQuery: root.test_unknown.",
+ "The source object has fields unknown to BigQuery: root.test_unknown.",
ex.getRowIndexToErrorMessage().get(1));
assertEquals(TEST_STREAM, ex.getStreamName());
}
@@ -1219,7 +1212,7 @@ public void testMultipleAppendSerializationErrors()
appendSerializationError.getRowIndexToErrorMessage();
assertEquals(2, rowIndexToErrorMessage.size());
assertEquals(
- "JSONObject has fields unknown to BigQuery: root.not_foo.",
+ "The source object has fields unknown to BigQuery: root.not_foo.",
rowIndexToErrorMessage.get(0));
assertEquals(
"Field root.foo failed to convert to STRING. Error: JSONObject does not have a string field at root.foo.",
@@ -1310,7 +1303,7 @@ public void testAppendWithMissingValueMap() throws Exception {
try (JsonStreamWriter writer =
getTestJsonStreamWriterBuilder(TEST_STREAM, tableSchema).setTraceId("test:empty").build()) {
- Map missingValueMap = new HashMap();
+ Map missingValueMap = new HashMap<>();
missingValueMap.put("col1", AppendRowsRequest.MissingValueInterpretation.NULL_VALUE);
missingValueMap.put("col3", AppendRowsRequest.MissingValueInterpretation.DEFAULT_VALUE);
writer.setMissingValueInterpretationMap(missingValueMap);
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java
index 91785ce0ec..5c44d014d4 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/JsonToProtoMessageTest.java
@@ -541,7 +541,7 @@ public void testDifferentNameCasing() throws Exception {
json.put("inT", 1);
json.put("lONg", 1L);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -554,7 +554,7 @@ public void testBool() throws Exception {
json.put("uppercase", "TRUE");
json.put("lowercase", "false");
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestBool.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBool.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -569,7 +569,7 @@ public void testInt64() throws Exception {
json.put("long", 1L);
json.put("string", "1");
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -583,7 +583,7 @@ public void testInt32() throws Exception {
json.put("int", 1);
json.put("string", 1);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt32.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt32.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -595,7 +595,7 @@ public void testInt32NotMatchInt64() throws Exception {
json.put("int", 1L);
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt32.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt32.getDescriptor(), json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals("JSONObject does not have a int32 field at root.int.", e.getMessage());
@@ -615,7 +615,7 @@ public void testDateTimeMismatch() throws Exception {
json.put("datetime", 1.0);
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
TestDatetime.getDescriptor(), tableSchema, json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
@@ -636,7 +636,8 @@ public void testTimeMismatch() throws Exception {
json.put("time", new JSONArray(new Double[] {1.0}));
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestTime.getDescriptor(), tableSchema, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
+ TestTime.getDescriptor(), tableSchema, json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals("JSONObject does not have a int64 field at root.time[0].", e.getMessage());
@@ -657,7 +658,7 @@ public void testMixedCaseFieldNames() throws Exception {
json.put("fooBar", "hello");
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
TestMixedCaseFieldNames.getDescriptor(), tableSchema, json);
}
@@ -682,7 +683,7 @@ public void testDouble() throws Exception {
json.put("long", 8L);
json.put("string", "9.1");
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestDouble.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestDouble.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -705,7 +706,7 @@ public void testDoubleHighPrecision() throws Exception {
JSONObject json = new JSONObject();
json.put("numeric", 3.400500512978076);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
TestNumeric.getDescriptor(), tableSchema, json);
assertEquals(expectedProto, protoMsg);
}
@@ -735,7 +736,7 @@ public void testDoubleHighPrecision_RepeatedField() throws Exception {
JSONObject json = new JSONObject();
json.put("bignumeric", ImmutableList.of(3.400500512978076, 0.10000000000055, 0.12));
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
TestBignumeric.getDescriptor(), tableSchema, json);
assertEquals(expectedProto, protoMsg);
}
@@ -775,7 +776,7 @@ public void testTimestamp() throws Exception {
json.put("test_timezone", "2022-04-05 09:06:11 PST");
json.put("test_saformat", "2018/08/19 12:11");
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
TestTimestamp.getDescriptor(), tableSchema, json);
assertEquals(expectedProto, protoMsg);
}
@@ -792,7 +793,8 @@ public void testDate() throws Exception {
json.put("test_string", "2021-11-04");
json.put("test_long", 18935L);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestDate.getDescriptor(), tableSchema, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
+ TestDate.getDescriptor(), tableSchema, json);
assertEquals(expectedProto, protoMsg);
}
@@ -804,7 +806,7 @@ public void testAllTypes() throws Exception {
try {
LOG.info("Testing " + json + " over " + entry.getKey().getFullName());
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(entry.getKey(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(entry.getKey(), json);
LOG.info("Convert Success!");
assertEquals(protoMsg, AllTypesToCorrectProto.get(entry.getKey())[success]);
success += 1;
@@ -833,7 +835,7 @@ public void testAllRepeatedTypesWithLimits() throws Exception {
try {
LOG.info("Testing " + json + " over " + entry.getKey().getFullName());
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(entry.getKey(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(entry.getKey(), json);
LOG.info("Convert Success!");
assertEquals(
protoMsg.toString(),
@@ -869,7 +871,7 @@ public void testOptional() throws Exception {
json.put("byte", 1);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -881,7 +883,8 @@ public void testRepeatedIsOptional() throws Exception {
json.put("required_double", 1.1);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestRepeatedIsOptional.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
+ TestRepeatedIsOptional.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -891,7 +894,7 @@ public void testRequired() throws Exception {
json.put("optional_double", 1.1);
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestRequired.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestRequired.getDescriptor(), json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals(
@@ -911,7 +914,7 @@ public void testStructSimple() throws Exception {
json.put("test_field_type", stringType);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(MessageType.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(MessageType.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -923,7 +926,7 @@ public void testStructSimpleFail() throws Exception {
json.put("test_field_type", stringType);
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(MessageType.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(MessageType.getDescriptor(), json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals(
@@ -1057,7 +1060,7 @@ public void testStructComplex() throws Exception {
json.put("test_interval", "0-0 0 0:0:0.000005");
json.put("test_json", new JSONArray(new String[] {"{'a':'b'}"}));
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
ComplexRoot.getDescriptor(), COMPLEX_TABLE_SCHEMA, json);
assertEquals(expectedProto, protoMsg);
}
@@ -1083,7 +1086,7 @@ public void testStructComplexFail() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(ComplexRoot.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexRoot.getDescriptor(), json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals(
@@ -1097,7 +1100,7 @@ public void testRepeatedWithMixedTypes() throws Exception {
json.put("test_repeated", new JSONArray("[1.1, 2.2, true]"));
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedDouble.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedDouble.getDescriptor(), json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals(
@@ -1140,7 +1143,7 @@ public void testNestedRepeatedComplex() throws Exception {
json.put("repeated_string", jsonRepeatedString);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(NestedRepeated.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(NestedRepeated.getDescriptor(), json);
assertEquals(protoMsg, expectedProto);
}
@@ -1159,7 +1162,7 @@ public void testNestedRepeatedComplexFail() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(NestedRepeated.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(NestedRepeated.getDescriptor(), json);
Assert.fail("should fail");
} catch (IllegalArgumentException e) {
assertEquals(
@@ -1181,7 +1184,7 @@ public void testEmptySecondLevelObject() throws Exception {
json.put("complex_lvl2", complexLvl2);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -1193,10 +1196,11 @@ public void testAllowUnknownFieldsError() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), json);
Assert.fail("Should fail");
- } catch (Exceptions.JsonDataHasUnknownFieldException e) {
- assertEquals("JSONObject has fields unknown to BigQuery: root.string.", e.getMessage());
+ } catch (Exceptions.DataHasUnknownFieldException e) {
+ assertEquals(
+ "The source object has fields unknown to BigQuery: root.string.", e.getMessage());
assertEquals("root.string", e.getFieldName());
}
}
@@ -1207,7 +1211,7 @@ public void testEmptyProtoMessage() throws Exception {
json.put("test_repeated", new JSONArray(new int[0]));
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt64.getDescriptor(), json);
assertEquals(protoMsg.getAllFields().size(), 0);
}
@@ -1216,7 +1220,7 @@ public void testEmptyJSONObject() throws Exception {
JSONObject json = new JSONObject();
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(Int64Type.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(Int64Type.getDescriptor(), json);
Assert.fail("Should fail");
} catch (IllegalStateException e) {
assertEquals("JSONObject is empty.", e.getMessage());
@@ -1227,7 +1231,7 @@ public void testEmptyJSONObject() throws Exception {
public void testNullJson() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(Int64Type.getDescriptor(), null);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(Int64Type.getDescriptor(), null);
Assert.fail("Should fail");
} catch (NullPointerException e) {
assertEquals("JSONObject is null.", e.getMessage());
@@ -1238,7 +1242,7 @@ public void testNullJson() throws Exception {
public void testNullDescriptor() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(null, new JSONObject());
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(null, new JSONObject());
Assert.fail("Should fail");
} catch (NullPointerException e) {
assertEquals("Protobuf descriptor is null.", e.getMessage());
@@ -1255,11 +1259,12 @@ public void testAllowUnknownFieldsSecondLevel() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json);
Assert.fail("Should fail");
} catch (IllegalArgumentException e) {
assertEquals(
- "JSONObject has fields unknown to BigQuery: root.complex_lvl2.no_match.", e.getMessage());
+ "The source object has fields unknown to BigQuery: root.complex_lvl2.no_match.",
+ e.getMessage());
}
}
@@ -1276,7 +1281,7 @@ public void testTopLevelMatchSecondLevelMismatch() throws Exception {
json.put("complex_lvl2", complex_lvl2);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(ComplexLvl1.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(ComplexLvl1.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -1287,7 +1292,7 @@ public void testJsonNullValue() throws Exception {
json.put("long", JSONObject.NULL);
json.put("int", 1);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -1298,7 +1303,7 @@ public void testJsonAllFieldsNullValue() throws Exception {
json.put("long", JSONObject.NULL);
json.put("int", JSONObject.NULL);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestInt64.getDescriptor(), json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestInt64.getDescriptor(), json);
assertEquals(expectedProto, protoMsg);
}
@@ -1319,7 +1324,8 @@ public void testBadJsonFieldRepeated() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedBytes.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
+ RepeatedBytes.getDescriptor(), ts, json);
Assert.fail("Should fail");
} catch (Exceptions.FieldParseError ex) {
assertEquals(ex.getBqType(), "NUMERIC");
@@ -1344,7 +1350,8 @@ public void testBadJsonFieldIntRepeated() throws Exception {
try {
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(
+ RepeatedInt32.getDescriptor(), ts, json);
Assert.fail("Should fail");
} catch (IllegalArgumentException ex) {
assertEquals(ex.getMessage(), "Text 'blah' could not be parsed at index 0");
@@ -1375,7 +1382,7 @@ public void testNullRepeatedField() throws Exception {
json.put("test_repeated", JSONObject.NULL);
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
assertTrue(protoMsg.getAllFields().isEmpty());
// Missing repeated field.
@@ -1383,7 +1390,7 @@ public void testNullRepeatedField() throws Exception {
json.put("test_non_repeated", JSONObject.NULL);
protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(RepeatedInt32.getDescriptor(), ts, json);
assertTrue(protoMsg.getAllFields().isEmpty());
}
@@ -1406,10 +1413,11 @@ public void testDoubleAndFloatToNumericConversion() {
JSONObject json = new JSONObject();
json.put("numeric", new Double(24.678));
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestNumeric.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestNumeric.getDescriptor(), ts, json);
assertEquals(expectedProto, protoMsg);
json.put("numeric", new Float(24.678));
- protoMsg = JsonToProtoMessage.convertJsonToProtoMessage(TestNumeric.getDescriptor(), ts, json);
+ protoMsg =
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestNumeric.getDescriptor(), ts, json);
assertEquals(expectedProto, protoMsg);
}
@@ -1434,7 +1442,7 @@ public void testBigDecimalToBigNumericConversion() {
JSONObject json = new JSONObject();
json.put("bignumeric", Collections.singletonList(new BigDecimal("24.6789012345")));
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
assertEquals(expectedProto, protoMsg);
}
@@ -1458,11 +1466,11 @@ public void testDoubleAndFloatToRepeatedBigNumericConversion() {
JSONObject json = new JSONObject();
json.put("bignumeric", Collections.singletonList(new Double(24.678)));
DynamicMessage protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
assertEquals(expectedProto, protoMsg);
json.put("bignumeric", Collections.singletonList(new Float(24.678)));
protoMsg =
- JsonToProtoMessage.convertJsonToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
+ JsonToProtoMessage.INSTANCE.convertToProtoMessage(TestBignumeric.getDescriptor(), ts, json);
assertEquals(expectedProto, protoMsg);
}
}
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
index af36273102..bc6dd71690 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -15,6 +15,7 @@
*/
package com.google.cloud.bigquery.storage.v1;
+import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertThrows;
@@ -113,6 +114,7 @@ public StreamWriterTest() throws DescriptorValidationException {}
@Before
public void setUp() throws Exception {
testBigQueryWrite = new FakeBigQueryWrite();
+ StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(10000));
ConnectionWorker.setMaxInflightQueueWaitTime(300000);
serviceHelper =
new MockServiceHelper(
@@ -947,6 +949,35 @@ public void testMessageTooLarge() throws Exception {
writer.close();
}
+ @Test
+ public void testThrowExceptionWhileWithinAppendLoop_MaxWaitTimeExceed() throws Exception {
+ ProtoSchema schema1 = createProtoSchema("foo");
+ StreamWriter.setMaxRequestCallbackWaitTime(java.time.Duration.ofSeconds(1));
+ StreamWriter writer =
+ StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(schema1).build();
+ testBigQueryWrite.setResponseSleep(org.threeten.bp.Duration.ofSeconds(3));
+
+ long appendCount = 10;
+ for (int i = 0; i < appendCount; i++) {
+ testBigQueryWrite.addResponse(createAppendResponse(i));
+ }
+
+ // In total insert 5 requests,
+ List> futures = new ArrayList<>();
+ for (int i = 0; i < appendCount; i++) {
+ futures.add(writer.append(createProtoRows(new String[] {String.valueOf(i)}), i));
+ }
+
+ for (int i = 0; i < appendCount; i++) {
+ int finalI = i;
+ ExecutionException ex =
+ assertThrows(
+ ExecutionException.class,
+ () -> futures.get(finalI).get().getAppendResult().getOffset().getValue());
+ assertThat(ex.getCause()).hasMessageThat().contains("Request has waited in inflight queue");
+ }
+ }
+
@Test
public void testAppendWithResetSuccess() throws Exception {
try (StreamWriter writer = getTestStreamWriter()) {
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
index c80bb960ac..a068f6d635 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryWriteManualClientTest.java
@@ -742,9 +742,7 @@ public void testJsonStreamWriterWithMessagesOver10M()
new ArrayList>(totalRequest);
// Sends a total of 30MB over the wire.
try (JsonStreamWriter jsonStreamWriter =
- JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
- .setReconnectAfter10M(true)
- .build()) {
+ JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema()).build()) {
for (int k = 0; k < totalRequest; k++) {
JSONObject row = new JSONObject();
row.put("col1", "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa");
diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java
index c340d22e9a..9827e72588 100644
--- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java
+++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/JsonToProtoMessageTest.java
@@ -618,7 +618,7 @@ public void testMixedCasedFieldNames() throws Exception {
json.put("fooBar", "hello");
DynamicMessage protoMsg =
- com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.convertJsonToProtoMessage(
+ com.google.cloud.bigquery.storage.v1.JsonToProtoMessage.INSTANCE.convertToProtoMessage(
TestMixedCaseFieldNames.getDescriptor(), tableSchema, json);
}
diff --git a/grpc-google-cloud-bigquerystorage-v1/pom.xml b/grpc-google-cloud-bigquerystorage-v1/pom.xml
index a0681d7dbd..6434762235 100644
--- a/grpc-google-cloud-bigquerystorage-v1/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1
- 2.34.2
+ 2.35.0
grpc-google-cloud-bigquerystorage-v1
GRPC library for grpc-google-cloud-bigquerystorage-v1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.34.2
+ 2.35.0
diff --git a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
index 913217f564..5b2a724040 100644
--- a/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1beta1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta1
- 0.158.2
+ 0.159.0
grpc-google-cloud-bigquerystorage-v1beta1
GRPC library for grpc-google-cloud-bigquerystorage-v1beta1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.34.2
+ 2.35.0
diff --git a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
index 0d395ca526..074640102e 100644
--- a/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
+++ b/grpc-google-cloud-bigquerystorage-v1beta2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta2
- 0.158.2
+ 0.159.0
grpc-google-cloud-bigquerystorage-v1beta2
GRPC library for grpc-google-cloud-bigquerystorage-v1beta2
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.34.2
+ 2.35.0
diff --git a/pom.xml b/pom.xml
index 7a39cb2b63..3d7f898568 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
com.google.cloud
google-cloud-bigquerystorage-parent
pom
- 2.34.2
+ 2.35.0
BigQuery Storage Parent
https://github.com/googleapis/java-bigquerystorage
@@ -76,44 +76,44 @@
com.google.cloud
google-cloud-shared-dependencies
- 3.6.0
+ 3.7.0
pom
import
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta1
- 0.158.2
+ 0.159.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta2
- 0.158.2
+ 0.159.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1
- 2.34.2
+ 2.35.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta1
- 0.158.2
+ 0.159.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1beta2
- 0.158.2
+ 0.159.0
com.google.api.grpc
grpc-google-cloud-bigquerystorage-v1
- 2.34.2
+ 2.35.0
com.google.cloud
google-cloud-bigquerystorage
- 2.34.2
+ 2.35.0
org.json
@@ -132,7 +132,7 @@
com.google.cloud
google-cloud-bigquery
- 2.24.3
+ 2.24.4
test
diff --git a/proto-google-cloud-bigquerystorage-v1/pom.xml b/proto-google-cloud-bigquerystorage-v1/pom.xml
index 746ad3a08d..dd454b2949 100644
--- a/proto-google-cloud-bigquerystorage-v1/pom.xml
+++ b/proto-google-cloud-bigquerystorage-v1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1
- 2.34.2
+ 2.35.0
proto-google-cloud-bigquerystorage-v1
PROTO library for proto-google-cloud-bigquerystorage-v1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.34.2
+ 2.35.0
diff --git a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml
index 91db33cd81..ea17b35fe4 100644
--- a/proto-google-cloud-bigquerystorage-v1beta1/pom.xml
+++ b/proto-google-cloud-bigquerystorage-v1beta1/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta1
- 0.158.2
+ 0.159.0
proto-google-cloud-bigquerystorage-v1beta1
PROTO library for proto-google-cloud-bigquerystorage-v1beta1
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.34.2
+ 2.35.0
diff --git a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml
index eb9de4ef7a..9bf7cd73fb 100644
--- a/proto-google-cloud-bigquerystorage-v1beta2/pom.xml
+++ b/proto-google-cloud-bigquerystorage-v1beta2/pom.xml
@@ -4,13 +4,13 @@
4.0.0
com.google.api.grpc
proto-google-cloud-bigquerystorage-v1beta2
- 0.158.2
+ 0.159.0
proto-google-cloud-bigquerystorage-v1beta2
PROTO library for proto-google-cloud-bigquerystorage-v1beta2
com.google.cloud
google-cloud-bigquerystorage-parent
- 2.34.2
+ 2.35.0
diff --git a/samples/install-without-bom/pom.xml b/samples/install-without-bom/pom.xml
index ecfcdc5934..47a7bc6d60 100644
--- a/samples/install-without-bom/pom.xml
+++ b/samples/install-without-bom/pom.xml
@@ -30,14 +30,14 @@
com.google.cloud
google-cloud-bigquerystorage
- 2.34.1
+ 2.34.2
com.google.cloud
google-cloud-bigquery
- 2.24.3
+ 2.24.4
org.apache.avro
diff --git a/samples/snapshot/pom.xml b/samples/snapshot/pom.xml
index e69f721897..83bfcef82a 100644
--- a/samples/snapshot/pom.xml
+++ b/samples/snapshot/pom.xml
@@ -29,14 +29,14 @@
com.google.cloud
google-cloud-bigquerystorage
- 2.34.2
+ 2.35.0
com.google.cloud
google-cloud-bigquery
- 2.24.3
+ 2.24.4
org.apache.avro
diff --git a/samples/snippets/pom.xml b/samples/snippets/pom.xml
index 6009b89583..7dee80372b 100644
--- a/samples/snippets/pom.xml
+++ b/samples/snippets/pom.xml
@@ -48,7 +48,7 @@
com.google.cloud
google-cloud-bigquery
- 2.24.3
+ 2.24.4
org.apache.avro
diff --git a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
index 8bd384c325..0266b6ae9d 100644
--- a/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
+++ b/samples/snippets/src/main/java/com/example/bigquerystorage/WriteToDefaultStream.java
@@ -40,6 +40,7 @@
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Phaser;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.concurrent.GuardedBy;
import org.json.JSONArray;
import org.json.JSONObject;
@@ -123,6 +124,7 @@ private static class AppendContext {
private static class DataWriter {
private static final int MAX_RETRY_COUNT = 3;
+ private static final int MAX_RECREATE_COUNT = 3;
private static final ImmutableList RETRIABLE_ERROR_CODES =
ImmutableList.of(
Code.INTERNAL,
@@ -140,6 +142,8 @@ private static class DataWriter {
@GuardedBy("lock")
private RuntimeException error = null;
+ private AtomicInteger recreateCount = new AtomicInteger(0);
+
public void initialize(TableName parentTable)
throws DescriptorValidationException, IOException, InterruptedException {
// Use the JSON stream writer to send records in JSON format. Specify the table name to write
@@ -151,8 +155,17 @@ public void initialize(TableName parentTable)
}
public void append(AppendContext appendContext)
- throws DescriptorValidationException, IOException {
+ throws DescriptorValidationException, IOException, InterruptedException {
synchronized (this.lock) {
+ if (!streamWriter.isUserClosed()
+ && streamWriter.isClosed()
+ && recreateCount.getAndIncrement() < MAX_RECREATE_COUNT) {
+ streamWriter =
+ JsonStreamWriter.newBuilder(
+ streamWriter.getStreamName(), BigQueryWriteClient.create())
+ .build();
+ this.error = null;
+ }
// If earlier appends have failed, we need to reset before continuing.
if (this.error != null) {
throw this.error;
@@ -194,6 +207,7 @@ public AppendCompleteCallback(DataWriter parent, AppendContext appendContext) {
public void onSuccess(AppendRowsResponse response) {
System.out.format("Append success\n");
+ this.parent.recreateCount.set(0);
done();
}
@@ -241,6 +255,8 @@ public void onFailure(Throwable throwable) {
throw new RuntimeException(e);
} catch (IOException e) {
throw new RuntimeException(e);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
}
}
// Mark the existing attempt as done since we got a response for it
diff --git a/tutorials/JsonWriterDefaultStream/pom.xml b/tutorials/JsonWriterDefaultStream/pom.xml
index 7e1aca52c1..a458351679 100644
--- a/tutorials/JsonWriterDefaultStream/pom.xml
+++ b/tutorials/JsonWriterDefaultStream/pom.xml
@@ -19,12 +19,12 @@
com.google.cloud
google-cloud-bigquerystorage
- 2.34.1
+ 2.34.2
com.google.cloud
google-cloud-bigquery
- 2.24.3
+ 2.24.4
org.apache.avro
diff --git a/versions.txt b/versions.txt
index 3bb1693b4f..f4a6afe357 100644
--- a/versions.txt
+++ b/versions.txt
@@ -1,10 +1,10 @@
# Format:
# module:released-version:current-version
-google-cloud-bigquerystorage:2.34.2:2.34.2
-grpc-google-cloud-bigquerystorage-v1beta1:0.158.2:0.158.2
-grpc-google-cloud-bigquerystorage-v1beta2:0.158.2:0.158.2
-grpc-google-cloud-bigquerystorage-v1:2.34.2:2.34.2
-proto-google-cloud-bigquerystorage-v1beta1:0.158.2:0.158.2
-proto-google-cloud-bigquerystorage-v1beta2:0.158.2:0.158.2
-proto-google-cloud-bigquerystorage-v1:2.34.2:2.34.2
+google-cloud-bigquerystorage:2.35.0:2.35.0
+grpc-google-cloud-bigquerystorage-v1beta1:0.159.0:0.159.0
+grpc-google-cloud-bigquerystorage-v1beta2:0.159.0:0.159.0
+grpc-google-cloud-bigquerystorage-v1:2.35.0:2.35.0
+proto-google-cloud-bigquerystorage-v1beta1:0.159.0:0.159.0
+proto-google-cloud-bigquerystorage-v1beta2:0.159.0:0.159.0
+proto-google-cloud-bigquerystorage-v1:2.35.0:2.35.0