Tags: exception, apache-samza

Samza exception: "Failed to send message"


I'm running Samza on AWS EMR instances, and I keep getting the exception below. Can anyone help?

org.apache.samza.SamzaException: Failed to send message. Exception: java.lang.IllegalStateException: Cannot send after the producer is closed.
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120)
    at org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:111)
    at org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81)
    at org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:86)
    at org.apache.samza.system.SystemProducers.send(SystemProducers.scala:87)
    at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61)
    at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala:72)
    at org.apache.samza.storage.kv.SerializedKeyValueStore.putAll(SerializedKeyValueStore.scala:57)
    at org.apache.samza.storage.kv.CachedStore.flush(CachedStore.scala:166)
    at org.apache.samza.storage.kv.NullSafeKeyValueStore.flush(NullSafeKeyValueStore.scala:69)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.flush(KeyValueStorageEngine.scala:113)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.close(KeyValueStorageEngine.scala:125)
    at org.apache.samza.storage.kv.KeyValueStorageEngine.stop(KeyValueStorageEngine.scala:119)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$stop$2.apply(TaskStorageManager.scala:147)
    at org.apache.samza.storage.TaskStorageManager$$anonfun$stop$2.apply(TaskStorageManager.scala:147)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.storage.TaskStorageManager.stop(TaskStorageManager.scala:147)
    at org.apache.samza.container.TaskInstance.shutdownStores(TaskInstance.scala:185)
    at org.apache.samza.container.SamzaContainer$$anonfun$shutdownStores$2.apply(SamzaContainer.scala:650)
    at org.apache.samza.container.SamzaContainer$$anonfun$shutdownStores$2.apply(SamzaContainer.scala:650)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.MapLike$DefaultValuesIterable.foreach(MapLike.scala:206)
    at org.apache.samza.container.SamzaContainer.shutdownStores(SamzaContainer.scala:650)
    at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:560)
    at org.apache.samza.container.SamzaContainer$.safeMain(SamzaContainer.scala:93)
    at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:67)
    at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)


Solution

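  • I was looking in the wrong log file and missed the real exception. The trace above is thrown while the container is already shutting down (SamzaContainer.shutdownStores), after the producer has been closed, so it appears to be a side effect of an earlier failure rather than the root cause.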
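
If it is unclear which log file holds the real exception, one option is to scan every file in the container's log directory and print the first exception each one contains. The sketch below is only a heuristic: the function name `first_exceptions`, the directory you pass in, and the regular expression are assumptions made for illustration and are not part of Samza or YARN.

```python
import re
from pathlib import Path

# Heuristic (an assumption, not a Samza convention): a fully qualified
# Java/Scala class name ending in "Exception" or "Error", for example
# "java.lang.IllegalStateException".
EXCEPTION_RE = re.compile(r"\b(?:[A-Za-z_]\w*\.)+\w*(?:Exception|Error)\b")


def first_exceptions(log_dir):
    """Return (file name, line number, line text) for the first exception in each file."""
    hits = []
    for path in sorted(Path(log_dir).rglob("*")):
        if not path.is_file():
            continue
        # errors="replace" keeps the scan going if a file is not valid text.
        with path.open(errors="replace") as handle:
            for number, line in enumerate(handle, start=1):
                if EXCEPTION_RE.search(line):
                    hits.append((path.name, number, line.strip()))
                    break  # only the earliest hit in each file is reported
    return hits
```

Calling `first_exceptions(...)` on the directory that holds the container's logs returns one entry per file that mentions an exception. Comparing the timestamps on those lines should help show which failure happened first and which ones, like the "Cannot send after the producer is closed" trace, only followed from it.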