Search code examples
apache-sparkapache-kafkaspark-structured-streaming

Could anyone explain what all these values mean in Kafka/Spark?


I'm doing spark structured streaming/kafka, and these are getting logged in the console but I'm not sure what all these mean.

2019-08-04 15:02:08 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "81474cea-e15d-425b-a69e-c3fe7b8f32d0",
  "runId" : "9060d86d-344d-45bb-ac20-68318e704e30",
  "name" : null,
  "timestamp" : "2019-08-04T22:02:08.870Z",
  "batchId" : 130,
  "numInputRows" : 48,
  "inputRowsPerSecond" : 3692.3076923076924,
  "processedRowsPerSecond" : 615.3846153846154,
  "durationMs" : {
    "addBatch" : 33,
    "getBatch" : 2,
    "getOffset" : 1,
    "queryPlanning" : 3,
    "triggerExecution" : 78,
    "walCommit" : 38
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[service-calls]]",
    "startOffset" : {
      "service-calls" : {
        "0" : 17141
      }
    },
    "endOffset" : {
      "service-calls" : {
        "0" : 17189
      }
    },
    "numInputRows" : 48,
    "inputRowsPerSecond" : 3692.3076923076924,
    "processedRowsPerSecond" : 615.3846153846154
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2f1cf834"
  }
}
2019-08-04 15:02:08 INFO  MicroBatchExecution:54 - Streaming query made progress: {
  "id" : "81474cea-e15d-425b-a69e-c3fe7b8f32d0",
  "runId" : "9060d86d-344d-45bb-ac20-68318e704e30",
  "name" : null,
  "timestamp" : "2019-08-04T22:02:08.983Z",
  "batchId" : 131,
  "numInputRows" : 0,
  "inputRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 0
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[service-calls]]",
    "startOffset" : {
      "service-calls" : {
        "0" : 17189
      }
    },
    "endOffset" : {
      "service-calls" : {
        "0" : 17189
      }
    },
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@2f1cf834"
  }
}

I guess the main ones that I'm curious about are numInputRows, inputRowsPerSecond, startOffset and endOffset.

I also have this option added option("maxOffsetsPerTrigger", 2000) to receive 2000 offsets per trigger, but it seems I can't find where this value is visible. Am I really receiving 2000 offsets? If not, how do I increase the number of brokers to receive/process more messages?

I'm also using standalone mode (local[2]).


Solution

  • The role of ProgressReporter is to provide an interface that, once implemented, can be freely used to report the statistics about the execution of the streaming query. One of the implementation is org.apache.spark.sql.execution.streaming.MicroBatchExecution.

    Everything begins when the streaming query trigger (processing or event time) is executed. The first thing the trigger does is the call to startTrigger method of ProgressReporter. This method prepares the reporter to accumulate the statistics for the just started execution.The reporter registers the statistics about the execution of several different steps.

    Next step is the data processing where also some statistics are gathered by the reporter.

    After adding these statistics , the ProgressReporter calls finishTrigger(hasNewData: Boolean). This method finalizes the trigger execution and creates the objects holding the execution statistics that are put to the progressBuffer = new mutable.Queue[StreamingQueryProgress]().

    Later the client can retrieve the updates (or the last one) directly from there, through the public accessor methods like lastProgress() or recentProgress()

    Regarding the output :

    Information about progress made in the execution of a StreamingQuery during a trigger. Each event relates to processing done for a single trigger of the streaming query. Events are emitted even when no new data is available to be processed.

    numInputRows : The aggregate (across all sources) number of records processed in a trigger.
    
    inputRowsPerSecond : The aggregate (across all sources) rate of data arriving.
    
    processedRowsPerSecond :
    The aggregate (across all sources) rate at which Spark is processing data.
    

    These self describing fields provide key indicators about the job performance

    Now lets come to the Sources Part

     "sources" : [ {
         "description" : "KafkaSource[Subscribe[service-calls]]",
         "startOffset" : {
           "service-calls" : {
             "0" : 17189
           }
         },
         "endOffset" : {
           "service-calls" : {
             "0" : 17189
           }
         },
         "numInputRows" : 0,
         "inputRowsPerSecond" : 0.0   } ]
    

    Its the Information about progress made for a source in the execution of a StreamingQuery during a trigger.

    startOffset The starting offset for data being read. 
    
    endOffset The ending offset for data being read.
    
    numInputRows The number of records read from this source.
    
    inputRowsPerSecond The rate at which data is arriving from this source. 
    
    processedRowsPerSecond The rate at which data from this source is being processed by Spark.
    

    Last Part

    As you can see here:

    Use maxOffsetsPerTrigger option to limit the number of records to fetch per trigger.The specified total number of offsets will be proportionally split across topicPartitions of different volume.

    This means that for each trigger or fetch process Kafka will get 2000 records. However if you check the logs You are getting 48 rows in batch 130 endOffset - startOffset = 17189 - 17141 = 48.

    0 for batch 131.

    maxOffsetsPerTrigger is a configuration and is not returned in the StreamingQueryProgress , hence you cannot find it.

    Finally , I dont think increasing the number of brokers will help you at this point , if you really do not have enough messages to consume.