I'm trying to read records from a Kafka topic that is produced by a Kafka Connect JDBC source connector. Here is the connector config:
"name": "customers",
"config": {
"poll.interval.ms": "3000",
"table.poll.interval.ms": "10000",
"errors.log.enable": "false",
"errors.log.include.messages": "false",
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"topic.prefix": "_",
"mode": "incrementing",
"validate.non.null": "true",
"table.whitelist": "customers",
"incrementing.column.name": "customer_id",
"connection.url": "jdbc:sqlserver://demo-sqlserver:1433;databaseName=DemoData",
"connection.user": "sa",
"connection.password": "password",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://kafka-schema-registry:8081",
"tasks.max": "2"
}
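For context, a config in this shape is what gets POSTed to the Kafka Connect REST API to register the connector; the worker address below (localhost:8083) and the file name customers-connector.json are assumptions, since the Kafka stack's compose file isn't shown here:

# Register the connector from the JSON above (saved as customers-connector.json);
# localhost:8083 is an assumed address for the cp-kafka-connect worker.
curl -X POST -H "Content-Type: application/json" \
     --data @customers-connector.json \
     http://localhost:8083/connectors

# Then check that the connector and its tasks are RUNNING
curl http://localhost:8083/connectors/customers/status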
I'm using docker-compose to run the Kafka stack containers with these images: zookeeper:3.4.9, confluentinc/cp-kafka:5.5.1, confluentinc/cp-schema-registry:5.5.1, confluentinc/cp-kafka-connect:5.5.1.
I'm using Flink 1.12.0, and here are my docker-compose file and Dockerfile for Flink:
services:
  jobmanager:
    build:
      dockerfile: Dockerfile
      context: .
    volumes:
      - ./examples:/opt/examples
      - ./opt/flink/usrlib:/opt/flink/usrlib
    hostname: "jobmanager"
    expose:
      - "6123"
    ports:
      - "28081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - flink-proxy-net
  taskmanager:
    build:
      dockerfile: Dockerfile
      context: .
    volumes:
      - ./examples:/opt/examples
      - ./opt/flink/usrlib:/opt/flink/usrlib
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - jobmanager:jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
    networks:
      - flink-proxy-net
Dockerfile:
FROM apache/flink:1.12-scala_2.11-java11
RUN set -ex; \
    apt-get update; \
    apt-get -y install python3 python3-pip python3-dev; \
    ln -s /usr/bin/python3 /usr/bin/python; \
    ln -s /usr/bin/pip3 /usr/bin/pip; \
    python -m pip install --upgrade pip; \
    pip install apache-flink
ARG FLINK_VERSION=1.12.0
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-json/${FLINK_VERSION}/flink-json-${FLINK_VERSION}.jar
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-csv/${FLINK_VERSION}/flink-csv-${FLINK_VERSION}.jar
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-connector-kafka_2.12/${FLINK_VERSION}/flink-sql-connector-kafka_2.12-${FLINK_VERSION}.jar
RUN wget -P /opt/flink/lib/ https://repo.maven.apache.org/maven2/org/apache/flink/flink-sql-avro/${FLINK_VERSION}/flink-sql-avro-${FLINK_VERSION}.jar
RUN mkdir -p /opt/data/stream
WORKDIR /opt/flink
I'm trying to run a simple example that reads a topic from Kafka and writes the data back to Kafka, using this Python code:
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings
b_s_env = StreamExecutionEnvironment.get_execution_environment()
b_s_env.set_parallelism(1)
b_s_settings = EnvironmentSettings.new_instance().use_blink_planner().in_streaming_mode().build()
b_s_t_env = StreamTableEnvironment.create(b_s_env, environment_settings=b_s_settings)
src_kafka_ddl="""
CREATE TABLE SourceTable (
`age` INT,
`customer_id` INT,
`name` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'dj_customers',
'properties.bootstrap.servers' = 'kafka1:9092',
'properties.zookeeper.connect'='zoo1:2181',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro'
)
"""
sink_kafka_ddl="""
CREATE TABLE SinkTable (
`customer_id` INT,
`name` STRING
) WITH (
'connector' = 'kafka',
'topic' = 'sink_test',
'properties.bootstrap.servers' = 'kafka1:9092',
'properties.zookeeper.connect'='zoo1:2181',
'scan.startup.mode' = 'earliest-offset',
'format' = 'avro'
)
"""
b_s_t_env.execute_sql(src_kafka_ddl)
b_s_t_env.execute_sql(sink_kafka_ddl)
orders = b_s_t_env.from_path("SourceTable")
orders.select(orders.customer_id, orders.name).execute_insert("SinkTable").wait()
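For completeness, the script is submitted to the cluster with the Flink CLI from inside the jobmanager container, roughly as below; the script name read_customers.py is a placeholder, and /opt/examples is the volume mounted in the compose file above:

# Submit the PyFlink job; "read_customers.py" stands in for the script above.
docker-compose exec jobmanager ./bin/flink run -py /opt/examples/read_customers.py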
After submitting the job, I get a timeout exception:
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition _customers-0 could be determined.
Caused by: org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.client.deployment.ClusterClientJobClientAdapter.lambda$null$6(ClusterClientJobClientAdapter.java:123)
... 18 more
Caused by: org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:118)
at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:80)
at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:233)
at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:224)
at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:215)
at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:665)
at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:89)
at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:447)
at jdk.internal.reflect.GeneratedMethodAccessor41.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:306)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213)
at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Timeout of 60000ms expired before the position for partition _customers-0 could be determined
You need to put your Flink services on the same Docker network as the Kafka broker. Your Flink compose file defines its own network (flink-proxy-net) while the Kafka stack runs as a separate compose project, so the Flink containers can't reach the broker advertised as kafka1:9092, and the consumer times out before it can determine the partition position.
The easiest way is to just use one compose file for everything; alternatively, attach the Flink services to the network the Kafka stack has already created.
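If you keep separate compose files, a minimal sketch of the second option looks like this; the network name kafka_default is an assumption (docker-compose names it <project-directory>_default by default, so check docker network ls for the real name):

services:
  jobmanager:
    # ... build, volumes, ports, command as in your file ...
    networks:
      - kafka_default
  taskmanager:
    # ... as in your file ...
    networks:
      - kafka_default

networks:
  # Pre-existing network created by the Kafka stack's compose project
  kafka_default:
    external: true

With that in place, kafka1:9092 becomes resolvable from the Flink containers, provided the broker's advertised listener uses that hostname.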