Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Message;
import java.io.IOException;
import java.util.logging.Logger;
Expand All @@ -34,7 +35,10 @@
/**
* A StreamWriter that can write JSON data (JSONObjects) to BigQuery tables. The JsonStreamWriter is
* built on top of a StreamWriter, and it simply converts all JSON data to protobuf messages then
* calls StreamWriter's append() method to write to BigQuery tables.
* calls StreamWriter's append() method to write to BigQuery tables. It maintains all StreamWriter
* functions, but also provides an additional feature: schema update support, where if the BigQuery
* table schema is updated, users will be able to ingest data on the new schema after some time (on
* the order of minutes).
*/
public class JsonStreamWriter implements AutoCloseable {
private static String streamPatternString =
Expand Down Expand Up @@ -81,27 +85,49 @@ private JsonStreamWriter(Builder builder)
/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at current end
* of stream.
* of stream. If there is a schema update, the current StreamWriter is closed. A new StreamWriter
* is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr)
throws IOException, DescriptorValidationException {
return append(jsonArr, -1);
}

/**
* Writes a JSONArray that contains JSONObjects to the BigQuery table by first converting the JSON
* data to protobuf messages, then using StreamWriter's append() to write the data at the
* specified offset.
* specified offset. If there is a schema update, the current StreamWriter is closed. A new
* StreamWriter is created with the updated TableSchema.
*
* @param jsonArr The JSON array that contains JSONObjects to be written
* @param offset Offset for deduplication
* @return ApiFuture<AppendRowsResponse> returns an AppendRowsResponse message wrapped in an
* ApiFuture
*/
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset) {
public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
this.tableSchema = updatedSchema;
this.descriptor =
BQTableSchemaToProtoDescriptor.convertBQTableSchemaToProtoDescriptor(updatedSchema);
// Create a new underlying StreamWriter with the updated TableSchema and Descriptor
this.streamWriter =
streamWriterBuilder
.setWriterSchema(ProtoSchemaConverter.convert(this.descriptor))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to propagate anything else to the new writer like stream id, existing trace id, etc or does it all come from properties stored on the JsonStreamWriter?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, those things are needed and are already being populated in the streamWriterBuilder.

.build();
}
}

ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
// Any error in convertJsonToProtoMessage will throw an
// IllegalArgumentException/IllegalStateException/NullPointerException and will halt processing
Expand Down Expand Up @@ -155,9 +181,9 @@ private void setStreamWriterSettings(
streamWriterBuilder.setEndpoint(endpoint);
}
if (traceId != null) {
streamWriterBuilder.setTraceId("JsonWriterBeta_" + traceId);
streamWriterBuilder.setTraceId("JsonWriter_" + traceId);
} else {
streamWriterBuilder.setTraceId("JsonWriterBeta:null");
streamWriterBuilder.setTraceId("JsonWriter:null");
}
if (flowControlSettings != null) {
if (flowControlSettings.getMaxOutstandingRequestBytes() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,6 @@
* A BigQuery Stream Writer that can be used to write data into BigQuery Table.
*
* <p>TODO: Support batching.
*
* <p>TODO: Support schema change.
*/
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
Expand Down Expand Up @@ -135,6 +133,12 @@ public class StreamWriter implements AutoCloseable {
@GuardedBy("lock")
private final Deque<AppendRequestAndResponse> inflightRequestQueue;

/*
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
Expand Down Expand Up @@ -526,6 +530,9 @@ private void cleanupInflightRequests() {
private void requestCallback(AppendRowsResponse response) {
AppendRequestAndResponse requestWrapper;
this.lock.lock();
if (response.hasUpdatedSchema()) {
this.updatedSchema = response.getUpdatedSchema();
}
try {
// Had a successful connection with at least one result, reset retries.
// conectionRetryCountWithoutCallback is reset so that only multiple retries, without
Expand Down Expand Up @@ -622,7 +629,12 @@ public static StreamWriter.Builder newBuilder(String streamName) {
return new StreamWriter.Builder(streamName);
}

/** A builder of {@link StreamWriterV2}s. */
/**
 * Returns the most recent updated TableSchema stored by this writer, or {@code null} if none has
 * been stored.
 *
 * <p>The field is annotated {@code @GuardedBy("lock")} and is assigned in the response callback
 * while {@code lock} is held, so the read must take the same {@code lock}. Synchronizing on the
 * instance monitor instead would not order this read against that write.
 *
 * @return the stored updated TableSchema, or {@code null} if none has been stored
 */
public TableSchema getUpdatedSchema() {
  this.lock.lock();
  try {
    return this.updatedSchema;
  } finally {
    this.lock.unlock();
  }
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand All @@ -649,6 +661,8 @@ public static final class Builder {

private String traceId = null;

private TableSchema updatedTableSchema = null;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package com.google.cloud.bigquery.storage.v1;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
Expand All @@ -26,6 +27,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.JsonTest;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.test.Test.UpdatedFooType;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import com.google.protobuf.Int64Value;
import com.google.protobuf.Timestamp;
Expand Down Expand Up @@ -193,7 +195,7 @@ public void testSingleAppendSimpleJson() throws Exception {
.getSerializedRows(0),
expectedProto.toByteString());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta_test:empty");
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter_test:empty");
}
}

Expand Down Expand Up @@ -284,8 +286,7 @@ public void testSingleAppendMultipleSimpleJson() throws Exception {
.getProtoRows()
.getRows()
.getSerializedRowsCount());
assertEquals(
testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriterBeta:null");
assertEquals(testBigQueryWrite.getAppendRequests().get(0).getTraceId(), "JsonWriter:null");
for (int i = 0; i < 4; i++) {
assertEquals(
testBigQueryWrite
Expand Down Expand Up @@ -388,4 +389,111 @@ public void testCreateDefaultStream() throws Exception {
assertEquals("projects/p/datasets/d/tables/t/_default", writer.getStreamName());
}
}

/**
 * Verifies that rows are serialized with the updated schema after an append response carries
 * {@code UPDATED_TABLE_SCHEMA}.
 *
 * <p>Four responses are queued; only the first one carries the updated schema. Three appends with
 * the original single-field row are issued before any future is awaited, then a fourth append
 * with the two-field row is issued and checked against {@code UpdatedFooType}.
 */
@Test
public void testSimpleSchemaUpdate() throws Exception {
  try (JsonStreamWriter writer =
      getTestJsonStreamWriterBuilder(TEST_STREAM, TABLE_SCHEMA).build()) {
    // Response for the first append: the only one that carries the updated schema.
    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(0)).build())
            .setUpdatedSchema(UPDATED_TABLE_SCHEMA)
            .build());
    // Responses for the remaining three appends: plain offsets 1..3, no schema.
    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
            .build());
    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(2)).build())
            .build());
    testBigQueryWrite.addResponse(
        AppendRowsResponse.newBuilder()
            .setAppendResult(
                AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(3)).build())
            .build());

    // First three appends: the same row containing only the original "foo" field.
    JSONObject foo = new JSONObject();
    foo.put("foo", "aaa");
    JSONArray jsonArr = new JSONArray();
    jsonArr.put(foo);

    ApiFuture<AppendRowsResponse> appendFuture1 = writer.append(jsonArr);
    ApiFuture<AppendRowsResponse> appendFuture2 = writer.append(jsonArr);
    ApiFuture<AppendRowsResponse> appendFuture3 = writer.append(jsonArr);

    assertEquals(0L, appendFuture1.get().getAppendResult().getOffset().getValue());
    assertEquals(1L, appendFuture2.get().getAppendResult().getOffset().getValue());
    assertEquals(
        1,
        testBigQueryWrite
            .getAppendRequests()
            .get(0)
            .getProtoRows()
            .getRows()
            .getSerializedRowsCount());
    assertEquals(
        FooType.newBuilder().setFoo("aaa").build().toByteString(),
        testBigQueryWrite
            .getAppendRequests()
            .get(0)
            .getProtoRows()
            .getRows()
            .getSerializedRows(0));

    assertEquals(2L, appendFuture3.get().getAppendResult().getOffset().getValue());
    assertEquals(
        1,
        testBigQueryWrite
            .getAppendRequests()
            .get(1)
            .getProtoRows()
            .getRows()
            .getSerializedRowsCount());
    assertEquals(
        FooType.newBuilder().setFoo("aaa").build().toByteString(),
        testBigQueryWrite
            .getAppendRequests()
            .get(1)
            .getProtoRows()
            .getRows()
            .getSerializedRows(0));

    // Fourth append: a row that also sets "bar", which is expected to serialize as
    // UpdatedFooType.
    JSONObject updatedFoo = new JSONObject();
    updatedFoo.put("foo", "aaa");
    updatedFoo.put("bar", "bbb");
    JSONArray updatedJsonArr = new JSONArray();
    updatedJsonArr.put(updatedFoo);

    ApiFuture<AppendRowsResponse> appendFuture4 = writer.append(updatedJsonArr);

    assertEquals(3L, appendFuture4.get().getAppendResult().getOffset().getValue());
    assertEquals(4, testBigQueryWrite.getAppendRequests().size());
    assertEquals(
        1,
        testBigQueryWrite
            .getAppendRequests()
            .get(3)
            .getProtoRows()
            .getRows()
            .getSerializedRowsCount());
    assertEquals(
        UpdatedFooType.newBuilder().setFoo("aaa").setBar("bbb").build().toByteString(),
        testBigQueryWrite
            .getAppendRequests()
            .get(3)
            .getProtoRows()
            .getRows()
            .getSerializedRows(0));

    // The very first request must carry a writer schema. A second schema-bearing request is
    // accepted at either index 2 or index 3; presumably this tolerates the third append being
    // issued before or after the schema update is observed (the first three appends are sent
    // before any response is awaited).
    assertTrue(testBigQueryWrite.getAppendRequests().get(0).getProtoRows().hasWriterSchema());
    assertTrue(
        testBigQueryWrite.getAppendRequests().get(2).getProtoRows().hasWriterSchema()
            || testBigQueryWrite.getAppendRequests().get(3).getProtoRows().hasWriterSchema());
  }
}
}
Loading