Is there a functional project that would enable moving data from Node.js into Flume-NG, without an intermediary file.
Perhaps I am missing something, I would have thought moving data from Node.js into Flume a more common need but it seems that is not the case.
I have found a number of projects that seem to have attempted this but all seem to have been abandoned about 3 years ago and are not functional with current versions. There seem to be some that will work with the older versions of flume, but the API changed substantially with flume-ng and they are no longer applicable.
I have found avro and thrift modules for Node.js, and thrift has node.js support now seeming to indicate this should be straight forward, but that is not working, there may not be enough information about which transport/protocol to use with Flume-NG, or maybe I just don't understand it well enough.
Can anyone point me in the right direction before I re-invent the wheel?
This is the current node code I have. It generates a ECONNREFUSED.
#!/usr/local/bin/node
var thrift = require('thrift');
//var ThriftTransports = require('thrift/transport');
//var ThriftProtocols = require('thrift/protocol');
var Flume = require('./gen-nodejs/ThriftSourceProtocol');
var ttypes = require('./gen-nodejs/flume_types');
transport = thrift.TBufferedTransport();
protocol = thrift.TBinaryProtocol();
//transport = ThriftTransports.TBufferedTransport();
//protocol = ThriftProtocols.TBinaryProtocol();
var connection = thrift.createConnection("127.0.0.1", 51515,{
transport: transport,
protocol: protocol
});
connection.on('error', function(err) {
console.error(err);
});
var client = thrift.createClient(Flume, connection);
var myEvent = new ttypes.ThriftFlumeEvent();
myEvent.headers = {};
myEvent.body = "body";
client.append(myEvent, function(err,data) {
if (err) {
// handle err
} else {
// data == [ttypes.ColumnOrSuperColumn, ...]
}
connection.end();
});
This is the Setup of the Thrift Server implemented in FLUME-1894, file ThriftSource.java:
args.protocolFactory(new TCompactProtocol.Factory());
args.inputTransportFactory(new TFastFramedTransport.Factory());
args.outputTransportFactory(new TFastFramedTransport.Factory());
args.processor(new ThriftSourceProtocol.Processor<ThriftSourceHandler>(new ThriftSourceHandler()));
To get it to work, you need to use compatible stack on the client side:
There is already a client implentation in ThriftRpcClient.java, so you don't need to invent the wheel again.