
Google BigQuery InsertAll with Transaction?


I am new to BigQuery in general, and I'm trying to insert multiple rows into one BigQuery table while also modifying another table, all in a single transaction.

I'm using the BigQuery Java client from Kotlin, but I can't figure out how to set the session ID for the insertAll request.

I start by beginning a transaction in a new session and reading back the session ID:

val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
    .setCreateSession(true)
    .build()

// runQuery just creates a job and waits for execution to end
val sessionId = runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId

I then pass the session ID to the first request, which updates the other table:

val queryConfig2 = QueryJobConfiguration.newBuilder("SOME_SQL_STATEMENT")
    .setCreateSession(false)
    .setConnectionProperties(listOf(ConnectionProperty.of("session_id", sessionId)))
    .setNamedParameters(values)
    .setUseLegacySql(false)
    .build()
runQuery(queryConfig2)

Finally, I send the InsertAllRequest, but I don't see how to set the session ID on it:

bigquery.insertAll(
    InsertAllRequest.newBuilder(tableId)
        .setRows(rowContents)
        .build()
)

All of this is currently wrapped in this small routine:

    suspend fun runRoutine(source: String, name: String, records: List<RowToInsert>) {
        val context = "context"
        val id = "id"
        val latestRefreshAt = "latestRefreshAt"
        transaction { configureSession ->
            query(
                """
                    UPDATE `${googleTableId1.toSqlTableName()}` 
                    SET $latestRefreshAt = CURRENT_TIMESTAMP()
                    WHERE $context = @$context AND $id = @$id
                """,
                mapOf(
                    context to QueryParameterValue.string(source),
                    id to QueryParameterValue.string(name),
                ),
                configureSession
            )

            tableInsertRows(
                tableId = googleTableId2,
                rowContents = records,
            )
        }
    }

    suspend fun transaction(block: suspend BigQuery.(QueryJobConfiguration.Builder.() -> Unit) -> Unit) {
        val sessionId = beginTransaction()
        val prepareQuery: QueryJobConfiguration.Builder.() -> Unit = {
            setCreateSession(false)
            setConnectionProperties(
                listOf(ConnectionProperty.of("session_id", sessionId))
            )
        }

        try {
            bigquery.block(prepareQuery)
            commitTransaction(prepareQuery)
        } catch (throwable: Throwable) {
            rollbackTransaction(prepareQuery)
            closeSession(prepareQuery)
            throw throwable
        }
    }

    private suspend fun beginTransaction(): String {
        val queryConfig = QueryJobConfiguration.newBuilder("BEGIN TRANSACTION;")
            .setCreateSession(true)
            .build()

        return runQuery(queryConfig).getStatistics<JobStatistics>().sessionInfo.sessionId
    }

    private suspend fun commitTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
        val queryConfig = QueryJobConfiguration.newBuilder("COMMIT TRANSACTION;")
            .apply(prepareQuery)
            .build()

        runQuery(queryConfig)
    }

    private suspend fun rollbackTransaction(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
        val queryConfig = QueryJobConfiguration.newBuilder("ROLLBACK TRANSACTION;")
            .apply(prepareQuery)
            .build()

        runQuery(queryConfig)
    }

    private suspend fun closeSession(prepareQuery: QueryJobConfiguration.Builder.() -> Unit) {
        val queryConfig = QueryJobConfiguration.newBuilder("CALL BQ.ABORT_SESSION();")
            .apply(prepareQuery)
            .build()

        runQuery(queryConfig)
    }

    private suspend fun runQuery(queryJobConfiguration: QueryJobConfiguration): Job {
        val jobId = JobId.newBuilder().setProject(projectId.value).build()
        val queryJob =
            bigquery.create(JobInfo.newBuilder(queryJobConfiguration).setJobId(jobId).build())
                .waitFor()

        if (queryJob == null) {
            throw RuntimeException("Job no longer exists")
        } else if (queryJob.getStatus().getError() != null) {
            throw RuntimeException(queryJob.getStatus().getError().toString())
        }

        return queryJob
    }

    suspend fun query(
        sql: String,
        values: Map<String, QueryParameterValue>,
        configure: QueryJobConfiguration.Builder.() -> Unit = {},
    ): TableResult {
        val queryConfig = QueryJobConfiguration.newBuilder(sql)
            .apply(configure)
            .setNamedParameters(values)
            .setUseLegacySql(false)
            .build()

        return runQuery(queryConfig).getQueryResults()
    }


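    suspend fun tableInsertRows(
        tableId: TableId,
        rowContents: Iterable<InsertAllRequest.RowToInsert>,
    ) {
        val response = bigquery.insertAll(
            InsertAllRequest.newBuilder(tableId)
                .setRows(rowContents)
                .build()
        )
        // insertAll reports per-row failures in the response instead of throwing
        if (response.hasErrors()) {
            throw RuntimeException("insertAll failed: ${response.insertErrors}")
        }
    }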

    private val bigquery by lazy {
        BigQueryOptions.getDefaultInstance().getService()
    }

So, how can I set a session ID for an insertAll request?


Solution

  • As far as I know, there is no way to set a session ID on an insertAll request with the BigQuery Java client. insertAll uses the streaming API (tabledata.insertAll), which has no session or connection-property parameter, so rows streamed this way can't take part in a multi-statement transaction the way query jobs can.

    I think a better alternative is a parameterized INSERT statement, run as a query job in the same session (with the same session_id connection property) between BEGIN TRANSACTION and COMMIT TRANSACTION, just like your UPDATE. Because INSERT is a DML statement, it is part of the transaction, so either both the UPDATE and the INSERT are applied or neither is.
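A minimal sketch of that alternative, assuming the row values have already been converted to `QueryParameterValue`s. `buildInsertQuery` and `sessionConfig` are hypothetical helper names, not part of the BigQuery client; `sessionConfig` just mirrors the question's `prepareQuery` lambda. The table name is interpolated into the SQL because BigQuery query parameters cannot stand in for identifiers, so it should only ever come from trusted code.

```kotlin
import com.google.cloud.bigquery.ConnectionProperty
import com.google.cloud.bigquery.QueryJobConfiguration
import com.google.cloud.bigquery.QueryParameterValue

// Hypothetical helper: attaches a query job to an existing session,
// like the question's `prepareQuery` lambda.
fun sessionConfig(sessionId: String): QueryJobConfiguration.Builder.() -> Unit = {
    setCreateSession(false)
    setConnectionProperties(listOf(ConnectionProperty.of("session_id", sessionId)))
}

// Hypothetical helper: builds one multi-row INSERT with a named parameter per value,
// e.g. INSERT INTO `t` (a, b) VALUES (@a_0, @b_0), (@a_1, @b_1).
// Assumes column names are plain identifiers (letters, digits, underscores),
// because they are reused inside the parameter names.
fun buildInsertQuery(
    tableSqlName: String,
    columns: List<String>,
    rows: List<Map<String, QueryParameterValue>>,
    configure: QueryJobConfiguration.Builder.() -> Unit = {},
): QueryJobConfiguration {
    require(columns.isNotEmpty()) { "columns must not be empty" }
    require(rows.isNotEmpty()) { "rows must not be empty" }

    val params = LinkedHashMap<String, QueryParameterValue>()
    val valueTuples = rows.mapIndexed { rowIndex, row ->
        columns.joinToString(prefix = "(", postfix = ")") { column ->
            val paramName = "${column}_$rowIndex"
            params[paramName] = row[column] ?: error("row $rowIndex has no value for '$column'")
            "@$paramName"
        }
    }

    // Identifiers cannot be parameters, so the table and column names are spliced in.
    val sql = "INSERT INTO `$tableSqlName` (${columns.joinToString()}) " +
        "VALUES ${valueTuples.joinToString()}"

    return QueryJobConfiguration.newBuilder(sql)
        .apply(configure)            // e.g. sessionConfig(sessionId)
        .setNamedParameters(params)
        .setUseLegacySql(false)
        .build()
}
```

Inside the question's `transaction { configureSession -> ... }` block, `tableInsertRows(...)` could then be replaced by something like `runQuery(buildInsertQuery(googleTableId2.toSqlTableName(), columns, rows, configureSession))`, where `columns` and `rows` come from converting `records` into `QueryParameterValue`s. Since every value becomes its own parameter, very large batches may run into BigQuery's query-size or parameter limits; splitting them into several INSERT statements inside the same transaction keeps the all-or-nothing behavior.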