
Storm SimpleJdbcMapper: is writing an array to a Phoenix DB not supported?


I have a Storm topology (Hortonworks distribution, version 1.1.0.2.6.2) that writes to a Phoenix (version 4.7.0.2.6.2) database using the JdbcInsertBolt class. Writing to standard columns such as varchar and int works, but I need to write to an array column and have run into this:

java.lang.RuntimeException: We do not support tables with SqlType: ARRAY
    at org.apache.storm.jdbc.common.Util.getJavaType(Util.java:72) ~[stormjar.jar:?]
    at org.apache.storm.jdbc.mapper.SimpleJdbcMapper.getColumns(SimpleJdbcMapper.java:60) ~[stormjar.jar:?]
    at org.apache.storm.jdbc.bolt.JdbcInsertBolt.process(JdbcInsertBolt.java:87) [stormjar.jar:?]
    at org.apache.storm.topology.base.BaseTickTupleAwareRichBolt.execute(BaseTickTupleAwareRichBolt.java:38) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.daemon.executor$fn__10454$tuple_action_fn__10456.invoke(executor.clj:730) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.daemon.executor$mk_task_receiver$fn__10375.invoke(executor.clj:462) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.disruptor$clojure_handler$reify__9889.onEvent(disruptor.clj:40) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchToCursor(DisruptorQueue.java:472) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]
    at org.apache.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:451) [storm-core-1.1.0.2.6.2.14-5.jar:1.1.0.2.6.2.14-5]

The array I'm attempting to write is an array of Strings. Is there a way around this issue?


Solution

  • I wrote my own implementations of JdbcInsertBolt, JdbcClient, and JdbcMapper, adding a branch for array columns to the mapper and the client.

    Relevant mapper code:

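        else if (getJavaType(columnSqlType).equals(Array.class)) {
            // The tuple field for an ARRAY column is expected to hold a String[]
            String[] value = (String[]) tuple.getValueByField(columnName);
            // Wrap the strings in a Phoenix VARCHAR array before adding the column
            columns.add(new Column(columnName,
                    PDataType.instantiatePhoenixArray(PDataType.arrayBaseType(PVarcharArray.INSTANCE), value),
                    columnSqlType));
        }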

    Relevant client code:

        else if (columnJavaType.equals(Array.class)) {
            // Bind the PhoenixArray built by the mapper as a java.sql.Array parameter
            preparedStatement.setArray(index, (PhoenixArray) column.getVal());
        }
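Both branches above depend on a getJavaType lookup that returns Array.class for ARRAY columns, which the stock Util.getJavaType in the stack trace refuses to do. The following is a minimal sketch of what such a lookup might look like in a custom implementation. The class name SqlTypeMappingSketch and the exact set of supported types are assumptions made for illustration; this is not Storm's actual source.

```java
import java.sql.Array;
import java.sql.Date;
import java.sql.Time;
import java.sql.Timestamp;
import java.sql.Types;

// Hypothetical helper (not part of storm-jdbc): maps a java.sql.Types code
// to the Java class that a custom mapper and client can branch on.
final class SqlTypeMappingSketch {

    private SqlTypeMappingSketch() {
    }

    static Class<?> getJavaType(int sqlType) {
        switch (sqlType) {
            case Types.CHAR:
            case Types.VARCHAR:
            case Types.LONGVARCHAR:
                return String.class;
            case Types.INTEGER:
                return Integer.class;
            case Types.BIGINT:
                return Long.class;
            case Types.DOUBLE:
                return Double.class;
            case Types.BOOLEAN:
                return Boolean.class;
            case Types.DATE:
                return Date.class;
            case Types.TIME:
                return Time.class;
            case Types.TIMESTAMP:
                return Timestamp.class;
            case Types.ARRAY:
                // The additional case: report java.sql.Array instead of throwing,
                // so the mapper and client can take their array branches.
                return Array.class;
            default:
                throw new RuntimeException("Unsupported SqlType code: " + sqlType);
        }
    }

    public static void main(String[] args) {
        // Show the class chosen for an ARRAY column.
        System.out.println(getJavaType(Types.ARRAY));
    }
}
```

With a lookup like this in place, the mapper only has to add the ARRAY branch that builds the PhoenixArray, and the client only has to add the setArray branch; all other column types keep their existing handling.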