Search code examples
apache-kafkaopenwhisk

OpenWhisk send message to Kafka timeout


Environment details

CentOS7、Standalone OpenWhisk

Problem description

I plan to send a message to Kafka in openwhisk, the data flow process is: WSK CLI -> OpenWhisk action -> kafka-console-consume.

But there will be intermittent failures in the process,such as: I send "test01"~"test06", only get "test02"、"test04"、"test06".

According to the log , The cause of the failures is a timeout.

This is my action script:

// js
const kafka = require('kafka-node');

const Producer = kafka.Producer;
const client = new kafka.KafkaClient({
    kafkaHost: "192.168.68.132:3093"
});

const producer = new Producer(client);

function main(actionObj) {
    var message = JSON.stringify(actionObj);
    const payloads = [
        {
            topic: 'beforeReductionTopic',
            messages: message,
            partition: 0
        }
    ];
    return new Promise(function (resolve, reject) {
        producer
        .on('ready', () => {
            producer.send(payloads, function (err, data) {
                if (err) {
                    console.log("++++++ producer err: ", err);
                    return reject(err);
                } else {
                    console.log("++++++ producer data: ", data);
                    return resolve(data);
                }
            });
        })
        .on("error", err => {
            console.error(err);
            return reject(err);
        });
    });
}

exports.main = main;

According to the log , it is "action developer error".

I use the follow code to test the action script, kafka can receive all messages.

const action = require("./openwhisk-kafka.js");
for(let i = 0; i < 3; i++) {
    action.main("ccc");
}

I don't know how to modify the action script.

Log detail

Operation record of invoke openwhisk action with WSK CLI

[root@localhost openwhisk-kafka]# wsk action update scicatTest --kind nodejs:10 /test/actions/openwhisk-kafka/openwhisk-kafka.zip 
ok: updated action scicatTest
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test01"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 78e633791c114d29a633791c11fd29c2
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test02"
{
    "beforeReductionTopic": {
        "0": 57
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test03"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 09165a45a0f94011965a45a0f920110b
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test04"
{
    "beforeReductionTopic": {
        "0": 58
    }
}
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test05"
ok: invoked /_/scicatTest, but the request has not yet finished, with id 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
[root@localhost openwhisk-kafka]# wsk action invoke scicatTest --result --param actionObj "test06"
{
    "beforeReductionTopic": {
        "0": 59
    }
}
[root@localhost openwhisk-kafka]# 

the fail action log

[root@localhost openwhisk-kafka]# wsk activation get 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
ok: got activation 8a9f38f9d5ab4fa49f38f9d5ab6fa45a
{
    "namespace": "guest",
    "name": "scicatTest",
    "version": "0.0.4",
    "subject": "guest",
    "activationId": "8a9f38f9d5ab4fa49f38f9d5ab6fa45a",
    "start": 1619540684355,
    "end": 1619540744418,
    "duration": 60063,
    "statusCode": 0,
    "response": {
        "status": "action developer error",
        "statusCode": 0,
        "success": false,
        "result": {
            "error": "The action exceeded its time limits of 60000 milliseconds."
        }
    },
    "logs": [],
    "annotations": [
        {
            "key": "path",
            "value": "guest/scicatTest"
        },
        {
            "key": "waitTime",
            "value": 652
        },
        {
            "key": "kind",
            "value": "nodejs:10"
        },
        {
            "key": "timeout",
            "value": true
        },
        {
            "key": "limits",
            "value": {
                "concurrency": 1,
                "logs": 10,
                "memory": 256,
                "timeout": 60000
            }
        }
    ],
    "publish": false
}

Solution

  • do not use "kafka-node". replace with "kafkajs"