Tags: databricks, azure-databricks

databricks autoloader not updating table immediately


I have a simple Auto Loader job that looks like this:

df_dwu_limit = spark.readStream.format("cloudFiles") \
    .option("cloudFiles.format", "JSON") \
    .schema(schemaFromJson) \
    .load("abfss://[email protected]/synapse-usage/")\
    .writeStream \
    .format("delta")\
    .option("checkpointLocation", "abfss://[email protected]/checkpoint_synapse_usage_api_landing/") \
    .trigger(availableNow=True)\
    .toTable("platform_dnu.synapse_usage_api_landing")

On the very next line I use the Delta table "" for further processing.

I also run a count(*) query before and after the Auto Loader job, and the count doesn't change, even though the Auto Loader progress report shows that records were written.

If I wait a minute or so and run the count(*) query again, I can see the updated records. How do I solve this issue?

Output of the Auto Loader stream for one particular session:

{
  "id" : "cb9a28b4-c5b4-4865-bc65-b3ca5efd2537",
  "runId" : "64c2afd9-ad69-4e9a-97bf-d6fa2794931a",
  "name" : null,
  "timestamp" : "2022-12-03T04:44:17.591Z",
  "batchId" : 7,
  "numInputRows" : 27,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 0.7879760688749453,
  "durationMs" : {
    "addBatch" : 3005,
    "commitOffsets" : 146,
    "getBatch" : 12,
    "latestOffset" : 30380,
    "queryPlanning" : 61,
    "triggerExecution" : 34259,
    "walCommit" : 222
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://[email protected]/synapse-usage/]",
    "startOffset" : {
      "seqNum" : 2534,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "endOffset" : {
      "seqNum" : 2562,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1669823987701,
      "lastBackfillFinishTimeMs" : 1669823991340
    },
    "latestOffset" : null,
    "numInputRows" : 27,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 0.7879760688749453,
    "metrics" : {
      "numBytesOutstanding" : "0",
      "numFilesOutstanding" : "0"
    }
  } ],
  "sink" : {
    "description" : "DeltaSink[abfss://[email protected]/delta/synapse_usage_api_landing]",
    "numOutputRows" : -1
  }}

DDL for the Delta table: (screenshot, not reproduced here)


Solution

  • Placing the following code after the Auto Loader job solves the issue:

    df_dwu_limit.awaitTermination()

With trigger(availableNow=True), writeStream.toTable() returns a StreamingQuery handle immediately while the batch is still being processed and committed in the background. Calling awaitTermination() blocks until the query finishes, so the count(*) that follows sees the committed data.
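The race can be illustrated without a Spark cluster. The sketch below is a plain-Python analogy (names and timings are made up for illustration): a background thread plays the role of the streaming write, a list stands in for the Delta table, and `Thread.join()` plays the role of `StreamingQuery.awaitTermination()`.

```python
import threading
import time

results = []  # stands in for the Delta table


def write_batch():
    # Simulates the streaming write: rows only land after some delay,
    # even though the caller already holds a handle to the query.
    time.sleep(0.2)
    results.extend(range(27))  # 27 rows, as in the progress report above


# Like .toTable(): returns a handle immediately, work proceeds in background.
query = threading.Thread(target=write_batch)
query.start()

count_before = len(results)  # 0 — the batch has not committed yet

query.join()                 # analogous to df_dwu_limit.awaitTermination()

count_after = len(results)   # 27 — the write has finished

print(count_before, count_after)
```

Without the `join()` (i.e. without `awaitTermination()`), the "count before" and "count after" reads both happen before the background write commits, which matches the symptom in the question.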