I have been having some issues running a simple vanilla Spark Streaming with Kinesis application in Scala. I have followed the basic guidance in tutorials such as Snowplow and WordCountASL, yet I still cannot make it work because of this Kinesis Worker error:
16/11/15 09:00:27 ERROR Worker: Caught exception when initializing LeaseCoordinator
com.amazonaws.services.kinesis.leases.exceptions.DependencyException: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:125)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:374)
at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:318)
at org.apache.spark.streaming.kinesis.KinesisReceiver$$anon$1.run(KinesisReceiver.scala:174)
Caused by: com.amazonaws.AmazonClientException: Unable to load AWS credentials from any provider in the chain
at com.amazonaws.auth.AWSCredentialsProviderChain.getCredentials(AWSCredentialsProviderChain.java:117)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:1758)
at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:822)
at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
... 4 more
Here is my code sample:
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.internal.StaticCredentialsProvider
import com.amazonaws.regions.RegionUtils
import com.amazonaws.services.kinesis.AmazonKinesisClient
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Milliseconds, StreamingContext}
/**
* Created by franco on 11/11/16.
*/
object TestApp {
  // === Configurations for Kinesis streams ===
  val awsAccessKeyId = "XXXXXX"
  val awsSecretKey = "XXXXXXX"
  val kinesisStreamName = "MyStream"
  val kinesisEndpointUrl = "https://kinesis.region.amazonaws.com" // example: "https://kinesis.us-west-2.amazonaws.com"
  val appName = "MyAppName"

  def main(args: Array[String]): Unit = {
    val credentials = new BasicAWSCredentials(awsAccessKeyId, awsSecretKey)
    val provider = new StaticCredentialsProvider(credentials)

    val kinesisClient = new AmazonKinesisClient(provider)
    kinesisClient.setEndpoint(kinesisEndpointUrl)
    val shards = kinesisClient.describeStream(kinesisStreamName).getStreamDescription.getShards.size()
    val streams = shards

    val batchInterval = Milliseconds(2000)
    val kinesisCheckpointInterval = batchInterval
    val regionName = RegionUtils.getRegionByEndpoint(kinesisEndpointUrl).getName

    val cores: Int = Runtime.getRuntime.availableProcessors()
    println("Available Cores : " + cores.toString)
    val config = new SparkConf().setAppName("MyAppName").setMaster("local[" + (cores / 2) + "]")
    val ssc = new StreamingContext(config, batchInterval)

    // Create the Kinesis DStreams
    val kinesisStreams = (0 until streams).map { i =>
      KinesisUtils.createStream(ssc, appName, kinesisStreamName, kinesisEndpointUrl, regionName,
        InitialPositionInStream.LATEST, kinesisCheckpointInterval * 2, StorageLevel.MEMORY_AND_DISK_2)
    }

    ssc.union(kinesisStreams).map(bytes => new String(bytes)).print()

    // Start the streaming context and await termination
    ssc.start()
    ssc.awaitTermination()
  }
}
And my IAM policy looks like this:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "Stmt123",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:region:account:stream/name"
      ]
    },
    {
      "Sid": "Stmt456",
      "Effect": "Allow",
      "Action": [
        "dynamodb:CreateTable",
        "dynamodb:DeleteItem",
        "dynamodb:DescribeTable",
        "dynamodb:GetItem",
        "dynamodb:PutItem",
        "dynamodb:Scan",
        "dynamodb:UpdateItem"
      ],
      "Resource": [
        "arn:aws:dynamodb:region:account:table/name"
      ]
    },
    {
      "Sid": "Stmt789",
      "Effect": "Allow",
      "Action": [
        "cloudwatch:PutMetricData"
      ],
      "Resource": [
        "*"
      ]
    }
  ]
}
I cannot figure out what is wrong with this app. Any guidance on this subject would be appreciated.
There are other overloads of KinesisUtils.createStream that allow you to pass in the AWS access key and secret key directly. The overloads that take awsAccessKeyId and awsSecretKey parameters will hand the credentials through to the Kinesis receiver (and the KCL Worker it starts), instead of relying on the default credentials provider chain. In your code, the StaticCredentialsProvider is only used by the AmazonKinesisClient you build for describeStream; the receiver's internal worker builds its own clients and falls back to the default chain, which is why it fails to find credentials.
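A minimal sketch of that approach, reusing the configuration values from the question (the placeholder keys, stream name, and us-west-2 endpoint are illustrative, not real values):

```scala
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.kinesis.KinesisUtils
import org.apache.spark.streaming.{Milliseconds, StreamingContext}

object TestAppWithCredentials {
  def main(args: Array[String]): Unit = {
    val config = new SparkConf().setAppName("MyAppName").setMaster("local[2]")
    val ssc = new StreamingContext(config, Milliseconds(2000))

    // This createStream overload takes the access key and secret key as the
    // last two arguments, so the KCL Worker inside the receiver uses them
    // instead of falling back to the default credentials provider chain.
    val stream = KinesisUtils.createStream(
      ssc,
      "MyAppName",                               // KCL application name (also the DynamoDB lease table)
      "MyStream",                                // Kinesis stream name
      "https://kinesis.us-west-2.amazonaws.com", // endpoint URL
      "us-west-2",                               // region name
      InitialPositionInStream.LATEST,
      Milliseconds(2000),                        // checkpoint interval
      StorageLevel.MEMORY_AND_DISK_2,
      "XXXXXX",                                  // awsAccessKeyId
      "XXXXXXX")                                 // awsSecretKey

    stream.map(bytes => new String(bytes)).print()
    ssc.start()
    ssc.awaitTermination()
  }
}
```

With this variant the explicit credentials reach both Kinesis and the DynamoDB lease table, so the LeaseCoordinator no longer depends on environment variables or system properties being set on the worker.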