33 changes: 1 addition & 32 deletions google-cloud-bigquery/pom.xml
@@ -15,7 +15,6 @@
</parent>
<properties>
<site.installationModule>google-cloud-bigquery</site.installationModule>
<netty.version>4.1.119.Final</netty.version>
</properties>
<dependencies>
<dependency>
@@ -117,37 +116,6 @@
<dependency>
<groupId>org.apache.arrow</groupId>
<artifactId>arrow-memory-netty</artifactId>
<!--
Due to the JDK 8 requirement, we are forced to use an Arrow version below v18,
which pulls in io.netty:netty-common as a dependency along with its known
vulnerabilities.
-->
<exclusions>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</exclusion>
<exclusion>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</exclusion>
</exclusions>
<scope>runtime</scope>
</dependency>
<!--
Pin io.netty versions to override the vulnerable transitive dependency
versions pulled in by org.apache.arrow:arrow-memory-netty.
-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>${netty.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>${netty.version}</version>
<scope>runtime</scope>
</dependency>
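
Whether these pins are kept or removed, one way to verify which Netty versions actually resolve on the runtime classpath is Netty's own io.netty.util.Version.identify(). A minimal sketch, not part of the diff (the class name NettyVersionCheck is illustrative):

import java.util.Map;
import io.netty.util.Version;

public class NettyVersionCheck {
  public static void main(String[] args) {
    // identify() reads the META-INF/io.netty.versions.properties resources on
    // the classpath and reports the resolved version of each Netty artifact.
    Map<String, Version> versions = Version.identify();
    versions.forEach((artifact, version) ->
        System.out.println(artifact + " -> " + version.artifactVersion()));
  }
}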

<!-- auto-value creates a class that uses an annotation from error_prone_annotations -->
@@ -290,6 +258,7 @@
<fork>true</fork>
<compilerArgs>
<arg>-J--add-opens=java.base/java.nio=org.apache.arrow.memory.core,ALL-UNNAMED</arg>
<arg>-J--add-opens=java.base/java.nio=java-base,ALL-UNNAMED</arg>
</compilerArgs>
</configuration>
</plugin>
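
These --add-opens flags exist because Arrow's memory module reflectively accesses java.nio internals that JDK 9+ encapsulates. A minimal probe, not part of the diff, that throws java.lang.reflect.InaccessibleObjectException unless java.base/java.nio is opened to the calling module:

import java.lang.reflect.Field;
import java.nio.Buffer;

public class AddOpensProbe {
  public static void main(String[] args) throws Exception {
    // Arrow's memory utilities read java.nio.Buffer.address reflectively.
    // Without --add-opens=java.base/java.nio=ALL-UNNAMED (or an equivalent
    // target module), setAccessible fails on JDK 9+.
    Field address = Buffer.class.getDeclaredField("address");
    address.setAccessible(true);
    System.out.println("java.nio is open to this module");
  }
}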
@@ -1 +1,6 @@
Args = --add-opens=java.base/java.nio=ALL-UNNAMED
Args = --add-opens=java.base/java.nio=ALL-UNNAMED \

#--initialize-at-build-time=org.apache.arrow.memory.BaseAllocator \
#--initialize-at-build-time=org.apache.arrow.memory.BaseAllocator$Config \
#--initialize-at-build-time=org.apache.arrow.memory.DefaultAllocationManagerOption \
#--initialize-at-build-time=org.apache.arrow.memory.netty.NettyAllocationManager$1
@@ -29,9 +29,48 @@
"name":"java.nio.DirectByteBuffer",
"methods":[{"name":"<init>","parameterTypes":["long","int"] }]
},

{
"name":"org.apache.arrow.memory.BaseAllocator",
"allDeclaredFields":true,
"queryAllDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true
},
{
"name":"org.apache.arrow.memory.BaseAllocator$Config",
"allDeclaredFields":true,
"queryAllDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true
},
{
"name":"org.apache.arrow.memory.DefaultAllocationManagerOption",
"allDeclaredFields":true,
"queryAllDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true
},
{
"name":"org.apache.arrow.memory.netty.NettyAllocationManager$1",
"allDeclaredFields":true,
"queryAllDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true
},
{
"name":"org.apache.arrow.memory.netty.DefaultAllocationManagerFactory",
"allDeclaredFields":true,
"queryAllDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true
},
{
"name":"org.apache.arrow.memory.DefaultAllocationManagerFactory",
"fields":[{"name":"FACTORY"}]
"allDeclaredFields":true,
"queryAllDeclaredFields":true,
"queryAllDeclaredMethods":true,
"queryAllDeclaredConstructors":true
},
{
"name":"org.apache.arrow.vector.types.pojo.ArrowType",
@@ -50,6 +89,18 @@
"allDeclaredFields":true,
"queryAllDeclaredMethods":true
},
{
"name": "org.apache.arrow.vector.types.pojo.DictionaryEncoding",
"allDeclaredFields": true
},
{
"name": "org.apache.arrow.vector.types.pojo.Field",
"allDeclaredFields": true
},
{
"name": "org.apache.arrow.vector.types.pojo.Schema",
"allDeclaredFields": true
},
{
"name":"io.netty.buffer.AbstractReferenceCountedByteBuf",
"fields":[{"name":"refCnt"}]
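
The DefaultAllocationManagerFactory entries above matter because Arrow resolves its allocation backend reflectively at runtime: it loads a well-known class name and reads its static FACTORY field, and under native-image both steps fail unless they are registered in reflect-config.json. A simplified sketch of that lookup path (not Arrow's actual bootstrap code; the class name matches the one used in the test below):

import java.lang.reflect.Field;

public class FactoryLookupSketch {
  public static void main(String[] args) throws Exception {
    // Both the Class.forName lookup and the FACTORY field read need
    // reflect-config.json entries when running as a native image.
    Class<?> clazz =
        Class.forName("org.apache.arrow.memory.netty.DefaultAllocationManagerFactory");
    Field factory = clazz.getDeclaredField("FACTORY");
    factory.setAccessible(true);
    System.out.println("FACTORY = " + factory.get(null));
  }
}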
@@ -1,6 +1,20 @@
{
"resources":{
"includes":[{
"pattern":"\\Qorg/apache/arrow/memory/DefaultAllocationManagerFactory.class\\E"
}]}
"includes":[
{
"pattern":"\\Qorg/apache/arrow/memory/DefaultAllocationManagerFactory.class\\E"
}
]
},
"globs":[
{
"glob": "org/apache/arrow/memory/DefaultAllocationManagerFactory.class"
},
{
"glob": "org/apache/arrow/memory/netty/DefaultAllocationManagerFactory.class"
},
{
"glob": "org/apache/arrow/memory/unsafe/DefaultAllocationManagerFactory.class"
}
]
}
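
The added globs keep the factory .class resources visible to ClassLoader.getResource, which is how Arrow probes which allocation-manager backends (netty, unsafe) are present before loading one. A rough sketch of such a probe, not from the PR (the class AllocationManagerProbe is illustrative):

public class AllocationManagerProbe {
  static boolean isPresent(String resource) {
    // Under native-image this returns null unless the resource is listed
    // in resource-config.json.
    return AllocationManagerProbe.class.getClassLoader()
        .getResource(resource) != null;
  }

  public static void main(String[] args) {
    System.out.println("netty backend: "
        + isPresent("org/apache/arrow/memory/netty/DefaultAllocationManagerFactory.class"));
    System.out.println("unsafe backend: "
        + isPresent("org/apache/arrow/memory/unsafe/DefaultAllocationManagerFactory.class"));
  }
}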
@@ -214,7 +214,23 @@
import org.junit.rules.Timeout;
import org.threeten.extra.PeriodDuration;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
public class ITBigQueryTest {
@Test
public void testArrowForName() throws Exception {
// mvn test -Pnative '-Dtest=com.google.cloud.bigquery.it.ITBigQueryTest#testArrowForName' -DtrimStackTrace=false
final String clazzName = "org.apache.arrow.memory.netty.DefaultAllocationManagerFactory";
try {
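// Constructing the RootAllocator is enough to trigger Arrow's reflective
// allocation-manager discovery, which is what this smoke test exercises.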
BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
//java.lang.reflect.Field field = Class.forName(clazzName).getDeclaredField("FACTORY");
//field.setAccessible(true);
//System.out.println("OK");
} catch (Exception e) {
throw new RuntimeException("Unable to instantiate Allocation Manager for " + clazzName, e);
}
}


private static final byte[] BYTES = {0xD, 0xE, 0xA, 0xD};
private static final String BYTES_BASE64 = BaseEncoding.base64().encode(BYTES);
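
The testArrowForName smoke test above only constructs a RootAllocator. A slightly stronger variant, sketched below and not part of this PR, would allocate through the resolved backend and release the buffer, failing fast if the reflection config is wrong:

import org.apache.arrow.memory.ArrowBuf;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;

public class AllocationSmokeTest {
  public static void main(String[] args) {
    // Allocating forces the allocation-manager factory to be resolved and
    // used, not just located; the buffer is closed before the allocator so
    // RootAllocator.close() does not report a leak.
    try (BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);
         ArrowBuf buf = allocator.buffer(64)) {
      buf.setLong(0, 0xDEADBEEFL);
      System.out.println("allocated " + buf.capacity() + " bytes");
    }
  }
}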
@@ -1064,118 +1080,6 @@ public CompletableResultCode shutdown() {

@Rule public Timeout globalTimeout = Timeout.seconds(300);

@BeforeClass
public static void beforeClass() throws InterruptedException, IOException {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
RemoteStorageHelper storageHelper = RemoteStorageHelper.create();
Map<String, String> labels = ImmutableMap.of("test-job-name", "test-load-job");
SdkTracerProvider tracerProvider =
SdkTracerProvider.builder()
.addSpanProcessor(SimpleSpanProcessor.create(new TestSpanExporter()))
.setSampler(Sampler.alwaysOn())
.build();
otel = OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).buildAndRegisterGlobal();

bigquery = bigqueryHelper.getOptions().getService();
storage = storageHelper.getOptions().getService();
storage.create(BucketInfo.of(BUCKET));
storage.create(
BlobInfo.newBuilder(BUCKET, LOAD_FILE).setContentType("text/plain").build(),
CSV_CONTENT.getBytes(StandardCharsets.UTF_8));
storage.create(
BlobInfo.newBuilder(BUCKET, LOAD_FILE_NULL).setContentType("text/plain").build(),
CSV_CONTENT_NULL.getBytes(StandardCharsets.UTF_8));
storage.create(
BlobInfo.newBuilder(BUCKET, LOAD_FILE_FLEXIBLE_COLUMN_NAME)
.setContentType("text/plain")
.build(),
CSV_CONTENT_FLEXIBLE_COLUMN.getBytes(StandardCharsets.UTF_8));
storage.create(
BlobInfo.newBuilder(BUCKET, JSON_LOAD_FILE).setContentType("application/json").build(),
JSON_CONTENT.getBytes(StandardCharsets.UTF_8));
storage.create(
BlobInfo.newBuilder(BUCKET, JSON_LOAD_FILE_SIMPLE)
.setContentType("application/json")
.build(),
JSON_CONTENT_SIMPLE.getBytes(StandardCharsets.UTF_8));
InputStream stream =
ITBigQueryTest.class.getClassLoader().getResourceAsStream("QueryTestData.csv");
storage.createFrom(
BlobInfo.newBuilder(BUCKET, LOAD_FILE_LARGE).setContentType("text/plain").build(), stream);
storage.create(
BlobInfo.newBuilder(BUCKET, JSON_LOAD_FILE_BQ_RESULTSET)
.setContentType("application/json")
.build(),
JSON_CONTENT_BQ_RESULTSET.getBytes(StandardCharsets.UTF_8));
DatasetInfo info =
DatasetInfo.newBuilder(DATASET).setDescription(DESCRIPTION).setLabels(LABELS).build();
bigquery.create(info);
DatasetInfo info2 =
DatasetInfo.newBuilder(MODEL_DATASET).setDescription("java model lifecycle").build();
bigquery.create(info2);
DatasetInfo info3 =
DatasetInfo.newBuilder(ROUTINE_DATASET).setDescription("java routine lifecycle").build();
bigquery.create(info3);

LoadJobConfiguration configuration =
LoadJobConfiguration.newBuilder(
TABLE_ID, "gs://" + BUCKET + "/" + JSON_LOAD_FILE, FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(TABLE_SCHEMA)
.setLabels(labels)
.build();
Job job = bigquery.create(JobInfo.of(configuration));
job = job.waitFor();
assertNull(job.getStatus().getError());
LoadJobConfiguration loadJobConfiguration = job.getConfiguration();
assertEquals(labels, loadJobConfiguration.getLabels());

LoadJobConfiguration configurationFastQuery =
LoadJobConfiguration.newBuilder(
TABLE_ID_FASTQUERY, "gs://" + BUCKET + "/" + JSON_LOAD_FILE, FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(TABLE_SCHEMA)
.setLabels(labels)
.build();
Job jobFastQuery = bigquery.create(JobInfo.of(configurationFastQuery));
jobFastQuery = jobFastQuery.waitFor();
assertNull(jobFastQuery.getStatus().getError());

LoadJobConfiguration configFastQueryBQResultset =
LoadJobConfiguration.newBuilder(
TABLE_ID_FASTQUERY_BQ_RESULTSET,
"gs://" + BUCKET + "/" + JSON_LOAD_FILE_BQ_RESULTSET,
FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(BQ_RESULTSET_SCHEMA)
.setLabels(labels)
.build();
Job jobFastQueryBQResultSet = bigquery.create(JobInfo.of(configFastQueryBQResultset));
jobFastQueryBQResultSet = jobFastQueryBQResultSet.waitFor();
assertNull(jobFastQueryBQResultSet.getStatus().getError());

LoadJobConfiguration configurationDDL =
LoadJobConfiguration.newBuilder(
TABLE_ID_DDL, "gs://" + BUCKET + "/" + JSON_LOAD_FILE_SIMPLE, FormatOptions.json())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(DDL_TABLE_SCHEMA)
.setLabels(labels)
.build();
Job jobDDL = bigquery.create(JobInfo.of(configurationDDL));
jobDDL = jobDDL.waitFor();
assertNull(jobDDL.getStatus().getError());

LoadJobConfiguration configurationLargeTable =
LoadJobConfiguration.newBuilder(
TABLE_ID_LARGE, "gs://" + BUCKET + "/" + LOAD_FILE_LARGE, FormatOptions.csv())
.setCreateDisposition(JobInfo.CreateDisposition.CREATE_IF_NEEDED)
.setSchema(LARGE_TABLE_SCHEMA)
.setLabels(labels)
.build();
Job jobLargeTable = bigquery.create(JobInfo.of(configurationLargeTable));
jobLargeTable = jobLargeTable.waitFor();
assertNull(jobLargeTable.getStatus().getError());
}

@AfterClass
public static void afterClass() throws ExecutionException, InterruptedException {
@@ -0,0 +1,3 @@
Args = --initialize-at-build-time=org.junit
#Args = --initialize-at-build-time=org.mockito \
#--initialize-at-build-time=net.bytebuddy.TypeCache$WithInlineExpunction
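
--initialize-at-build-time moves a class's static initializers from image run time into the native-image build itself, freezing the resulting state into the image heap; the commented-out Mockito/ByteBuddy entries would extend the same treatment to the mocking infrastructure. A toy illustration of the difference (hypothetical class, not from this PR):

public class InitTimeDemo {
  static {
    // With --initialize-at-build-time=InitTimeDemo this runs inside the
    // native-image builder and the timestamp is baked into the image heap;
    // without it, the initializer runs on every process start.
    System.out.println("initialized at " + System.currentTimeMillis());
  }

  public static void main(String[] args) {
    System.out.println("main running");
  }
}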