Tags: spring-cloud-stream, spring-cloud-dataflow

Spring Cloud Data Flow Stream files to HDFS


I'm new to Spring Cloud Data Flow. I'm using version 1.7.3 and want to create a simple stream that scans a directory for new files and pushes them to HDFS. I have the following stream definition:

file --cron='* * * * * *' --mode=ref --directory=/dir | hdfs --fs-uri=hdfs://myhdpmaster:8020

When I deploy my stream, I see two issues:

  1. No matter which file mode I use, only an hdfs-sink-0.txt is created, and it either has no content at all or contains lines that look like the default toString() output of a byte array (e.g. '[B@7d5bfc85').

  2. When I put new files in the directory, no message is received by the HDFS sink, although the file source logs show that a message was created.

The output of my hdfs sink:

2019-01-25 12:21:06.330  INFO 63 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version : 0.10.1.1
2019-01-25 12:21:06.330  INFO 63 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId : f10ef2720b03b247
2019-01-25 12:21:06.338  INFO 63 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@67110f71
2019-01-25 12:21:06.338  INFO 63 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : Adding {message-handler:inbound.testhdfs1.file.testhdfs1} as a subscriber to the 'bridge.testhdfs1.file' channel
2019-01-25 12:21:06.338  INFO 63 --- [           main] o.s.i.endpoint.EventDrivenConsumer       : started inbound.testhdfs1.file.testhdfs1
2019-01-25 12:21:06.340  INFO 63 --- [           main] o.s.c.support.DefaultLifecycleProcessor  : Starting beans in phase 2147483647
2019-01-25 12:21:06.476  INFO 63 --- [           main] s.b.c.e.t.TomcatEmbeddedServletContainer : Tomcat started on port(s): 47888 (http)
2019-01-25 12:21:06.483  INFO 63 --- [           main] s.c.s.a.h.s.k.HdfsSinkKafka10Application : Started HdfsSinkKafka10Application in 17.593 seconds (JVM running for 18.756)
2019-01-25 12:21:08.250  INFO 63 --- [           -C-1] o.a.k.c.c.internals.AbstractCoordinator  : Discovered coordinator kafka:9092 (id: 2147482646 rack: null) for group testhdfs1.
2019-01-25 12:21:08.256  INFO 63 --- [           -C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group testhdfs1
2019-01-25 12:21:08.256  INFO 63 --- [           -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$3  : partitions revoked:[]
2019-01-25 12:21:08.256  INFO 63 --- [           -C-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group testhdfs1
2019-01-25 12:21:08.522  INFO 63 --- [           -C-1] o.a.k.c.c.internals.AbstractCoordinator  : Successfully joined group testhdfs1 with generation 1
2019-01-25 12:21:08.526  INFO 63 --- [           -C-1] o.a.k.c.c.internals.ConsumerCoordinator  : Setting newly assigned partitions [testhdfs1.file-0] for group testhdfs1
2019-01-25 12:21:08.735  INFO 63 --- [           -C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$3  : partitions assigned:[testhdfs1.file-0]
2019-01-25 12:21:23.238  INFO 63 --- [           -L-1] o.s.i.codec.kryo.CompositeKryoRegistrar  : registering [40, java.io.File] with serializer org.springframework.integration.codec.kryo.FileSerializer
2019-01-25 12:21:23.353  INFO 63 --- [           -L-1] o.s.d.h.s.o.AbstractDataStreamWriter     : Creating output for path /data/hdfs-sink-0.txt

Solution

  • You can't copy files into HDFS with the hdfs sink, because it is only meant to write the arbitrary messages it receives from sources. The reason you see a zero-length file is that the file is still open and has not been flushed yet. The hdfs sink README lists the available configuration options; if you use, for example, the idle-timeout or rollover settings, you will start to see files being written, as in the sketch below.
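A minimal sketch of the original definition with those settings added (assuming the unprefixed shorthand --idle-timeout and --rollover options from the hdfs sink README are accepted by your app starter version; the values are illustrative only):

file --cron='* * * * * *' --mode=ref --directory=/dir | hdfs --fs-uri=hdfs://myhdpmaster:8020 --idle-timeout=60000 --rollover=1048576

Here --idle-timeout=60000 closes and flushes the current file after 60 seconds without new messages, and --rollover=1048576 rolls to a new file once roughly 1 MB has been written, so data becomes visible in HDFS instead of sitting in an open, unflushed file.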