JsonStreamWriter fails to append rows with updated schemas right after updating the table #1465

@bphenriques

Description

Hello,

My team and I are exploring the BigQuery Storage API to stream data to BigQuery (`BUFFERED` mode). The problem arises with schema updates: we can successfully update the BigQuery table's schema whenever necessary, but appending rows with the new schema through the same JsonStreamWriter fails because the writer's cached table schema is outdated, and the append throws `JSONObject has fields unknown to BigQuery: root.col2.`

I am aware that #1447 adds support for updated table schemas; however, it relies on the client issuing further append calls until the backend reports the updated schema (judging by the test).

For reference, I ported your integration tests to our project (Scala) to ease discussion:

Works with `BUFFERED` mode

// Imports shared by both samples; test(...)/assertEquals/assertFalse come from our test framework.
import java.util.UUID
import scala.util.control.Breaks._
import com.google.cloud.ServiceOptions
import com.google.cloud.bigquery.{BigQueryOptions, Field, Schema, StandardSQLTypeName, StandardTableDefinition, TableDefinition, TableId, TableInfo}
import com.google.cloud.bigquery.storage.v1.{BigQueryWriteClient, CreateWriteStreamRequest, FlushRowsRequest, JsonStreamWriter, TableName, WriteStream}
import com.google.common.collect.ImmutableList
import com.google.protobuf.Int64Value
import org.json.{JSONArray, JSONObject}

test("Simulate JsonStreamWriter with schema migrations using the BQ API directly") {
  val client = BigQueryWriteClient.create()
  val bigquery = BigQueryOptions.newBuilder
    .setProjectId("some-project")
    .build
    .getService
  val DATASET = "it-test"
  val tableName =
    "SchemaUpdateTestTable" + UUID.randomUUID().toString.replace("-", "").substring(0, 5)
  val tableId = TableId.of(DATASET, tableName)
  val col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build
  val originalSchema = Schema.of(col1)
  val tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build
  bigquery.create(tableInfo)
  val parent = TableName.of(ServiceOptions.getDefaultProjectId, DATASET, tableName)
  val writeStream = client.createWriteStream(
    CreateWriteStreamRequest.newBuilder
      .setParent(parent.toString)
      .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
      .build
  )
  try {
    val jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName, writeStream.getTableSchema).build
    try { // write the 1st row
      var currentOffset = 0L
      val foo = new JSONObject()
      foo.put("col1", "aaa")
      val jsonArr = new JSONArray()
      jsonArr.put(foo)
      val response = jsonStreamWriter.append(jsonArr, currentOffset)
      currentOffset += jsonArr.length() - 1 // offsets are zero-based: this is the offset of the last appended row

      assertEquals(0L, response.get.getAppendResult.getOffset.getValue)

      assertEquals(
        0L,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // update schema with a new column
      val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
      val updatedSchema = Schema.of(ImmutableList.of(col1, col2))
      val updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build
      val updatedTable = bigquery.update(updatedTableInfo)
      assertEquals(
        updatedSchema,
        updatedTable.getDefinition.asInstanceOf[TableDefinition].getSchema,
      )
      // continue writing rows until backend acknowledges schema update
      val foo2 = new JSONObject()
      foo2.put("col1", "bbb")
      val jsonArr2 = new JSONArray()
      jsonArr2.put(foo2)
      var next = 0
      breakable {
        for (i <- 1 until 100) {
          val response2 = jsonStreamWriter.append(jsonArr2, i)
          currentOffset += jsonArr2.length()
          assertEquals(i.toLong, response2.get.getAppendResult.getOffset.getValue)
          if (response2.get.hasUpdatedSchema) {
            next = i
            break
          } else Thread.sleep(1000)
        }
      }
      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // write rows with updated schema.
      val updatedFoo = new JSONObject()
      updatedFoo.put("col1", "ccc")
      updatedFoo.put("col2", "ddd")
      val updatedJsonArr = new JSONArray()
      updatedJsonArr.put(updatedFoo)
      for (i <- 0 until 10) {
        currentOffset += updatedJsonArr.length()
        val response3 = jsonStreamWriter.append(updatedJsonArr, currentOffset)
        assertEquals(currentOffset, response3.get.getAppendResult.getOffset.getValue)
      }

      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // verify table data correctness
      val rowsIter = bigquery.listTableData(tableId).getValues.iterator
      // 1 row of aaa
      assertEquals("aaa", rowsIter.next.get(0).getStringValue)
      // a few rows of bbb
      for (j <- 1 to next) {
        assertEquals("bbb", rowsIter.next.get(0).getStringValue)
      }
      // 10 rows of ccc, ddd
      for (j <- next + 1 until next + 1 + 10) {
        val temp = rowsIter.next
        assertEquals("ccc", temp.get(0).getStringValue)
        assertEquals("ddd", temp.get(1).getStringValue)
      }
      assertFalse(rowsIter.hasNext)
    } finally if (jsonStreamWriter != null) jsonStreamWriter.close()
  }
}
Does not work when appending rows with the new schema right after updating the table
test("Appending rows with new schema right after updating table") {
  val client = BigQueryWriteClient.create()
  val bigquery = BigQueryOptions.newBuilder
    .setProjectId("some-project")
    .build
    .getService
  val DATASET = "it-test"
  val tableName =
    "SchemaUpdateTestTable" + UUID.randomUUID().toString.replace("-", "").substring(0, 5)
  val tableId = TableId.of(DATASET, tableName)
  val col1 = Field.newBuilder("col1", StandardSQLTypeName.STRING).build
  val originalSchema = Schema.of(col1)
  val tableInfo = TableInfo.newBuilder(tableId, StandardTableDefinition.of(originalSchema)).build
  bigquery.create(tableInfo)
  val parent = TableName.of(ServiceOptions.getDefaultProjectId, DATASET, tableName)
  val writeStream = client.createWriteStream(
    CreateWriteStreamRequest.newBuilder
      .setParent(parent.toString)
      .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
      .build
  )
  try {
    val jsonStreamWriter =
      JsonStreamWriter.newBuilder(writeStream.getName, writeStream.getTableSchema).build
    try { // write the 1st row
      var currentOffset = 0L
      val foo = new JSONObject()
      foo.put("col1", "aaa")
      val jsonArr = new JSONArray()
      jsonArr.put(foo)
      val response = jsonStreamWriter.append(jsonArr, currentOffset)
      currentOffset += jsonArr.length() - 1 // offsets are zero-based: this is the offset of the last appended row

      assertEquals(0L, response.get.getAppendResult.getOffset.getValue)

      assertEquals(
        0L,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // update schema with a new column
      val col2 = Field.newBuilder("col2", StandardSQLTypeName.STRING).build
      val updatedSchema = Schema.of(ImmutableList.of(col1, col2))
      val updatedTableInfo =
        TableInfo.newBuilder(tableId, StandardTableDefinition.of(updatedSchema)).build
      val updatedTable = bigquery.update(updatedTableInfo)
      assertEquals(
        updatedSchema,
        updatedTable.getDefinition.asInstanceOf[TableDefinition].getSchema,
      )

      // write rows with updated schema.
      val updatedFoo = new JSONObject()
      updatedFoo.put("col1", "ccc")
      updatedFoo.put("col2", "ddd")
      val updatedJsonArr = new JSONArray()
      updatedJsonArr.put(updatedFoo)
      for (i <- 0 until 10) {
        currentOffset += updatedJsonArr.length()
        val response3 = jsonStreamWriter.append(updatedJsonArr, currentOffset) // Fails here with IllegalArgumentException.
        assertEquals(currentOffset, response3.get.getAppendResult.getOffset.getValue)
      }

      assertEquals(
        currentOffset,
        client
          .flushRows(
            FlushRowsRequest.newBuilder
              .setWriteStream(writeStream.getName)
              .setOffset(Int64Value.of(currentOffset))
              .build()
          )
          .getOffset,
      )

      // verify table data correctness
      val rowsIter = bigquery.listTableData(tableId).getValues.iterator
      // 1 row of aaa
      assertEquals("aaa", rowsIter.next.get(0).getStringValue)
      // 10 rows of ccc, ddd
      for (_ <- 0 until 10) {
        val temp = rowsIter.next
        assertEquals("ccc", temp.get(0).getStringValue)
        assertEquals("ddd", temp.get(1).getStringValue)
      }
      assertFalse(rowsIter.hasNext)
    } finally if (jsonStreamWriter != null) jsonStreamWriter.close()
  }
}

Error:

java.lang.IllegalArgumentException: JSONObject has fields unknown to BigQuery: root.col2.

Describe the solution you'd like
Two ideas follow, though you are best placed to make the design decisions.

  1. Make it transparent to the caller: JsonStreamWriter could recover from this error by fetching the latest table schema itself. That API call can be costly depending on how often schemas are updated (in our case, rarely), so it could be a configurable option. This works for our use case; see the sketch after this list.
  2. Delegate the responsibility to the caller: JsonStreamWriter could support swapping the underlying schema, e.g. a JsonStreamWriter::setTableDefinition that does something similar to this. Edit: this introduces mutability, so it is not the best solution.
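
To make idea 1 concrete, here is a minimal caller-side sketch of the recovery. It is only an illustration, not an existing library API: `appendWithSchemaRefresh` is a hypothetical helper, and it assumes that refetching the stream via `BigQueryWriteClient.getWriteStream` returns the table schema the backend currently knows (`WriteStream.getTableSchema`), which is exactly the timing question this issue is about.

```scala
import com.google.cloud.bigquery.storage.v1.{AppendRowsResponse, BigQueryWriteClient, JsonStreamWriter}
import org.json.JSONArray

// Hypothetical helper: on the "unknown fields" error, refetch the stream's
// table schema, rebuild the writer, and retry the append once.
def appendWithSchemaRefresh(
    client: BigQueryWriteClient,
    writer: JsonStreamWriter,
    rows: JSONArray,
    offset: Long
): (JsonStreamWriter, AppendRowsResponse) =
  try {
    (writer, writer.append(rows, offset).get())
  } catch {
    case e: IllegalArgumentException if e.getMessage.contains("unknown to BigQuery") =>
      // Assumption: the refetched WriteStream carries the schema the backend
      // currently knows for this stream.
      val refreshed = client.getWriteStream(writer.getStreamName)
      writer.close()
      val rebuilt =
        JsonStreamWriter.newBuilder(refreshed.getName, refreshed.getTableSchema).build()
      (rebuilt, rebuilt.append(rows, offset).get())
  }
```

A library-internal version of this could hide the writer swap entirely, which is what would make option 1 transparent to the caller.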

Describe alternatives you've considered
We considered recreating the WriteStream and then the JsonStreamWriter, but that is disruptive given our streaming approach; a sketch of what it looks like follows.
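
For completeness, a sketch of that alternative, reusing `client`, `parent`, and the `BUFFERED` setup from the samples above (`oldWriter` stands for the existing writer):

```scala
// Tear down the old writer and start a fresh stream against the (now updated) table.
oldWriter.close()
val newStream = client.createWriteStream(
  CreateWriteStreamRequest.newBuilder
    .setParent(parent.toString)
    .setWriteStream(WriteStream.newBuilder.setType(WriteStream.Type.BUFFERED).build)
    .build
)
// The new stream's schema snapshot includes the added column.
val newWriter = JsonStreamWriter.newBuilder(newStream.getName, newStream.getTableSchema).build
// Offsets restart at 0 on the new stream, so any in-flight or unflushed offsets
// on the old stream must be reconciled by the caller. That bookkeeping is what
// makes this disruptive for a long-lived streaming job.
```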

Environment details

OS type and version: macOS (Apple M1)
Java version: Corretto 11.0.12
bigquerystorage version(s): 2.8.0
bigquery version: 2.5.1

Steps to reproduce

See the second code sample above. Happy to help!

Code example

See the second code sample above. It is essentially the first sample (`BUFFERED` mode) with the `// continue writing rows until backend acknowledges schema update` loop removed.

Stack trace

java.lang.IllegalArgumentException: JSONObject has fields unknown to BigQuery: root.col2.

Labels

api: bigquerystorage (Issues related to the googleapis/java-bigquerystorage API), type: question (Request for information or clarification; not an issue)
