Search code examples
scaladockerapache-flink

Cannot instantiate user function in scala using flink


I'm trying to run a large open-source project written in Scala. I am running it using 3 docker images.

The docker-compose file is here.

version: '2'
services:
  streammachine:
    environment:
      - "EXECUTION_TYPE=flink-cluster"
      - "FLINK_JOBMGR_HOST=jobmanager"
      - "FLINK_JOBMGR_PORT=8081"
      - "FLINK_MONITORING_HOST=jobmanager"
      - "FLINK_MONITORING_PORT=8081"
      - "JOB_MANAGER_RPC_ADDRESS=jobmanager"
      - "TSP_JAVA_OPTS=-Xms2G -Xmx4G" 
    restart: on-failure
    image: clovergrp/tsp:latest
    ports:
      - "8080:8080"
  jobmanager:
    image: flink:1.7.2-scala_2.12-alpine
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:1.7.2-scala_2.12-alpine
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

The project itself uses Scala 2.12.7. It is taken from Build.sbt

However, when I use the functionality of the project, I am facing an error Cannot instantiate user function.

org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:239)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createChainedOperator(OperatorChain.java:369)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.createOutputCollector(OperatorChain.java:296)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.<init>(OperatorChain.java:133)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:267)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:704)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.InvalidClassException: scala.Symbol; local class incompatible: stream classdesc serialVersionUID = 6865603221856321286, local class serialVersionUID = 2966401305346518859
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:699)
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1885)
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1751)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2042)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1975)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1567)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2287)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2211)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2069)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1573)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:566)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:552)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:540)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:501)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperator(StreamConfig.java:224)

Solution

  • Сhange flink version in the docker-compose, and scala-version in build.sbt