Merged
Changes from 1 commit
Commits
53 commits
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
f7dd72d
Merge branch 'googleapis:main' into main
GaoleMeng Sep 27, 2022
a48399f
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
6789bc9
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
46b4e6c
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 29, 2022
dfd4dd9
Merge branch 'googleapis:main' into main
GaoleMeng Sep 29, 2022
d68ae70
feat: fix the bug that we may peek into the write_stream field but it's
GaoleMeng Sep 29, 2022
2983fe9
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 29, 2022
d406256
Merge branch 'googleapis:main' into main
GaoleMeng Oct 13, 2022
22e9e07
feat: add getInflightWaitSeconds implementation
GaoleMeng Oct 13, 2022
fdb4e1c
Merge branch 'googleapis:main' into main
GaoleMeng Oct 21, 2022
0469474
Merge branch 'googleapis:main' into main
GaoleMeng Nov 2, 2022
d1b7740
feat: Add schema comparision in connection loop to ensure schema upda…
GaoleMeng Nov 3, 2022
e4cd529
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 4, 2022
74ff1c4
Merge branch 'googleapis:main' into main
GaoleMeng Nov 4, 2022
762f49e
feat: add schema update support to multiplexing
GaoleMeng Nov 5, 2022
de456c2
Merge branch 'googleapis:main' into main
GaoleMeng Nov 11, 2022
c2f6edc
Merge branch 'googleapis:main' into main
GaoleMeng Nov 15, 2022
2487227
fix: fix windows build bug: windows Instant resolution is different with
GaoleMeng Nov 15, 2022
084d6d1
fix: fix another failing tests for windows build
GaoleMeng Nov 16, 2022
89c9701
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Nov 16, 2022
8441518
fix: fix another test failure for Windows build
GaoleMeng Nov 16, 2022
d249add
Merge branch 'googleapis:main' into main
GaoleMeng Nov 30, 2022
83aa7ff
feat: Change new thread for each retry to be a thread pool to avoid
GaoleMeng Nov 30, 2022
92a9c36
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Nov 30, 2022
a713a52
Merge branch 'googleapis:main' into main
GaoleMeng Nov 30, 2022
a042d5c
fix: add back the background executor provider that's accidentally
GaoleMeng Nov 30, 2022
feat: some fixes for multiplexing client
GaoleMeng committed Sep 23, 2022
commit 87a403671697ae9b4e7ed8d6e1a9bde867396079
20 changes: 20 additions & 0 deletions google-cloud-bigquerystorage/clirr-ignored-differences.xml
@@ -40,4 +40,24 @@
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorker</className>
<method>com.google.api.core.ApiFuture append(com.google.cloud.bigquery.storage.v1.ProtoRows)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerPool(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMaxConnectionsPerRegion(int)</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerPool(int)</method>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool$Settings$Builder</className>
<method>com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool$Settings$Builder setMinConnectionsPerRegion(int)</method>
</difference>
</differences>
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java
@@ -17,6 +17,7 @@

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.common.base.Stopwatch;
@@ -153,24 +154,24 @@ public abstract static class Settings {
* The minimum connections each pool created before trying to reuse the previously created
* connection in multiplexing mode.
*/
abstract int minConnectionsPerPool();
abstract int minConnectionsPerRegion();

/** The maximum connections per connection pool. */
abstract int maxConnectionsPerPool();
abstract int maxConnectionsPerRegion();

public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(10);
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(10);
}

/** Builder for the options to config {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
// TODO(gaole) rename to per location for easier understanding.
public abstract Builder setMinConnectionsPerPool(int value);
public abstract Builder setMinConnectionsPerRegion(int value);

public abstract Builder setMaxConnectionsPerPool(int value);
public abstract Builder setMaxConnectionsPerRegion(int value);

public abstract Settings build();
}
@@ -192,7 +193,7 @@ public ConnectionWorkerPool(
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
this.currentMaxConnectionCount = settings.minConnectionsPerPool();
this.currentMaxConnectionCount = settings.minConnectionsPerRegion();
}

/**
@@ -266,13 +267,13 @@ private ConnectionWorker createOrReuseConnectionWorker(
ImmutableList.copyOf(connectionWorkerPool));
if (!existingBestConnection.getLoad().isOverwhelmed()) {
return existingBestConnection;
} else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
} else if (currentMaxConnectionCount < settings.maxConnectionsPerRegion()) {
// At this point, we have reached the connection cap and the selected connection is
// overwhelmed, we can try scale up the connection pool.
// The connection count will go up one by one until `maxConnectionsPerPool` is reached.
currentMaxConnectionCount += 1;
if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
currentMaxConnectionCount = settings.maxConnectionsPerPool();
if (currentMaxConnectionCount > settings.maxConnectionsPerRegion()) {
currentMaxConnectionCount = settings.maxConnectionsPerRegion();
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {
@@ -323,6 +324,20 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
// TODO(gaole): figure out a better way to handle header / request body mismatch.
// Currently each connection worker gets a client with its own header,
// because the backend requires the header's write_stream field to match the request body.
BigQueryWriteClient clientAfterModification = client;
if (ownsBigQueryWriteClient) {
BigQueryWriteSettings settings = client.getSettings();
BigQueryWriteSettings stubSettings =
settings.toBuilder()
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + streamName))
.build();
clientAfterModification = BigQueryWriteClient.create(stubSettings);
}
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
Expand All @@ -331,7 +346,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
maxInflightBytes,
limitExceededBehavior,
traceId,
client,
clientAfterModification,
ownsBigQueryWriteClient);
connectionWorkerPool.add(connectionWorker);
log.info(
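The per-worker client setup added to createConnectionWorker above can be read in isolation as the sketch below. clientForStream is a hypothetical helper name; it assumes the pool owns the resulting client and is responsible for closing it when the worker shuts down.

import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import java.io.IOException;

// Derive a client whose routing header matches one write stream, reusing the settings
// (credentials, endpoint, channel provider) of an existing client.
static BigQueryWriteClient clientForStream(BigQueryWriteClient base, String streamName)
    throws IOException {
  BigQueryWriteSettings perStreamSettings =
      base.getSettings()
          .toBuilder()
          .setHeaderProvider(
              FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
          .build();
  return BigQueryWriteClient.create(perStreamSettings);
}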
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/JsonStreamWriter.java
@@ -20,6 +20,7 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.v1.Exceptions.AppendSerializtionError;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.base.Preconditions;
import com.google.protobuf.Descriptors;
import com.google.protobuf.Descriptors.Descriptor;
@@ -60,6 +61,7 @@ public class JsonStreamWriter implements AutoCloseable {
private long totalMessageSize = 0;
private long absTotal = 0;
private ProtoSchema protoSchema;
private boolean enableConnectionPool = false;

/**
* Constructs the JsonStreamWriter
@@ -87,6 +89,7 @@ private JsonStreamWriter(Builder builder)
builder.endpoint,
builder.flowControlSettings,
builder.traceId);
streamWriterBuilder.setEnableConnectionPool(builder.enableConnectionPool);
this.streamWriter = streamWriterBuilder.build();
this.streamName = builder.streamName;
this.tableSchema = builder.tableSchema;
@@ -124,8 +127,10 @@ public ApiFuture<AppendRowsResponse> append(JSONArray jsonArr, long offset)
throws IOException, DescriptorValidationException {
// Handle schema updates in a Thread-safe way by locking down the operation
synchronized (this) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
if (updatedSchema != null) {
// Schema updates only work when the connection pool is not enabled.
if (this.streamWriter.getConnectionOperationType() == Kind.CONNECTION_WORKER
&& this.streamWriter.getUpdatedSchema() != null) {
TableSchema updatedSchema = this.streamWriter.getUpdatedSchema();
// Close the StreamWriter
this.streamWriter.close();
// Update JsonStreamWriter's TableSchema and Descriptor
@@ -301,6 +306,9 @@ public static final class Builder {
private String traceId;
private boolean ignoreUnknownFields = false;
private boolean reconnectAfter10M = false;
// Indicate whether multiplexing mode is enabled.
private boolean enableConnectionPool = false;
private String location;

private static String streamPatternString =
"(projects/[^/]+/datasets/[^/]+/tables/[^/]+)/streams/[^/]+";
@@ -427,6 +435,31 @@ public Builder setReconnectAfter10M(boolean reconnectAfter10M) {
return this;
}

/**
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
* connection if possible until the connection is overwhelmed.
* This feature is still under development, please contact write api team before using.
*
* @param enableConnectionPool
* @return Builder
*/
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}

/**
* Location of the table this stream writer is targeting.
* Connection pools are shared by location.
*
* @param location
* @return Builder
*/
public Builder setLocation(String location) {
this.location = location;
return this;
}

/**
* Builds JsonStreamWriter
*
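On the caller side, the new JsonStreamWriter.Builder options would be used roughly as in the sketch below. The stream name and schema are placeholders, the method name is hypothetical, and the checked-exception list is assumed to mirror what build() already declares.

import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;

// Build a writer against the default stream with the connection pool enabled.
static JsonStreamWriter newMultiplexedWriter(String defaultStreamName, TableSchema schema)
    throws DescriptorValidationException, IOException, InterruptedException {
  return JsonStreamWriter.newBuilder(defaultStreamName, schema)
      .setLocation("US") // connection pools are shared per location
      .setEnableConnectionPool(true) // share connections across tables when possible
      .build();
}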
google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java
@@ -22,7 +22,6 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
@@ -33,6 +32,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
@@ -180,7 +180,7 @@ private StreamWriter(Builder builder) throws IOException {
client = builder.client;
ownsBigQueryWriteClient = false;
}
if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
if (!builder.enableConnectionPool) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
@@ -212,22 +212,31 @@ private StreamWriter(Builder builder) throws IOException {
builder.traceId,
client,
ownsBigQueryWriteClient)));
validateFetchedConnectonPool(client, builder);
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
// TODO(gaole): instead of perform close outside of pool approach, change to always create
// new client in connection.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException unused) {
// Ignore interruption as this client is not used.
}
client.close();
}
}
}

// Validate whether the fetched connection pool matched certain properties.
private void validateFetchedConnectonPool(
BigQueryWriteClient client, StreamWriter.Builder builder) {
private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
client)) {
paramsValidatedFailed = "Bigquery write client";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
@@ -341,19 +350,6 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
/** Operation mode for the internal connection pool. */
public enum ConnectionMode {
// Create a connection per given write stream.
SINGLE_TABLE,
// Share a connection for multiple tables. This mode is only effective in default stream case.
// Some key characteristics:
// 1. tables within the same pool has to be in the same location.
// 2. Close(streamReference) will not close connection immediately until all tables on
// this connection is closed.
// 3. Try to use one stream per table at first and share stream later.
MULTIPLEXING
}

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;

private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100 * 1024 * 1024; // 100Mb.
@@ -379,14 +375,14 @@ public enum ConnectionMode {
private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;

private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;

private String traceId = null;

private TableSchema updatedTableSchema = null;

private String location;

private boolean enableConnectionPool = false;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -419,8 +415,16 @@ public Builder setEndpoint(String endpoint) {
return this;
}

public Builder enableConnectionPool() {
this.connectionMode = ConnectionMode.MULTIPLEXING;
/**
* Enable multiplexing for this writer. In multiplexing mode tables will share the same
* connection if possible until the connection is overwhelmed.
* This feature is still under development, please contact write api team before using.
*
* @param enableConnectionPool
* @return Builder
*/
public Builder setEnableConnectionPool(boolean enableConnectionPool) {
this.enableConnectionPool = enableConnectionPool;
return this;
}

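The owned-client cleanup added above follows a common GAPIC shutdown sequence. As a standalone sketch (releaseOwnedClient is a hypothetical name; the 150-second bound copies the value chosen in this commit):

import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import java.util.concurrent.TimeUnit;

// Once the connection pool creates its own per-worker clients, an owned caller-supplied
// client is no longer needed and can be drained and closed.
static void releaseOwnedClient(BigQueryWriteClient client) {
  client.shutdown();
  try {
    client.awaitTermination(150, TimeUnit.SECONDS);
  } catch (InterruptedException unused) {
    // The client is unused from here on, so interruption while waiting is ignored.
  }
  client.close();
}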
google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java
@@ -146,8 +146,8 @@ private void testSendRequestsToMultiTable(
throws IOException, ExecutionException, InterruptedException {
ConnectionWorkerPool.setOptions(
Settings.builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(maxConnections)
.setMinConnectionsPerRegion(2)
.setMaxConnectionsPerRegion(maxConnections)
.build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
@@ -201,7 +201,7 @@ private void testSendRequestsToMultiTable(
@Test
public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);

@@ -250,7 +250,7 @@ public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
@Test
public void testMultiStreamAppend_appendWhileClosing() throws Exception {
ConnectionWorkerPool.setOptions(
Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
Settings.builder().setMaxConnectionsPerRegion(10).setMinConnectionsPerRegion(5).build());
ConnectionWorkerPool connectionWorkerPool =
createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java
@@ -96,7 +96,7 @@ private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.enableConnectionPool()
.setEnableConnectionPool(true)
.build();
}

@@ -722,6 +722,26 @@ public void testInitialization_operationKind() throws Exception {
}
}

@Test
public void testInitializationTwice_closeSecondClient() throws Exception {
BigQueryWriteClient client2 = BigQueryWriteClient.create(
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.build());

StreamWriter streamWriter1 = getMultiplexingTestStreamWriter();
StreamWriter streamWriter2 = StreamWriter.newBuilder(TEST_STREAM, client2)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
.build();

// The second passed in client will be closed
assertTrue(client2.isShutdown());
}

// Timeout to ensure close() doesn't wait for done callback timeout.
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {