@@ -163,7 +163,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
        grpc.storageClient
            .writeObjectCallable()
            .withDefaultCallContext(grpcCallContext))
-       .setHasher(Hasher.noop())
+       .setHasher(opts.getHasher())
Contributor: love how the call site changes are so clean and contained 🎉

        .setByteStringStrategy(ByteStringStrategy.copy())
        .resumable()
        .withRetryConfig(
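Note on the pattern this PR repeats at each call site: the hasher is now resolved from the write options rather than pinned to Hasher.noop(). The options' getHasher() implementation is not part of this diff; the following is a hedged sketch of its presumed behavior, inferred from the Hasher javadoc later in this PR (the method body and the helper it calls are hypothetical):

    // Hypothetical sketch -- the real Opts#getHasher() is not shown in this diff.
    // Presumed behavior: end-to-end hashing on by default, disabled when the caller
    // already supplied a crc32c/md5 precondition (hasChecksumPrecondition() is invented).
    Hasher getHasher() {
      return hasChecksumPrecondition() ? Hasher.noop() : Hasher.defaultHasher();
    }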
@@ -50,14 +50,14 @@ final class GapicWritableByteChannelSessionBuilder {
  GapicWritableByteChannelSessionBuilder(
      ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write) {
    this.write = write;
-   this.hasher = Hasher.noop();
+   this.hasher = Hasher.defaultHasher();
    this.byteStringStrategy = ByteStringStrategy.copy();
  }

  /**
   * Set the {@link Hasher} to apply to the bytes passing through the built session's channel.
   *
-  * <p>Default: {@link Hasher#noop()}
+  * <p>Default: {@link Hasher#defaultHasher()}
   *
   * @see Hasher#enabled()
   * @see Hasher#noop()
@@ -179,14 +179,17 @@ UnbufferedDirectUploadBuilder setRequest(WriteObjectRequest req) {
  }

  UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
+   ChunkSegmenter chunkSegmenter = getChunkSegmenter();
    return new UnbufferedWriteSession<>(
        ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
        lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
                new GapicUnbufferedDirectWritableByteChannel(
                    resultFuture,
-                   getChunkSegmenter(),
+                   chunkSegmenter,
                    write,
-                   new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
+                   WriteCtx.of(
+                       WriteObjectRequestBuilderFactory.simple(start),
+                       chunkSegmenter.getHasher())))
            .andThen(StorageByteChannels.writable()::createSynchronized));
  }
}
@@ -207,14 +210,17 @@ BufferedDirectUploadBuilder setRequest(WriteObjectRequest req) {
  }

  BufferedWritableByteChannelSession<WriteObjectResponse> build() {
+   ChunkSegmenter chunkSegmenter = getChunkSegmenter();
    return new BufferedWriteSession<>(
        ApiFutures.immediateFuture(requireNonNull(req, "req must be non null")),
        lift((WriteObjectRequest start, SettableApiFuture<WriteObjectResponse> resultFuture) ->
                new GapicUnbufferedDirectWritableByteChannel(
                    resultFuture,
-                   getChunkSegmenter(),
+                   chunkSegmenter,
                    write,
-                   new WriteCtx<>(WriteObjectRequestBuilderFactory.simple(start))))
+                   WriteCtx.of(
+                       WriteObjectRequestBuilderFactory.simple(start),
+                       chunkSegmenter.getHasher())))
            .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
            .andThen(StorageByteChannels.writable()::createSynchronized));
  }
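Aside: both direct-upload builders hoist getChunkSegmenter() into a local because one ChunkSegmenter now feeds two consumers, the channel (for segmenting) and the WriteCtx seed (via its hasher). The local avoids building a second segmenter just to reach getHasher(). A minimal sketch of the shape, using only names visible in this diff (`start` is the WriteObjectRequest in scope in the lambda above):

    ChunkSegmenter chunkSegmenter = getChunkSegmenter();
    WriteCtx<?> writeCtx =
        WriteCtx.of(WriteObjectRequestBuilderFactory.simple(start), chunkSegmenter.getHasher());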
@@ -290,20 +296,24 @@ UnbufferedResumableUploadBuilder setStartAsync(ApiFuture<ResumableWrite> start)

  UnbufferedWritableByteChannelSession<WriteObjectResponse> build() {
    RetrierWithAlg boundRetrier = retrier;
+   ChunkSegmenter chunkSegmenter = getChunkSegmenter();
    return new UnbufferedWriteSession<>(
        requireNonNull(start, "start must be non null"),
        lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
              if (fsyncEvery) {
                return new GapicUnbufferedChunkedResumableWritableByteChannel(
                    result,
-                   getChunkSegmenter(),
+                   chunkSegmenter,
                    write,
-                   new WriteCtx<>(start),
+                   WriteCtx.of(start, chunkSegmenter.getHasher()),
                    boundRetrier,
                    Retrying::newCallContext);
              } else {
                return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
-                   result, getChunkSegmenter(), write, new WriteCtx<>(start));
+                   result,
+                   chunkSegmenter,
+                   write,
+                   WriteCtx.of(start, chunkSegmenter.getHasher()));
              }
            })
            .andThen(StorageByteChannels.writable()::createSynchronized));
@@ -330,20 +340,24 @@ BufferedResumableUploadBuilder setStartAsync(ApiFuture<ResumableWrite> start) {
  }

  BufferedWritableByteChannelSession<WriteObjectResponse> build() {
+   ChunkSegmenter chunkSegmenter = getChunkSegmenter();
    return new BufferedWriteSession<>(
        requireNonNull(start, "start must be non null"),
        lift((ResumableWrite start, SettableApiFuture<WriteObjectResponse> result) -> {
              if (fsyncEvery) {
                return new GapicUnbufferedChunkedResumableWritableByteChannel(
                    result,
-                   getChunkSegmenter(),
+                   chunkSegmenter,
                    write,
-                   new WriteCtx<>(start),
+                   WriteCtx.of(start, chunkSegmenter.getHasher()),
                    retrier,
                    Retrying::newCallContext);
              } else {
                return new GapicUnbufferedFinalizeOnCloseResumableWritableByteChannel(
-                   result, getChunkSegmenter(), write, new WriteCtx<>(start));
+                   result,
+                   chunkSegmenter,
+                   write,
+                   WriteCtx.of(start, chunkSegmenter.getHasher()));
              }
            })
            .andThen(c -> new DefaultBufferedWritableByteChannel(bufferHandle, c))
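For orientation, the channel layering that the buffered builders above assemble via andThen, reconstructed as a hedged sketch (the parameter and return types are inferred from identifiers in this diff, not verified against the package-private API):

    // Assumed layering: raw gRPC channel -> buffering -> synchronized wrapper.
    static WritableByteChannel layered(
        BufferHandle bufferHandle, UnbufferedWritableByteChannel raw) {
      BufferedWritableByteChannel buffered =
          new DefaultBufferedWritableByteChannel(bufferHandle, raw);
      return StorageByteChannels.writable().createSynchronized(buffered);
    }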
@@ -263,7 +263,7 @@ public Blob create(BlobInfo blobInfo, InputStream content, BlobWriteOption... op
    GrpcCallContext grpcCallContext =
        optsWithDefaults.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
    WriteObjectRequest req = getWriteObjectRequest(blobInfo, optsWithDefaults);
-   Hasher hasher = Hasher.enabled();
+   Hasher hasher = optsWithDefaults.getHasher();
    GrpcCallContext merge = Utils.merge(grpcCallContext, Retrying.newCallContext());
    UnbufferedWritableByteChannelSession<WriteObjectResponse> session =
        ResumableMedia.gapic()
@@ -324,7 +324,7 @@ public Blob internalCreateFrom(Path path, BlobInfo info, Opts<ObjectTargetOpt> o
                write,
                storageClient.queryWriteStatusCallable(),
                rw,
-               Hasher.noop()),
+               opts.getHasher()),
        MoreExecutors.directExecutor());
    try {
      GrpcResumableSession got = session2.get();
@@ -365,7 +365,7 @@ public Blob createFrom(
            .write()
            .byteChannel(
                storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext))
-           .setHasher(Hasher.noop())
+           .setHasher(opts.getHasher())
            .setByteStringStrategy(ByteStringStrategy.noCopy())
            .resumable()
            .withRetryConfig(retrier.withAlg(retryAlgorithmManager.idempotent()))
@@ -779,7 +779,7 @@ public GrpcBlobWriteChannel writer(BlobInfo blobInfo, BlobWriteOption... options
    GrpcCallContext grpcCallContext =
        opts.grpcMetadataMapper().apply(GrpcCallContext.createDefault());
    WriteObjectRequest req = getWriteObjectRequest(blobInfo, opts);
-   Hasher hasher = Hasher.noop();
+   Hasher hasher = opts.getHasher();
    // in JSON, the starting of the resumable session happens before the invocation of write can
    // happen. Emulate the same thing here.
    // 1. create the future
@@ -76,6 +76,15 @@ void validateUnchecked(Crc32cValue<?> expected, ByteString byteString)
  @Nullable Crc32cLengthKnown nullSafeConcat(
      @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2);

+  /**
+   * The initial value to use for this hasher.
+   *
+   * <p>Not ideal; really we should always start with {@link Crc32cValue#zero()}, but this saves
+   * us from having to plumb the initial value alongside the hasher to the WriteCtx constructor
+   * when hashing is disabled because of user-provided crc32c/md5 preconditions.
+   */
+  @Nullable Crc32cLengthKnown initialValue();
+
  static NoOpHasher noop() {
    return NoOpHasher.INSTANCE;
  }
@@ -118,6 +127,11 @@ public void validateUnchecked(Crc32cValue<?> expected, ByteString byteString) {}
        @Nullable Crc32cLengthKnown r1, @NonNull Crc32cLengthKnown r2) {
      return null;
    }
+
+    @Override
+    public @Nullable Crc32cLengthKnown initialValue() {
+      return null;
+    }
  }

  @Immutable
@@ -185,6 +199,11 @@ public Crc32cLengthKnown nullSafeConcat(
        return r1.concat(r2);
      }
    }

+    @Override
+    public @NonNull Crc32cLengthKnown initialValue() {
+      return Crc32cValue.zero();
+    }
  }

  final class ChecksumMismatchException extends IOException {
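What initialValue() buys downstream: WriteCtx can seed its cumulative checksum directly from the hasher, so disabled hashing needs no separate plumbing. A minimal sketch using only the API shown above:

    // Seed is Crc32cValue.zero() for defaultHasher(), null for noop().
    Hasher hasher = Hasher.defaultHasher();  // or Hasher.noop() when a crc32c/md5 precondition disables hashing
    AtomicReference<@Nullable Crc32cLengthKnown> cumulativeCrc32c =
        new AtomicReference<>(hasher.initialValue());
    // Per-chunk results are later folded in with nullSafeConcat, which the no-op
    // hasher implements as "return null", so the reference stays null end to end.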
@@ -192,7 +192,8 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
        grpcStorage.startResumableWrite(
            grpcCallContext, grpcStorage.getWriteObjectRequest(info, opts), opts);
    ApiFuture<WriteCtx<ResumableWrite>> start =
-       ApiFutures.transform(f, WriteCtx::new, MoreExecutors.directExecutor());
+       ApiFutures.transform(
+           f, s -> WriteCtx.of(s, opts.getHasher()), MoreExecutors.directExecutor());

    ClientStreamingCallable<WriteObjectRequest, WriteObjectResponse> write =
        grpcStorage.storageClient.writeObjectCallable().withDefaultCallContext(grpcCallContext);
@@ -23,6 +23,7 @@
import com.google.storage.v2.WriteObjectRequest;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;

final class WriteCtx<RequestFactoryT extends WriteObjectRequestBuilderFactory> {
@@ -33,22 +34,18 @@ final class WriteCtx<RequestFactoryT extends WriteObjectRequestBuilderFactory> {
  private final AtomicLong confirmedBytes;
  private final AtomicReference<@Nullable Crc32cLengthKnown> cumulativeCrc32c;

-  WriteCtx(RequestFactoryT requestFactory) {
-    this(requestFactory, null);
-  }
-
-  /**
-   * TODO: Remove initialValue and replace with Crc32cValue.zero() once all uploads have been
-   * updated to do e2e checksumming by default.
-   */
-  @Deprecated
-  WriteCtx(RequestFactoryT requestFactory, @Nullable Crc32cLengthKnown initialValue) {
+  private WriteCtx(RequestFactoryT requestFactory, @Nullable Crc32cLengthKnown initialValue) {
    this.requestFactory = requestFactory;
    this.totalSentBytes = new AtomicLong(0);
    this.confirmedBytes = new AtomicLong(0);
    this.cumulativeCrc32c = new AtomicReference<>(initialValue);
  }

+  static <RFT extends WriteObjectRequestBuilderFactory> WriteCtx<RFT> of(
+      RFT rft, @NonNull Hasher hasher) {
+    return new WriteCtx<>(rft, hasher.initialValue());
+  }
+
  public RequestFactoryT getRequestFactory() {
    return requestFactory;
  }