I am not a native English speaker, but I will try to express my question as clearly as possible. I have run into a problem that has confused me for two days, and I still can't find a solution.
I have built a stream that runs in Spring Cloud Data Flow on Hadoop YARN.
The stream is composed of an HTTP source, a processor, and a file sink.
1. HTTP Source
The HTTP source component has two output channels bound to two different destinations, dest1 and dest2, defined in application.properties:
spring.cloud.stream.bindings.output.destination=dest1
spring.cloud.stream.bindings.output2.destination=dest2
Below is the code snippet from the HTTP source for your reference.
@Autowired
private EssSource channels; //EssSource is the interface for multiple output channels
// output channel 1:
@RequestMapping(path = "/file", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest1...");
channels.output().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
// output channel 2:
@RequestMapping(path = "/test", method = POST, consumes = {"text/*", "application/json"})
@ResponseStatus(HttpStatus.ACCEPTED)
public void handleRequest2(@RequestBody byte[] body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {
logger.info("enter ... handleRequest2...");
channels.output2().send(MessageBuilder.createMessage(body,
new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));
}
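For context, here is a minimal sketch of what the EssSource binding interface might look like. The question does not show the actual interface, so this is an assumption: the channel names output and output2 are inferred from the spring.cloud.stream.bindings.* property keys above.

```java
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

// Hypothetical sketch of the EssSource binding interface (not shown in
// the question). Channel names are inferred from the property keys
// spring.cloud.stream.bindings.output.* and ...output2.*.
public interface EssSource {

    String OUTPUT = "output";     // bound to dest1
    String OUTPUT_2 = "output2";  // bound to dest2

    @Output(OUTPUT)
    MessageChannel output();

    @Output(OUTPUT_2)
    MessageChannel output2();
}
```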
2. Processor
The processor has two input channels and two output channels, each bound to a different destination.
The destination bindings are defined in application.properties in the processor component project.
//input channel binding
spring.cloud.stream.bindings.input.destination=dest1
spring.cloud.stream.bindings.input2.destination=dest2
//output channel binding
spring.cloud.stream.bindings.output.destination=hdfsSink
spring.cloud.stream.bindings.output2.destination=fileSink
Below is the code snippet for Processor.
@Transformer(inputChannel = EssProcessor.INPUT, outputChannel = EssProcessor.OUTPUT)
public Object transform(Message<?> message) {
logger.info("enter ...transform...");
return "processed by transform1";
}
@Transformer(inputChannel = EssProcessor.INPUT_2, outputChannel = EssProcessor.OUTPUT_2)
public Object transform2(Message<?> message) {
logger.info("enter ... transform2...");
return "processed by transform2";
}
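Similarly, a sketch of what the EssProcessor binding interface might look like; again, the real interface is not shown in the question, so the channel names are assumptions inferred from the input/input2/output/output2 property keys above.

```java
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

// Hypothetical sketch of the EssProcessor binding interface (not shown
// in the question). Channel names match the property keys used above.
public interface EssProcessor {

    String INPUT = "input";       // bound to dest1
    String INPUT_2 = "input2";    // bound to dest2
    String OUTPUT = "output";     // bound to hdfsSink
    String OUTPUT_2 = "output2";  // bound to fileSink

    @Input(INPUT)
    SubscribableChannel input();

    @Input(INPUT_2)
    SubscribableChannel input2();

    @Output(OUTPUT)
    MessageChannel output();

    @Output(OUTPUT_2)
    MessageChannel output2();
}
```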
3. The file sink component
I use the official file sink component from Spring: maven://org.springframework.cloud.stream.app:file-sink-kafka:1.0.0.BUILD-SNAPSHOT
I just added the destination binding in its application.properties file:
spring.cloud.stream.bindings.input.destination=fileSink
4. Finding:
The data flow I expected looks like this:
Source.handleRequest() --> Processor.transform()
Source.handleRequest2() --> Processor.transform2() --> Sink.fileWritingMessageHandler();
Only the string "processed by transform2" should be saved to the file.
But in my testing, the actual data flow is:
Source.handleRequest() --> Processor.transform() --> Sink.fileWritingMessageHandler();
Source.handleRequest2() --> Processor.transform2() --> Sink.fileWritingMessageHandler();
Both the "processed by transform1" and "processed by transform2" strings are saved to the file.
5. Question:
Although the output channel in Processor.transform() is bound to the hdfsSink destination rather than fileSink, the data still flows to the file sink. I can't understand this, and it is not what I want. I only want the data from Processor.transform2() to flow to the file sink, not both. If I'm not doing this the right way, could anyone tell me what the solution is? It has confused me for two days.
Thank you for your kind help.
Alex
Is your stream definition something like this (where the '-2' versions are the ones with multiple channels)?
http-source-2 | processor-2 | file-sink
Note that Spring Cloud Data Flow will override the destinations defined in application.properties, which is why, even though spring.cloud.stream.bindings.output.destination for the processor is set to hdfsSink, it will actually match the input of file-sink.
The way destinations are configured from a stream definition is explained here (in the context of taps): http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#spring-cloud-dataflow-stream-tap-dsl
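To illustrate the override (the exact topic names below are illustrative, not taken from the question; SCDF derives them from the stream name and app positions), for a stream named mystream the deployer effectively sets bindings like:

```properties
# Illustrative only: SCDF overrides the main-channel destinations declared
# in application.properties with names it derives from the stream definition.
# Source -> Processor pipe:
spring.cloud.stream.bindings.output.destination=mystream.http-source-2   # on http-source-2
spring.cloud.stream.bindings.input.destination=mystream.http-source-2    # on processor-2
# Processor -> Sink pipe (this is what replaces hdfsSink):
spring.cloud.stream.bindings.output.destination=mystream.processor-2     # on processor-2
spring.cloud.stream.bindings.input.destination=mystream.processor-2      # on file-sink
```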
What you can do is simply swap the meaning of channels 1 and 2: use the side channel for HDFS. This is a bit brittle, though, since the input/output channels of the stream will be configured automatically while the other channels are configured via application.properties. In this case it may be better to configure the side-channel destinations via the stream definition or at deployment time; see http://docs.spring.io/spring-cloud-dataflow/docs/current/reference/htmlsingle/#_application_properties.
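For example (a sketch only: the stream name, app labels, and exact property prefix are assumptions; check the linked documentation for the format your SCDF version expects), the side-channel destination could be supplied from the Data Flow shell at deployment time:

```properties
# Hypothetical deployment-time properties for a stream named "mystream";
# "processor-2" is the app label from the stream definition above.
stream deploy mystream --properties "app.processor-2.spring.cloud.stream.bindings.output2.destination=sideDest"
```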
It seems to me that these could just as well be two streams listening on separate endpoints, using regular components, given that the data is supposed to flow side by side.