I built a real-time data streaming project using Kafka, Elasticsearch, Kibana, and Postgres with Docker Compose, plus Flink.
The data flows like this:
kafka -> flink -> elasticsearch and postgres.
When I write the Kafka stream data into Elasticsearch, I can't find the new data in the Kibana Dev Tools console (GET index/_search or GET index) until I cancel the Flink job.
flink job starts -> can't find new data in Kibana -> cancel the flink job -> now I can see the new data in Kibana.
Here is the relevant part of my code:
DataStream<Transaction> transactionStream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

transactionStream.sinkTo(
        new Elasticsearch7SinkBuilder<Transaction>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                .setEmitter((transaction, runtimeContext, requestIndexer) -> {
                    String json = convertTransactionToJson(transaction);
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("transactions")
                            .id(transaction.getTransactionId())
                            .source(json, XContentType.JSON);
                    requestIndexer.add(indexRequest);
                })
                .build()
).name("Elasticsearch Sink");
The Postgres updates work fine.
I'm on macOS, and my versions are:
Java: 11
Flink: 1.18.0
flink-connector-kafka: 3.0.1-1.18
flink-sql-connector-elasticsearch7: 3.0.1-1.17
What I tried so far only led to another error:
Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=http://localhost:9200, response=HTTP/1.1 200 OK}
My code is exactly the same as in this repository:
https://github.com/airscholar/FlinkCommerce
https://www.youtube.com/watch?v=deepQRXnniM
I cloned it and ran it, and the same problem happens. In his YouTube video the problem doesn't occur.
What can I do to fix this?
The Elasticsearch sink commits its buffered requests either when a checkpoint completes or once the buffer reaches 1000 actions (the default bulk flush size). Your job has checkpointing disabled and never buffers 1000 actions, so nothing is flushed until the job shuts down. Enabling checkpointing is probably the best solution.
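A minimal sketch of both options, reusing the Transaction class, Kafka source, and convertTransactionToJson helper from your question; the 5-second checkpoint interval and the bulk flush thresholds (100 actions / 1 second) are arbitrary example values, not required settings:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.connector.elasticsearch.sink.Elasticsearch7SinkBuilder;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.xcontent.XContentType;

// Inside your job's pipeline setup:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Option 1: enable checkpointing so the sink flushes its bulk buffer on every checkpoint.
env.enableCheckpointing(5000); // checkpoint every 5 seconds

DataStream<Transaction> transactionStream =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka source");

transactionStream.sinkTo(
        new Elasticsearch7SinkBuilder<Transaction>()
                .setHosts(new HttpHost("localhost", 9200, "http"))
                // Option 2: lower the bulk flush thresholds so records are written
                // long before 1000 actions accumulate.
                .setBulkFlushMaxActions(100)  // flush after 100 buffered requests
                .setBulkFlushInterval(1000L)  // and at least once per second
                .setEmitter((transaction, runtimeContext, requestIndexer) -> {
                    String json = convertTransactionToJson(transaction);
                    IndexRequest indexRequest = Requests.indexRequest()
                            .index("transactions")
                            .id(transaction.getTransactionId())
                            .source(json, XContentType.JSON);
                    requestIndexer.add(indexRequest);
                })
                .build()
).name("Elasticsearch Sink");

Either option on its own should be enough to make new documents appear in Kibana while the job is running; the exact interval and threshold values are only for illustration.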