Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@
import com.google.cloud.storage.BufferedReadableByteChannelSession.BufferedReadableByteChannel;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import org.checkerframework.checker.nullness.qual.Nullable;

abstract class BaseStorageReadChannel<T> implements StorageReadChannel {

private boolean open;
private ByteRangeSpec byteRangeSpec;
private int chunkSize = _2MiB;
private BufferHandle bufferHandle;
Expand All @@ -34,6 +36,7 @@ abstract class BaseStorageReadChannel<T> implements StorageReadChannel {
@Nullable private T resolvedObject;

protected BaseStorageReadChannel() {
this.open = true;
this.byteRangeSpec = ByteRangeSpec.nullRange();
}

Expand All @@ -45,16 +48,12 @@ public final synchronized void setChunkSize(int chunkSize) {

@Override
public final synchronized boolean isOpen() {
if (lazyReadChannel == null) {
return true;
} else {
LazyReadChannel<T> tmp = internalGetLazyChannel();
return tmp.isOpen();
}
return open;
}

@Override
public final synchronized void close() {
open = false;
if (internalGetLazyChannel().isOpen()) {
StorageException.wrapIOException(internalGetLazyChannel().getChannel()::close);
}
Expand All @@ -75,17 +74,23 @@ public final ByteRangeSpec getByteRangeSpec() {

@Override
public final synchronized int read(ByteBuffer dst) throws IOException {
// BlobReadChannel only considered itself closed if close had been called on it.
if (!open) {
throw new ClosedChannelException();
}
long diff = byteRangeSpec.length();
if (diff <= 0) {
close();
return -1;
}
try {
int read = internalGetLazyChannel().getChannel().read(dst);
// trap if the fact that tmp is already closed, and instead return -1
BufferedReadableByteChannel tmp = internalGetLazyChannel().getChannel();
if (!tmp.isOpen()) {
return -1;
}
int read = tmp.read(dst);
if (read != -1) {
byteRangeSpec = byteRangeSpec.withShiftBeginOffset(read);
} else {
close();
}
return read;
} catch (StorageException e) {
Expand Down Expand Up @@ -128,15 +133,16 @@ protected void setResolvedObject(@Nullable T resolvedObject) {
protected abstract LazyReadChannel<T> newLazyReadChannel();

private void maybeResetChannel(boolean freeBuffer) throws IOException {
if (lazyReadChannel != null && lazyReadChannel.isOpen()) {
try (BufferedReadableByteChannel ignore = lazyReadChannel.getChannel()) {
if (bufferHandle != null && !freeBuffer) {
bufferHandle.get().clear();
} else if (freeBuffer) {
bufferHandle = null;
}
lazyReadChannel = null;
if (lazyReadChannel != null) {
if (lazyReadChannel.isOpen()) {
lazyReadChannel.getChannel().close();
}
if (bufferHandle != null && !freeBuffer) {
bufferHandle.get().clear();
} else if (freeBuffer) {
bufferHandle = null;
}
lazyReadChannel = null;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,13 @@

package com.google.cloud.storage.it;

import static com.google.cloud.storage.TestUtils.assertAll;
import static com.google.cloud.storage.TestUtils.xxd;
import static com.google.common.truth.Truth.assertThat;
import static java.nio.charset.StandardCharsets.UTF_8;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.fail;

import com.google.cloud.ReadChannel;
Expand Down Expand Up @@ -52,6 +54,7 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Files;
Expand Down Expand Up @@ -372,6 +375,40 @@ public void seekAfterReadWorks() throws IOException {
}
}

@Test
public void seekBackToStartAfterReachingEndOfObjectWorks() throws IOException {
ObjectAndContent obj512KiB = objectsFixture.getObj512KiB();
BlobInfo gen1 = obj512KiB.getInfo();
byte[] bytes = obj512KiB.getContent().getBytes();

int from = bytes.length - 5;
byte[] expected1 = Arrays.copyOfRange(bytes, from, bytes.length);

String xxdExpected1 = xxd(expected1);
String xxdExpected2 = xxd(bytes);
try (ReadChannel reader = storage.reader(gen1.getBlobId())) {
// seek forward to a new offset
reader.seek(from);

try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel out = Channels.newChannel(baos)) {
ByteStreams.copy(reader, out);
String xxd = xxd(baos.toByteArray());
assertThat(xxd).isEqualTo(xxdExpected1);
}

// seek back to the beginning
reader.seek(0);
// read again
try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
WritableByteChannel out = Channels.newChannel(baos)) {
ByteStreams.copy(reader, out);
String xxd = xxd(baos.toByteArray());
assertThat(xxd).isEqualTo(xxdExpected2);
}
}
}

@Test
public void limitAfterReadWorks() throws IOException {
ObjectAndContent obj512KiB = objectsFixture.getObj512KiB();
Expand Down Expand Up @@ -469,6 +506,29 @@ public void responseWith416ReturnsZeroAndLeavesTheChannelOpen() throws IOExcepti
}
}

/** Read channel does not consider itself closed once it returns {@code -1} from read. */
@Test
public void readChannelIsAlwaysOpen_willReturnNegative1UntilExplicitlyClosed() throws Exception {
int length = 10;
byte[] bytes = DataGenerator.base64Characters().genBytes(length);

BlobInfo info1 = BlobInfo.newBuilder(bucket, generator.randomObjectName()).build();
Blob gen1 = storage.create(info1, bytes, BlobTargetOption.doesNotExist());

try (ReadChannel reader = storage.reader(gen1.getBlobId())) {
ByteBuffer buf = ByteBuffer.allocate(length * 2);
int read = reader.read(buf);
assertAll(
() -> assertThat(read).isEqualTo(length), () -> assertThat(reader.isOpen()).isTrue());
int read2 = reader.read(buf);
assertAll(() -> assertThat(read2).isEqualTo(-1), () -> assertThat(reader.isOpen()).isTrue());
int read3 = reader.read(buf);
assertAll(() -> assertThat(read3).isEqualTo(-1), () -> assertThat(reader.isOpen()).isTrue());
reader.close();
assertThrows(ClosedChannelException.class, () -> reader.read(buf));
}
}

private void captureAndRestoreTest(@Nullable Integer position, @Nullable Integer endOffset)
throws IOException {
ObjectAndContent obj512KiB = objectsFixture.getObj512KiB();
Expand Down