
Apache Camel: which EIP to use to enrich all rows with repeating header data?


I have a CSV I am processing formatted like so:

01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
01,H2,H3
02,B2,B3,B4,B5
02,B2,B3,B4,B5
02,B2,B3,B4,B5

01 marks a header row and 02 marks a body row. Each header row applies to the body rows that follow it, up to the next header row.

I need to take the header data and add it to the body messages so I end up sending messages like this:

H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5
H2,H3,B2,B3,B4,B5

I have tried aggregate, but that does not appear to be the right EIP for this scenario: I end up combining the same message over and over instead of merging multiple messages into one. Fundamentally, I need access to the header data while processing each body row (in truth it is just one field). I don't know how to hold that value in a variable, since headers and properties are cleared on each exchange. Any tips? Thanks in advance.

Here is the Camel route as it stands, in case it helps:

  from("direct:inventory")
    .split(body().tokenize("\n")).streaming()
    .throttle(100)
    .choice()
      .when(property("CamelSplitComplete").isEqualTo(true))
        .log("Processed ${property.CamelSplitSize} updates")
    .end()
    .unmarshal(csv)
    .log("${body}")
    .aggregate(header("CamelFileLastModified"), new InventoryAggregationStrategy())
      .completionPredicate(header("aggregationComplete").isEqualTo(true))
      .to("freemarker://templates/inventory.ftl")
      .unmarshal().string("UTF-8")
      .unmarshal().json(JsonLibrary.Jackson)
      .convertBodyTo(JsonObject.class)
      .to("endpoint");

Here is the spec for the data


Solution

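  • A simple approach is to use a stateful bean: it stores the fields of the most recent 01 row and prefixes them to every 02 row that follows. This relies on the splitter processing lines sequentially (the default), so do not combine it with parallelProcessing().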

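    import org.apache.camel.builder.RouteBuilder;
    import org.apache.camel.main.Main;

    public class CamelHeadersAndRows {
        // Holds the most recent header row so it can be prepended to body rows.
        public static class HeaderBean {
            String header = null;
            // Called for "01" rows: remember the header fields, minus the "01," tag.
            public void setHeader(String body) {
                header = body.substring("01,".length());
            }
            // Called for "02" rows: replace the "02," tag with the stored header fields.
            public String useHeader(String body) {
                return header + "," + body.substring("02,".length());
            }
        }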
    
        public static void main(String[] args) throws Exception {
            final HeaderBean headerBean = new HeaderBean();
    
            Main camelMain = new Main();
            camelMain.addRouteBuilder(new RouteBuilder() {
                @Override
                public void configure() throws Exception {
                    from("timer:foo?period=1s&repeatCount=1")
                            .setBody(constant(
                                    "01,H2,H3\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "01,H2,H3\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "01,H2,H3\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5\n" +
                                    "02,B2,B3,B4,B5"
                            ))
                            .to("direct:inventory");
    
                    from("direct:inventory")
                            .split(body().tokenize("\n")).streaming()
                            .choice()
                                .when().simple("${body} regex '^01.*'")
                                    .bean(headerBean, "setHeader")
                                    .stop()
                                .otherwise()
                                    .bean(headerBean, "useHeader")
                            .end()
                            .log("message: ${body}")
                    ;
                }
            });
            camelMain.run();
        }
    
    }
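
    To check the expected output without starting Camel, the same carry-forward logic can be sketched in plain Java. This is only an illustration of what the bean does inside the split. The class name HeaderCarryForward, the method enrich, and the second header X2,X3 are made up for the example and are not part of the route above. Unlike the bean, this sketch fails fast if a body row arrives before any header row.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class HeaderCarryForward {

    /** Prefixes each "02" row with the fields of the most recent "01" row. */
    public static List<String> enrich(List<String> lines) {
        List<String> out = new ArrayList<>();
        String header = null; // fields of the latest header row, without the "01," tag
        for (String line : lines) {
            if (line.startsWith("01,")) {
                // Header row: remember it, emit nothing.
                header = line.substring("01,".length());
            } else if (line.startsWith("02,")) {
                if (header == null) {
                    throw new IllegalStateException("body row before any header row: " + line);
                }
                // Body row: swap the "02," tag for the stored header fields.
                out.add(header + "," + line.substring("02,".length()));
            }
            // Rows with any other tag are ignored in this sketch.
        }
        return out;
    }

    public static void main(String[] args) {
        // "X2,X3" is a hypothetical second header, used to show the switch-over.
        List<String> input = Arrays.asList(
                "01,H2,H3",
                "02,B2,B3,B4,B5",
                "02,B2,B3,B4,B5",
                "01,X2,X3",
                "02,B2,B3,B4,B5");
        for (String row : enrich(input)) {
            System.out.println(row);
        }
    }
}
```

    With the sample input in main, the first two body rows come out prefixed with H2,H3 and the last one with X2,X3, which is the behaviour the HeaderBean provides row by row inside the splitter.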