google-cloud-dataflow · apache-beam · google-cloud-spanner

com.google.cloud.spanner.SpannerException: DEADLINE_EXCEEDED


I've been stuck on this issue for over a day and thought someone here might know the answer. As a simple test, I'm trying to read data from a table and output the results to a log file. The table is rather large (~167 million rows). I keep getting the following error:

java.lang.RuntimeException: org.apache.beam.sdk.util.UserCodeException: com.google.cloud.spanner.SpannerException: DEADLINE_EXCEEDED: io.grpc.StatusRuntimeException: DEADLINE_EXCEEDED: deadline exceeded after 119997536405ns

Followed by this:

Workflow failed. Causes: S15:SpannerIO.ReadAll/Read from Cloud Spanner/Shuffle partitions/Reshuffle/GroupByKey/Read+SpannerIO.ReadAll/Read from Cloud Spanner/Shuffle partitions/Reshuffle/GroupByKey/GroupByWindow+SpannerIO.ReadAll/Read from Cloud Spanner/Shuffle partitions/Reshuffle/ExpandIterable+SpannerIO.ReadAll/Read from Cloud Spanner/Shuffle partitions/Values/Values/Map+SpannerIO.ReadAll/Read from Cloud Spanner/Read from Partitions+ParDo(FinishProcess) failed., The job failed because a work item has failed 4 times. Look in previous log entries for the cause of each one of the 4 failures. For more information, see https://cloud.google.com/dataflow/docs/guides/common-errors. The work item was attempted on these workers: ricardocascadejob-m0771463-12021322-cpj6-harness-4nvn Root cause: Work item failed., ricardocascadejob-m0771463-12021322-cpj6-harness-c485 Root cause: Work item failed., ricardocascadejob-m0771463-12021322-cpj6-harness-kcgb Root cause: Work item failed., ricardocascadejob-m0771463-12021322-cpj6-harness-kcgb Root cause: Work item failed.

Here is the main code running on Dataflow:

PipelineOptionsFactory.register(RicardoPriceLoadOptions.class);
RicardoPriceLoadOptions opts = PipelineOptionsFactory.fromArgs(args)
    .withValidation().as(RicardoPriceLoadOptions.class);
Pipeline pipeline = Pipeline.create(opts);

SpannerConfig spannerConfig =
    SpannerConfig.create()
        .withProjectId(opts.getGcpProjectId())
        .withInstanceId(opts.getSpannerInstanceId())
        .withDatabaseId(opts.getSpannerDatabaseId());

PCollectionView<Transaction> tx =
    pipeline.apply(SpannerIO.createTransaction().withSpannerConfig(spannerConfig));

// Fetch all price events
PCollection<Struct> pepList = pipeline.apply(Create.of(ReadOperation.create()
    .withColumns("DisabledFlag", "PriceEventPriceableId", "PriceableItemId",
        "OutgoingType", "PriceOriginal", "PriceIntermediate", "PriceRetail",
        "SaleValue", "SaleValueIntermediate", "SchedulableFlag", "SendToSiteFlag",
        "StartTime", "EndTime", "DisplayCode")
    .withTable("ABC")))
    .apply(SpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig));

pepList.apply(ParDo.of(new FinishProcessFn()));

The last DoFn simply logs each Spanner row:

public class FinishProcessFn extends DoFn<Struct, Void> {

    private static final Logger log = LoggerFactory.getLogger(FinishProcessFn.class);

    @ProcessElement
    public void process(@Element Struct elem) {
        log.debug(elem.toString());
    }
}

I have tried Google's suggestions, as shown here:

Common Errors

The code seems simple enough, but I'm not sure why I keep getting the error above. Any input or help is appreciated.

Thanks!

Here is the table schema:

CREATE TABLE ABC (
    PriceEventPriceableId INT64 NOT NULL,
    Created TIMESTAMP NOT NULL,
    CreatedBy STRING(MAX) NOT NULL,
    DisabledFlag STRING(MAX) NOT NULL,
    DisplayCode STRING(MAX),
    EndTime TIMESTAMP,
    ErrorCode INT64,
    EstablishmentOverrideFlag STRING(MAX),
    LastUpdated TIMESTAMP NOT NULL,
    LastUpdatedBy STRING(MAX) NOT NULL,
    NotApplicableFlag STRING(MAX),
    OnSaleRatioOverrideFlag STRING(MAX),
    OutgoingType INT64,
    OwnedValue STRING(MAX),
    ParentPriceableItemId INT64,
    PriceableItemId INT64 NOT NULL,
    PriceEventId INT64 NOT NULL,
    PriceIntermediate STRING(MAX),
    PriceOriginal STRING(MAX),
    PriceRetail STRING(MAX),
    ReasonUnschedulable STRING(MAX),
    SaleValue STRING(MAX),
    SaleValueIntermediate STRING(MAX),
    SavingsMaxOverrideFlag STRING(MAX),
    SchedulableFlag STRING(MAX),
    SendToSiteFlag STRING(MAX),
    SentToSiteDate DATE,
    StartTime TIMESTAMP,
    StoredValue STRING(MAX),
    TenPercentOverrideFlag STRING(MAX),
    Timestamp TIMESTAMP NOT NULL OPTIONS (allow_commit_timestamp=true),
) PRIMARY KEY (PriceEventPriceableId)

Solution

  • As an update: what fixed the issue was using EXACTLY the same versions of the Beam SDK and Google Spanner client libraries as the Spanner Export to Avro template available in GCP Dataflow. I made no code changes, and the pipeline started to work.
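
One way to keep the Beam SDK and the Spanner client on mutually compatible versions is to let Maven BOMs manage them. The sketch below is an illustration only: the version properties are placeholders that you would pin to whatever the template you are matching actually uses, not recommended values.

```xml
<!-- Sketch only: ${beam.version} and ${gcp.libraries.version} are
     placeholders; pin them to the versions used by the Dataflow
     template you are aligning with. -->
<dependencyManagement>
  <dependencies>
    <!-- Beam BOM keeps all org.apache.beam artifacts on one version -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-bom</artifactId>
      <version>${beam.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
    <!-- Google Cloud BOM keeps google-cloud-spanner and its gRPC
         dependencies consistent with each other -->
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>${gcp.libraries.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <!-- SpannerIO lives in this artifact; version comes from the BOM -->
  <dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
  </dependency>
</dependencies>
```

You can verify what actually ends up on the classpath with `mvn dependency:tree`, which makes mismatched gRPC or Spanner client versions easy to spot.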