Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
b746ff8
feat: add support for StorageError
stephaniewang526 Oct 29, 2021
2acdc53
🦉 Updates from OwlBot
gcf-owl-bot[bot] Oct 29, 2021
472aae6
add license header
stephaniewang526 Oct 29, 2021
ab9c38f
Merge remote-tracking branch 'origin/storageError' into storageError
stephaniewang526 Oct 29, 2021
5e52f7a
add error parsing in doneCallback
stephaniewang526 Nov 1, 2021
a78a524
address feedback
stephaniewang526 Nov 2, 2021
bd0b2c0
add test case for onDone
stephaniewang526 Nov 4, 2021
2e0a6fe
Merge branch 'main' into storageError
stephaniewang526 Nov 4, 2021
f868333
🦉 Updates from OwlBot
gcf-owl-bot[bot] Nov 4, 2021
84d97e8
Merge branch 'storageError' of https://github.com/googleapis/java-big…
gcf-owl-bot[bot] Nov 4, 2021
8a2f95f
refactor code
stephaniewang526 Nov 4, 2021
eccee65
Merge remote-tracking branch 'origin/storageError' into storageError
stephaniewang526 Nov 4, 2021
2833886
lint
stephaniewang526 Nov 4, 2021
d321f17
🦉 Updates from OwlBot
gcf-owl-bot[bot] Nov 4, 2021
89dca1b
Merge branch 'storageError' of https://github.com/googleapis/java-big…
gcf-owl-bot[bot] Nov 4, 2021
436f40b
apply changes from code review
stephaniewang526 Nov 5, 2021
c88dad2
add integration test for onDoneCallback
stephaniewang526 Nov 8, 2021
4a7c96f
Merge remote-tracking branch 'origin/storageError' into storageError
stephaniewang526 Nov 8, 2021
1da0f7a
make error check more vigorous
stephaniewang526 Nov 12, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Any;
import com.google.protobuf.InvalidProtocolBufferException;
import io.grpc.Status;
import io.grpc.Status.Code;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/** Exceptions for Storage Client Libraries. */
public final class Exceptions {
/** Main Storage Exception. Might contain map of streams to errors for that stream. */
public static class StorageException extends RuntimeException {

private final ImmutableMap<String, GrpcStatusCode> errors;
private final String streamName;

private StorageException() {
this(null, null, null, ImmutableMap.of());
}

private StorageException(
@Nullable String message,
@Nullable Throwable cause,
@Nullable String streamName,
ImmutableMap<String, GrpcStatusCode> errors) {
super(message, cause);
this.streamName = streamName;
this.errors = errors;
}

public ImmutableMap<String, GrpcStatusCode> getErrors() {
return errors;
}

public String getStreamName() {
return streamName;
}
}

/** Stream has already been finalized. */
public static final class StreamFinalizedException extends StorageException {
protected StreamFinalizedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
}
}

/**
* There was a schema mismatch due to bigquery table with fewer fields than the input message.
* This can be resolved by updating the table's schema with the message schema.
*/
public static final class SchemaMismatchedException extends StorageException {
protected SchemaMismatchedException(String name, String message, Throwable cause) {
super(message, cause, name, ImmutableMap.of());
}
}

private static StorageError toStorageError(com.google.rpc.Status rpcStatus) {
for (Any detail : rpcStatus.getDetailsList()) {
if (detail.is(StorageError.class)) {
try {
return detail.unpack(StorageError.class);
} catch (InvalidProtocolBufferException protoException) {
throw new IllegalStateException(protoException);
}
}
}
return null;
}

/**
* Converts a c.g.rpc.Status into a StorageException, if possible. Examines the embedded
* StorageError, and potentially returns a {@link StreamFinalizedException} or {@link
* SchemaMismatchedException} (both derive from StorageException). If there is no StorageError, or
* the StorageError is a different error it will return NULL.
*/
@Nullable
public static StorageException toStorageException(
com.google.rpc.Status rpcStatus, Throwable exception) {
StorageError error = toStorageError(rpcStatus);
if (error == null) {
return null;
}
switch (error.getCode()) {
case STREAM_FINALIZED:
return new StreamFinalizedException(error.getEntity(), error.getErrorMessage(), exception);

case SCHEMA_MISMATCH_EXTRA_FIELDS:
return new SchemaMismatchedException(error.getEntity(), error.getErrorMessage(), exception);

default:
return null;
}
}

/**
* Converts a Throwable into a StorageException, if possible. Examines the embedded error message,
* and potentially returns a {@link StreamFinalizedException} or {@link SchemaMismatchedException}
* (both derive from StorageException). If there is no StorageError, or the StorageError is a
* different error it will return NULL.
*/
@Nullable
public static StorageException toStorageException(Throwable exception) {
// TODO: switch to using rpcStatus when cl/408735437 is rolled out
// com.google.rpc.Status rpcStatus = StatusProto.fromThrowable(exception);
Status grpcStatus = Status.fromThrowable(exception);
String message = exception.getMessage();
String streamPatternString = "projects/[^/]+/datasets/[^/]+/tables/[^/]+/streams/[^/]+";
Pattern streamPattern = Pattern.compile(streamPatternString);
if (message == null) {
return null;
}
// TODO: SWTICH TO CHECK SCHEMA_MISMATCH_EXTRA_FIELDS IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("input schema has more fields than bigquery schema")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new SchemaMismatchedException(entity, message, exception);
}
// TODO: SWTICH TO CHECK STREAM_FINALIZED IN THE ERROR CODE
if (grpcStatus.getCode().equals(Code.INVALID_ARGUMENT)
&& message.toLowerCase().contains("stream has been finalized and cannot be appended")) {
Matcher streamMatcher = streamPattern.matcher(message);
String entity = streamMatcher.find() ? streamMatcher.group() : "streamName unkown";
return new StreamFinalizedException(entity, message, exception);
}
return null;
}

private Exceptions() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -460,11 +460,17 @@ private void requestCallback(AppendRowsResponse response) {
this.lock.unlock();
}
if (response.hasError()) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
Exceptions.StorageException storageException =
Exceptions.toStorageException(response.getError(), null);
if (storageException != null) {
requestWrapper.appendResult.setException(storageException);
} else {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
}
} else {
requestWrapper.appendResult.set(response);
}
Expand All @@ -482,6 +488,10 @@ private void doneCallback(Throwable finalStatus) {
} finally {
this.lock.unlock();
}
Exceptions.StorageException storageException = Exceptions.toStorageException(finalStatus);
if (storageException != null) {
this.connectionFinalStatus = storageException;
}
}

@GuardedBy("lock")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode;
import com.google.common.base.Strings;
import com.google.protobuf.Any;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
Expand Down Expand Up @@ -304,6 +306,96 @@ public void testAppendSuccessAndInStreamError() throws Exception {
writer.close();
}

@Test
public void testAppendFailedSchemaError() throws Exception {
StreamWriter writer = getTestStreamWriter();

StorageError storageError =
StorageError.newBuilder()
.setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
.setEntity("foobar")
.build();
com.google.rpc.Status statusProto =
com.google.rpc.Status.newBuilder()
.setCode(Code.INVALID_ARGUMENT.getHttpStatusCode())
.addDetails(Any.pack(storageError))
.build();

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().setError(statusProto).build());
testBigQueryWrite.addResponse(createAppendResponse(1));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertEquals("foobar", actualError.getStreamName());
assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue());

writer.close();
}

@Test
public void testAppendFailedOnDone() throws Exception {
StreamWriter writer = getTestStreamWriter();

StatusRuntimeException exception =
new StatusRuntimeException(
io.grpc.Status.INVALID_ARGUMENT.withDescription(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(exception);

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
Exceptions.SchemaMismatchedException actualError =
assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
assertTrue(
actualError
.getMessage()
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema"));

writer.close();
}

// TODO(stephwang): update test case to below when toStorageException is updated
// @Test
// public void testAppendFailedOnDone2() throws Exception {
// StreamWriter writer = getTestStreamWriter();
//
// StorageError storageError =
// StorageError.newBuilder()
// .setCode(StorageErrorCode.SCHEMA_MISMATCH_EXTRA_FIELDS)
// .setEntity("foobar")
// .build();
// com.google.rpc.Status statusProto =
// com.google.rpc.Status.newBuilder()
// .addDetails(Any.pack(storageError))
// .build();
//
// StatusRuntimeException exception = StatusProto.toStatusRuntimeException(statusProto);
//
// testBigQueryWrite.addResponse(createAppendResponse(0));
// testBigQueryWrite.addException(exception);
//
// ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
// ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
//
// assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
// Exceptions.SchemaMismatchedException actualError =
// assertFutureException(Exceptions.SchemaMismatchedException.class, appendFuture2);
// assertEquals("foobar", actualError.getStreamName());
//
// writer.close();
// }

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriter writer = getTestStreamWriter();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,15 @@ ProtoRows CreateProtoRows(String[] messages) {
return rows.build();
}

ProtoRows CreateProtoRowsMultipleColumns(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
UpdatedFooType foo = UpdatedFooType.newBuilder().setFoo(message).setBar(message).build();
rows.addSerializedRows(foo.toByteString());
}
return rows.build();
}

ProtoRows CreateProtoRowsComplex(String[] messages) {
ProtoRows.Builder rows = ProtoRows.newBuilder();
for (String message : messages) {
Expand Down Expand Up @@ -499,6 +508,68 @@ public void testStreamError() throws IOException, InterruptedException, Executio
}
}

@Test
public void testStreamSchemaMisMatchError() throws IOException, InterruptedException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());

try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor()))
.build()) {
// Create a proto row that has extra fields than the table schema defined which should trigger
// the SCHEMA_MISMATCH_EXTRA_FIELDS error
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
try {
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// TODO(stephwang): update test case when toStroageException is updated
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Input schema has more fields than BigQuery schema");
}
}
}

@Test
public void testStreamFinalizedError()
throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
client.createWriteStream(
CreateWriteStreamRequest.newBuilder()
.setParent(tableId)
.setWriteStream(
WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build())
.build());
try (StreamWriter streamWriter =
StreamWriter.newBuilder(writeStream.getName())
.setWriterSchema(ProtoSchemaConverter.convert(UpdatedFooType.getDescriptor()))
.build()) {
// Finalize the stream in order to trigger STREAM_FINALIZED error
client.finalizeWriteStream(
FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build());
// Try to append to a finalized stream
ApiFuture<AppendRowsResponse> response =
streamWriter.append(CreateProtoRowsMultipleColumns(new String[] {"a"}), /*offset=*/ 0);
try {
response.get();
Assert.fail("Should fail");
} catch (ExecutionException e) {
// //TODO(stephwang): update test case when toStroageException is updated
assertThat(e.getCause().getMessage())
.contains(
"io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Stream has been finalized and cannot be appended");
}
}
}

@Test
public void testStreamReconnect() throws IOException, InterruptedException, ExecutionException {
WriteStream writeStream =
Expand Down