azure-table-storage, azure-eventhub, azure-stream-analytics

Azure Stream Analytics query too complex? Output is incomplete and the job now fails with an error. What should I do next?


The query I use is below. I am using an Event Hub to ingest JSON messages from a push API. Each JSON message contains several arrays. I am now getting this error: "We cannot connect to Event Hub partition [0] because the maximum number of allowed receivers per partition in a consumer group has been reached. Ensure that other Stream Analytics jobs or Service Bus Explorer are not using the same consumer group. The following information may be helpful in identifying the connected receivers: Exceeded the maximum number of allowed receivers per partition in a consumer group which is 5".

I want the output to go to Azure Table Storage, which works as long as the query itself works properly.

I had this working when the query was simpler, but now I cannot seem to join all the needed results back to the main serviceProblem output. Sometimes the query runs successfully, but some of the output fields are blank although they should not be. Should I split the query across more ASA jobs, or should I push some of the outputs through another Event Hub to avoid the error? Thanks for your help.

with OrigQ as (
        select
        *
        from tmfsnowgstm Partition BY PartitionId --INTO 4
        where substring(tmfsnowgstm.eventType,1,14) = 'serviceProblem'
    )
    ,ReaderQuery as (
        select
            PartitionId,
            event.serviceProblem.*
        from OrigQ Partition BY PartitionId --INTO 4
    )
    ,comment as ( 
        SELECT
            'comment' as partitionKey,
            concat(rq.id ,'-',comment.ArrayValue.date,'-',comment.ArrayValue.[@type]) as rowKey,
            rq.PartitionId,
            rq.id as ServiceProblemId,
            comment.ArrayValue.date as date,
            comment.ArrayValue.system as system,
            comment.ArrayValue.author as author,
            comment.ArrayValue.[@type] as type,
            comment.ArrayValue.text as text
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.comment) AS comment
    )
    ,trackingRecord as (
        select 
            rq.PartitionId,
            rq.id,
            trackingRecord.ArrayValue.extensionInfo        
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.trackingRecord) AS trackingRecord
    ),
    trackingRecordDtls as (
        SELECT
            trackingRecord.PartitionId,
            trackingRecord.id as id,
            extensionInfo.ArrayValue.name as name,
            extensionInfo.ArrayValue.value as value
        from trackingRecord Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(extensionInfo) AS extensionInfo
    ),
    relatedParty as (
        SELECT
            rq.PartitionId,
            rq.id as id,
            relatedParty.ArrayValue.role as name,
            relatedParty.ArrayValue.name as value
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.relatedParty) AS relatedParty
    )
    ,relatedPartyDtls as (
        select
            rq.PartitionId,
            rq.id as id,
            rqp.value as requestedBy,
            cal.value as caller,
            atg.value as assignedToGroup,
            onob.value as ownerNocOpenedBy,
            onc.value as ownerNoc,
            ast.value as assignedTo
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        LEFT JOIN relatedParty rqp ON rqp.name = 'requestedBy' and rqp.id = rq.id AND DATEDIFF(minute,rq,rqp) BETWEEN 0 AND 0 and rq.PartitionId = rqp.PartitionId
        LEFT JOIN relatedParty cal ON cal.name = 'caller' and cal.id = rq.id AND DATEDIFF(minute,rq,cal) BETWEEN 0 AND 0 and rq.PartitionId = cal.PartitionId
        LEFT JOIN relatedParty atg ON atg.name = 'assignedToGroup' and atg.id = rq.id AND DATEDIFF(minute,rq,atg) BETWEEN 0 AND 0 and rq.PartitionId = atg.PartitionId
        LEFT JOIN relatedParty onob ON onob.name = 'ownerNocOpenedBy,openedBy' and onob.id = rq.id AND DATEDIFF(minute,rq,onob) BETWEEN 0 AND 0 and rq.PartitionId = onob.PartitionId
        LEFT JOIN relatedParty ast ON ast.name = 'assignedTo' and ast.id = rq.id AND DATEDIFF(minute,rq,ast) BETWEEN 0 AND 0 and rq.PartitionId = ast.PartitionId
        LEFT JOIN relatedParty onc ON onc.name = 'ownerNoc' and onc.id = rq.id AND DATEDIFF(minute,rq,onc) BETWEEN 0 AND 0 and rq.PartitionId = onc.PartitionId
    )
    ,extensionInfo as (
       SELECT
            rq.PartitionId,
            rq.id as id,
            extensionInfo.ArrayValue.name as name,
            extensionInfo.ArrayValue.value as value
        from ReaderQuery rq Partition BY PartitionId --INTO 4
        CROSS APPLY GetArrayElements(rq.extensionInfo) AS extensionInfo
    )
    ,extensionInfoDtls3 as (
        select
            rq.PartitionId,
            rq.id as id,
            rq.requestedBy,
            rq.caller,
            rq.assignedToGroup,
            rq.ownerNocOpenedBy,
            rq.ownerNoc,
            rq.assignedTo,
            cc.value as closureCode,
            ooi.value as originOfIssue,
            rec.value as resolutionCode,
            rc.value as rootCause,
            src.value as subRootCause
        from relatedPartyDtls rq Partition BY PartitionId --INTO 4
        LEFT JOIN trackingRecordDtls cc ON cc.name = 'closureCode' and cc.id = rq.id AND DATEDIFF(minute,rq,cc) BETWEEN 0 AND 0 and rq.PartitionId = cc.PartitionId
        LEFT JOIN trackingRecordDtls ooi ON ooi.name = 'originOfIssue' and ooi.id = rq.id AND DATEDIFF(minute,rq,ooi) BETWEEN 0 AND 0 and rq.PartitionId = ooi.PartitionId
        LEFT JOIN trackingRecordDtls rec ON rec.name = 'resolutionCode' and rec.id = rq.id AND DATEDIFF(minute,rq,rec) BETWEEN 0 AND 0 and rq.PartitionId = rec.PartitionId
        LEFT JOIN trackingRecordDtls rc ON rc.name = 'rootCause' and rc.id = rq.id AND DATEDIFF(minute,rq,rc) BETWEEN 0 AND 0 and rq.PartitionId = rc.PartitionId
        LEFT JOIN trackingRecordDtls src ON src.name = 'subRootCause' and src.id = rq.id AND DATEDIFF(minute,rq,src) BETWEEN 0 AND 0 and rq.PartitionId = src.PartitionId
    )
    ,extensionInfoDtls1 as (
            select
            rq.PartitionId,
            rq.id as id,
            rq.closureCode,
            rq.originOfIssue,
            rq.resolutionCode,
            rq.rootCause,
            rq.subRootCause,
            rq.requestedBy,
            rq.caller,
            rq.assignedToGroup,
            rq.ownerNocOpenedBy,
            rq.ownerNoc,
            rq.assignedTo,
            wrif.value as weatherRelatedIssueFlag,
            ncf.value as notifyCustomerFlag,
            mdu.value as monitoringDuration,
            cot.value as contactType,
            cpr.value as calculatedPriority
        from extensionInfoDtls3 rq Partition BY PartitionId --INTO 4
        LEFT JOIN extensionInfo wrif ON wrif.name = 'weatherRelatedIssueFlag' and wrif.id = rq.id AND DATEDIFF(minute,rq,wrif) BETWEEN 0 AND 0 and rq.PartitionId = wrif.PartitionId
        LEFT JOIN extensionInfo ncf ON ncf.name = 'notifyCustomerFlag' and ncf.id = rq.id AND DATEDIFF(minute,rq,ncf) BETWEEN 0 AND 0 and rq.PartitionId = ncf.PartitionId
        LEFT JOIN extensionInfo mdu ON mdu.name = 'monitoringDuration' and mdu.id = rq.id AND DATEDIFF(minute,rq,mdu) BETWEEN 0 AND 0 and rq.PartitionId = mdu.PartitionId
        LEFT JOIN extensionInfo cpr ON cpr.name = 'calculatedPriority' and cpr.id = rq.id AND DATEDIFF(minute,rq,cpr) BETWEEN 0 AND 0  and rq.PartitionId = cpr.PartitionId
        LEFT JOIN extensionInfo cot ON cot.name = 'contactType' and cot.id = rq.id AND DATEDIFF(minute,rq,cot) BETWEEN 0 AND 0 and rq.PartitionId = cot.PartitionId
    )
    ,affectedResource as (
    SELECT
        rq.PartitionId,
        'affectedResource' as partitionKey,
        concat(rq.id,'-',case when len(affectedResource.ArrayValue.id) < 3 then affectedResource.ArrayValue.role else affectedResource.ArrayValue.id end) as rowKey,
        'serviceProblem' as sourceType,
        rq.id as sourceTypeId,
        affectedResource.ArrayValue.id as name,
        affectedResource.ArrayValue.name as value,
        affectedResource.ArrayValue.role as role,
        affectedResource.ArrayValue.[@referredType] as type
    from ReaderQuery rq Partition BY PartitionId --INTO 4
    CROSS APPLY GetArrayElements(rq.affectedResource) AS affectedResource
    )
    ,affectedService as (
    SELECT
        rq.PartitionId,
        'affectedService' as partitionKey,
        concat(rq.id,'-',affectedService.ArrayValue.id) as rowKey,
        'serviceProblem' as sourceType,
        rq.id as sourceTypeId,
        affectedService.ArrayValue.id as name,
        affectedService.ArrayValue.name as value,
        'affectedService' as role,
        'Service' as type
    from ReaderQuery rq Partition BY PartitionId --INTO 4
    CROSS APPLY GetArrayElements(rq.affectedService) AS affectedService
    )
    ,relatedObject as (
        SELECT
            rq.PartitionId,
            rq.id as id,
            relatedObject.ArrayValue.[@referredType] as name,
            relatedObject.ArrayValue.name  as value
        from tmfsnowgstm rq Partition BY PartitionId --INTO 2
        CROSS APPLY GetArrayElements(rq.relatedObject) AS relatedObject
    )
    ,relatedObjectDtls as (
        select
            'relatedObject' as partitionKey,
            rq.id as rowKey,
            rq.PartitionId,
            rq.id as id,
            prd.value as product,
            chr.value as changeRequest,
            ser.value as service,
            res.value as resource,
            rccr.value as rootCauseChangeRequest
        from readerQuery rq Partition BY PartitionId --INTO 4
        LEFT JOIN relatedObject prd ON prd.name = 'product' and prd.id = rq.id AND DATEDIFF(minute,rq,prd) BETWEEN 0 AND 0 and rq.PartitionId = prd.PartitionId
        LEFT JOIN relatedObject chr ON chr.name = 'changeRequest' and chr.id = rq.id AND DATEDIFF(minute,rq,chr) BETWEEN 0 AND 0 and rq.PartitionId = chr.PartitionId
        LEFT JOIN relatedObject ser ON ser.name = 'service' and ser.id = rq.id AND DATEDIFF(minute,rq,ser) BETWEEN 0 AND 0 and rq.PartitionId = ser.PartitionId
        LEFT JOIN relatedObject res ON res.name = 'resource' and res.id = rq.id AND DATEDIFF(minute,rq,res) BETWEEN 0 AND 0 and rq.PartitionId = res.PartitionId
        LEFT JOIN relatedObject rccr ON rccr.name = 'rootCauseChangeRequest' and rccr.id = rq.id AND DATEDIFF(minute,rq,rccr) BETWEEN 0 AND 0 and rq.PartitionId = rccr.PartitionId
    )
    , serviceProblem as (
        Select
        rq.PartitionId,
        'serviceProblem' as partitionKey,
        rq.id as rowKey,
        rq.id,
        rq.name,
        rq.statusChangeReason,
        rq.reason,
        rq.resolutionDate,
        rq.responsibleParty,
        rq.description,
        rq.underlyingProblem,
        rq.statusChangeDate,
        rq.problemEscalation,
        rq.associatedSLAViolation,
        rq.timeChanged,
        rq.severity,
        rq.impactPatterns,
        rq.timeRaised,
        rq.underlyingAlarm,
        rq.rootCauseService,
        rq.relatedEvent,
        rq.originatorParty,
        rq.priority,
        rq.firstAlert,
        rq.originatingSystem,
        --rq.associatedTroubleTicket,
        rq.affectedNumberOfServices,
        rq.impactImportanceFactor,
        rq.category,
        --rq.parentProblem,
        rq.affectedLocation,
        rq.status,
        rq.rootCauseResource,
        rq.ssociatedTroubleTicket,
        case when GetArrayLength(rq.associatedTroubleTicket) < 1 then '' else associatedTroubleTicket.ArrayValue.Id end as associatedTroubleTicket,
        parentProblem.ArrayValue.Id as parentProblemId,
        parentProblem.ArrayValue.correlationId as correlationId
        --,rpdtls.*
    from ReaderQuery rq Partition BY PartitionId --INTO 4
    --left join relatedPartyDtls rpdtls on rq.id = rpdtls.id AND DATEDIFF(minute,rq,rpdtls) BETWEEN 0 AND 0 and rq.PartitionId = rpdtls.PartitionId
    CROSS APPLY GetArrayElements(rq.associatedTroubleTicket) AS associatedTroubleTicket
    CROSS APPLY GetArrayElements(rq.parentProblem) AS parentProblem
    )
    
    select
        'serviceProblem' as partitionKey,
        rq.id as rowKey,
        rq.id,
        rq.name,
        rq.statusChangeReason,
        rq.reason,
        rq.resolutionDate,
        rq.responsibleParty,
        rq.description,
        rq.underlyingProblem,
        rq.statusChangeDate,
        rq.problemEscalation,
        rq.associatedSLAViolation,
        rq.timeChanged,
        rq.severity,
        rq.impactPatterns,
        rq.timeRaised,
        rq.underlyingAlarm,
        rq.rootCauseService,
        rq.relatedEvent,
        rq.originatorParty,
        rq.priority,
        rq.firstAlert,
        rq.originatingSystem,
        rq.affectedNumberOfServices,
        rq.impactImportanceFactor,
        rq.category,
        rq.affectedLocation,
        rq.status,
        rq.rootCauseResource,
        rq.associatedTroubleTicket,
        resnt.text resolutionNote,
        svcdesc.text serviceProblemDescription,
        rq.parentProblemId,
        rq.correlationId
    --    eidtls3.*,
    --    rpdtls.*
    into tsserviceProblem
    from serviceProblem rq Partition BY PartitionId
    left join comment resnt on rq.id = resnt.serviceProblemId AND DATEDIFF(minute,rq,resnt) BETWEEN 0 AND 0 and rq.PartitionId = resnt.PartitionId and resnt.type = 'resolutionNote'
    left join comment svcdesc on rq.id = svcdesc.serviceProblemId AND DATEDIFF(minute,rq,svcdesc) BETWEEN 0 AND 0 and rq.PartitionId = svcdesc.PartitionId and svcdesc.type = 'serviceProblemDescription'
    left join relatedObjectDtls rpdtls on rq.id = rpdtls.id AND DATEDIFF(minute,rq,rpdtls) BETWEEN 0 AND 0 and rq.PartitionId = rpdtls.PartitionId
    
    SELECT rq.* into tsextensionInfo from extensionInfoDtls1 rq Partition BY PartitionId
    SELECT * into tscomment from comment PARTITION BY PartitionId where type <> 'serviceProblemDescription' and type <> 'resolutionNote'
    SELECT * into tsrelatedInfo from affectedResource PARTITION BY PartitionId union SELECT * from affectedService PARTITION BY PartitionId
    select * into veritaslake from OrigQ PARTITION BY PartitionId

Solution

  • As the error message says, Event Hubs allows only 5 receiver connections per partition per consumer group. A complex query that uses operators such as UNION or self-joins is likely to create multiple input receivers. The Stream Analytics troubleshooting documentation has a section on the maximum number of allowed receivers per partition; restructuring your query along those lines should resolve this error.
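
As a rough illustration of that restructuring, here is a minimal sketch that reads the Event Hub input in one place only and has every later step build on that step. In the query above, tmfsnowgstm is referenced directly twice (in OrigQ and again in relatedObject), so pointing relatedObject at ReaderQuery is one candidate change. This assumes the relatedObject array sits under event.serviceProblem like the other arrays, which the question does not confirm. The output name tsrelatedObject is hypothetical and only there to make the sketch a complete query.

```sql
-- Sketch only: reference the Event Hub input (tmfsnowgstm) once,
-- then reuse that step instead of reading the input again.
WITH OrigQ AS (
    SELECT *
    FROM tmfsnowgstm PARTITION BY PartitionId
    WHERE SUBSTRING(tmfsnowgstm.eventType, 1, 14) = 'serviceProblem'
),
ReaderQuery AS (
    SELECT
        PartitionId,
        event.serviceProblem.*
    FROM OrigQ PARTITION BY PartitionId
),
relatedObject AS (
    SELECT
        rq.PartitionId,
        rq.id AS id,
        relatedObject.ArrayValue.[@referredType] AS name,
        relatedObject.ArrayValue.name AS value
    -- Originally: FROM tmfsnowgstm rq (a second direct read of the input).
    -- Assumption: relatedObject is a field of event.serviceProblem.
    FROM ReaderQuery rq PARTITION BY PartitionId
    CROSS APPLY GetArrayElements(rq.relatedObject) AS relatedObject
)
-- tsrelatedObject is a hypothetical output alias, not one from the question.
SELECT *
INTO tsrelatedObject
FROM relatedObject PARTITION BY PartitionId
```

If the error persists after the input is referenced only once, it may also be worth checking that the job has its own dedicated consumer group, since the error message itself points at other jobs or tools sharing the same one.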