Tags: scala, amazon-dynamodb, spark-streaming, amazon-kinesis, amazon-dynamodb-streams

Spark Streaming Kinesis Integration: Error while initializing LeaseCoordinator in Worker


I have been having some issues running a simple, vanilla Spark Streaming with Kinesis application in Scala. I have followed the basic guidance in tutorials such as Snowplow and WordCountASL.

Yet I still cannot make it work because of this Kinesis Worker error:

16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
    at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
    at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
    ... 4 more

Here is my code sample:

import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

/**
  * Created by franco on 11/11/16.
  */
object TestApp {
  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" //example "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {

    val credentials = new BasicAWSCredentials(awsAccessKeyId,awsSecretKey)

    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)

    // Determine the shard count so we can create one DStream per shard
    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()

    val streams = shards

    val batchInterval = Milliseconds(2000)

    val kinesisCheckpointInterval = batchInterval

    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores : Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)
    val config = new SparkConf().setAppName(appName).setMaster("local[" + (cores / 2) + "]")
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()
    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }


}

And my IAM policy looks like this:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "Stmt123",
            "Effect": "Allow",
            "Action": [
                "kinesis:DescribeStream",
                "kinesis:GetShardIterator",
                "kinesis:GetRecords"
            ],
            "Resource": [
                "arn:aws:kinesis:region:account:stream/name"
            ]
        },
        {
            "Sid": "Stmt456",
            "Effect": "Allow",
            "Action": [
                "dynamodb:CreateTable",
                "dynamodb:DeleteItem",
                "dynamodb:DescribeTable",
                "dynamodb:GetItem",
                "dynamodb:PutItem",
                "dynamodb:Scan",
                "dynamodb:UpdateItem"
            ],
            "Resource": [
                "arn:aws:dynamodb:region:account:table/name"
            ]
        },
        {
            "Sid": "Stmt789",
            "Effect": "Allow",
            "Action": [
                "cloudwatch:PutMetricData"
            ],
            "Resource": [
                "*"
            ]
        }
    ]
}

I cannot figure out what is wrong with this app. Any guidance on this subject would be appreciated.


Solution

  • There are other constructors for the Kinesis DStream that let you pass in the AWS access key and secret key directly.

    The 1st and 5th constructors in the link below, for example, accept the credentials as parameters; they are then shipped with the receiver to the worker nodes, instead of requiring you to set a system property on every JVM. A sketch of this approach follows below.

    KinesisUtil Constructors