Getting errors in a Java program that writes to a Kinesis Firehose stream


I'm trying to write some data from an API (the Google stocks/finance API) to my AWS Firehose stream. I have downloaded and installed the AWS plugin for Eclipse and set up my Firehose stream on AWS, and everything seems to be configured correctly. I'm running into some problems, though. The following line seems to be deprecated. I tried different variations from Amazon's SDK, but I can't find the correct code.

AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);

Next, I'm getting errors with the following two lines. The specific error is "The method setRecord(Record) is undefined for the type PutRecordRequest", even though I took it directly from Amazon's API reference.

request.setRecord(record);

firehoseClient.putRecord(request);

I'm also getting an error on the second line above: "The method putRecord(com.amazonaws.services.kinesisfirehose.model.PutRecordRequest) in the type AmazonKinesisFirehoseClient is not applicable for the arguments (com.amazonaws.services.kinesis.model.PutRecordRequest)"

package com.amazonaws.samples;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.ByteBuffer;

import org.apache.http.client.CredentialsProvider;

import com.amazonaws.*;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient;
import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClient;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.model.PutRecordRequest;
import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
import com.amazonaws.services.kinesisfirehose.AmazonKinesisFirehoseClient;
import com.amazonaws.services.kinesisfirehose.model.PutRecordBatchRequest;
import com.amazonaws.services.kinesisfirehose.model.Record;

public class FirehoseExample {

    public static void main(String[] args) {
        AWSCredentials credentials = null;

        try {
            credentials = new ProfileCredentialsProvider().getCredentials();
        }

        catch (Exception e) {
            throw new AmazonClientException("Cannot load the credentials from the credential profiles file. "
                    + "Please make sure that your credentials file is at the correct "
                    + "location (/Users/elybenari/.aws/credentials), and is in valid format.", e);
        }

        AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
        PutRecordRequest request = new PutRecordRequest();
        request.setStreamName("project-stream");

        Record record = new Record();

        for (int i = 0; i < 20*60; i++){
            try {
                URL url = new URL("https://www.google.com/finance/info?q=NASDAQ:AMZN");
                HttpURLConnection conn = (HttpURLConnection) url.openConnection();
                BufferedReader reader = new BufferedReader(new InputStreamReader(conn.getInputStream()));
                StringBuilder response = new StringBuilder();
                String line;

                while ((line = reader.readLine()) != null) {
                    response.append(line);  
                }
                reader.close();

                System.out.println(response.toString().replace("\n", "").replaceAll(" ", ""));
                System.out.println("****\n");

                ByteBuffer buffer = ByteBuffer.wrap(response.toString().replace("\n", "").replaceAll(" ", "").getBytes());
                record.setData(buffer);

                request.setRecord(record);

                firehoseClient.putRecord(request);

                Thread.sleep(2000);


            }
            catch(Exception e){
                e.printStackTrace();
            }
        }
    }
}

Solution

  • The problem is that you've imported some classes from the Kinesis Java package rather than the Kinesis Firehose one. For example, you've used:

    import com.amazonaws.services.kinesis.model.PutRecordRequest;
    

    whereas you should have used:

    import com.amazonaws.services.kinesisfirehose.model.PutRecordRequest;
    

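    Kinesis, Kinesis Firehose and Kinesis Analytics are different services, even though they fall under one umbrella of streaming services on AWS. Consequently, they have different package namespaces in the Java SDK, and classes that share a simple name (such as PutRecordRequest) are not interchangeable between them. That is what both compiler errors are telling you: the Kinesis PutRecordRequest has no setRecord(Record) method, and the Firehose client's putRecord only accepts the Firehose PutRecordRequest. Note that the Firehose PutRecordRequest names its target with setDeliveryStreamName rather than setStreamName, so that call needs to change as well. The official Kinesis Firehose documentation links to the correct Java SDK reference.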

    EDIT: To answer your other question: yes, the following is deprecated:

    AmazonKinesisFirehoseClient firehoseClient = new AmazonKinesisFirehoseClient(credentials);
    

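    You should use the client builder instead. Its build() method returns the AmazonKinesisFirehose interface type, so declare the variable with that type (and import AmazonKinesisFirehose, AmazonKinesisFirehoseClientBuilder and AWSStaticCredentialsProvider):

     AmazonKinesisFirehose firehoseClient = AmazonKinesisFirehoseClientBuilder.standard().withCredentials(new AWSStaticCredentialsProvider(credentials)).build();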
    

    Refer to the official AWS SDK for Java documentation for how to correctly initialize the Firehose client.
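
    The loop in the question builds each record's payload by stripping spaces and newlines from the HTTP response and wrapping the bytes in a ByteBuffer. Below is a minimal, SDK-free sketch of that step. The class and method names (FirehosePayload, compact, toPayload) are made up for illustration, and it assumes UTF-8 is the encoding you want in the delivery stream, rather than the platform default that a bare getBytes() would use.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of the AWS SDK): prepares the bytes for one record.
public class FirehosePayload {

    // Mirrors the clean-up in the question: drop newlines and spaces from the response.
    public static String compact(String response) {
        return response.replace("\n", "").replace(" ", "");
    }

    // Encode explicitly as UTF-8 (an assumption) instead of the platform default charset.
    public static ByteBuffer toPayload(String response) {
        return ByteBuffer.wrap(compact(response).getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        // Made-up sample input standing in for the API response.
        ByteBuffer buffer = toPayload("{ \"t\" : \"AMZN\" }\n");
        // Decode a duplicate so the original buffer's position is left untouched.
        System.out.println(StandardCharsets.UTF_8.decode(buffer.duplicate()));
        // prints {"t":"AMZN"}
    }
}
```

    The resulting buffer can then be handed to the Firehose Record with record.setData(...), as in the question's loop.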