Hazelcast SQL JOIN throwing java.lang.AssertionError: class com.hazelcast.jet.sql.impl.opt.physical.IndexScanMapPhysicalRel


I am trying to execute the following Hazelcast SQL query:

    private static final String SQL_FIND_BY_CLIENT_CODE =
            "SELECT c.clientKey " +
            "FROM client c " +
            "JOIN client2system c2s on c2s.clientId = c.clientId " +
            "WHERE c2s.systemId = ? " +
            "AND c2s.clientCode = ?";

    @Autowired
    private SqlService sqlService;

    public Client findByClientCode(long systemId, String clientCode) {
        try (SqlResult result = sqlService.execute(SQL_FIND_BY_CLIENT_CODE, systemId, clientCode)) {
           ...
        }
    }

But I'm getting the following exception:

java.lang.AssertionError: class com.hazelcast.jet.sql.impl.opt.physical.IndexScanMapPhysicalRel
        at com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitor.onNestedLoopJoin(CreateDagVisitor.java:380)
        at com.hazelcast.jet.sql.impl.opt.physical.JoinNestedLoopPhysicalRel.accept(JoinNestedLoopPhysicalRel.java:110)
        at com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitor.connectInput(CreateDagVisitor.java:494)
        at com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitor.connectInputPreserveCollation(CreateDagVisitor.java:536)
        at com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitor.onProject(CreateDagVisitor.java:207)
        at com.hazelcast.jet.sql.impl.opt.physical.ProjectPhysicalRel.accept(ProjectPhysicalRel.java:61)
        at com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitor.connectInput(CreateDagVisitor.java:494)
        at com.hazelcast.jet.sql.impl.opt.physical.CreateDagVisitor.onRoot(CreateDagVisitor.java:470)
        at com.hazelcast.jet.sql.impl.opt.physical.RootRel.accept(RootRel.java:43)
        at com.hazelcast.jet.sql.impl.CalciteSqlOptimizer.traverseRel(CalciteSqlOptimizer.java:660)
        at com.hazelcast.jet.sql.impl.CalciteSqlOptimizer.toPlan(CalciteSqlOptimizer.java:528)
        at com.hazelcast.jet.sql.impl.CalciteSqlOptimizer.createPlan(CalciteSqlOptimizer.java:283)
        at com.hazelcast.jet.sql.impl.CalciteSqlOptimizer.prepare(CalciteSqlOptimizer.java:242)
        at com.hazelcast.sql.impl.SqlServiceImpl.prepare(SqlServiceImpl.java:256)
        at com.hazelcast.sql.impl.SqlServiceImpl.query0(SqlServiceImpl.java:236)
        at com.hazelcast.sql.impl.SqlServiceImpl.execute(SqlServiceImpl.java:193)
        at com.hazelcast.sql.impl.SqlServiceImpl.execute(SqlServiceImpl.java:161)
        at com.hazelcast.sql.impl.SqlServiceImpl.execute(SqlServiceImpl.java:157)
        at com.hazelcast.sql.impl.SqlServiceImpl.execute(SqlServiceImpl.java:153)
        at com.hazelcast.sql.SqlService.execute(SqlService.java:89)

Looking at the Hazelcast source of CreateDagVisitor at line 380, I see the following:

    public Vertex onNestedLoopJoin(JoinNestedLoopPhysicalRel rel) {
        assert rel.getRight() instanceof FullScanPhysicalRel : rel.getRight().getClass(); // this is line 380

        Table rightTable = rel.getRight().getTable().unwrap(HazelcastTable.class).getTarget();
        collectObjectKeys(rightTable);

        VertexWithInputConfig vertexWithConfig = getJetSqlConnector(rightTable).nestedLoopReader(
                dag,
                rightTable,
                rel.rightFilter(parameterMetadata),
                rel.rightProjection(parameterMetadata),
                rel.joinInfo(parameterMetadata)
        );
        Vertex vertex = vertexWithConfig.vertex();
        connectInput(rel.getLeft(), vertex, vertexWithConfig.configureEdgeFn());
        return vertex;
    }

It seems that Hazelcast expects the right-hand side of the join to be an instance of FullScanPhysicalRel but finds an instance of IndexScanMapPhysicalRel.
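
To make it clearer why the exception message is nothing but a class name: in Java, `assert condition : detail` throws `new AssertionError(detail)` when assertions are enabled, and the message is the string form of `detail`. Here `detail` is `rel.getRight().getClass()`, so the message names the class that was actually found. A minimal sketch of that behavior follows; the nested types are hypothetical stand-ins rather than Hazelcast classes, and the check throws explicitly so that the demo does not depend on the `-ea` JVM flag.

```java
class AssertMessageDemo {
    // Hypothetical stand-ins for the two kinds of scan; NOT Hazelcast types.
    interface PhysicalRel {}
    static final class FullScan implements PhysicalRel {}
    static final class IndexScan implements PhysicalRel {}

    // Mimics `assert right instanceof FullScan : right.getClass();`
    // but always performs the check, regardless of -ea.
    static void requireFullScan(PhysicalRel right) {
        if (!(right instanceof FullScan)) {
            // AssertionError(Object) uses String.valueOf(detail) as its message.
            throw new AssertionError(right.getClass());
        }
    }

    // Returns the error message, or null if the check passed.
    static String messageFor(PhysicalRel right) {
        try {
            requireFullScan(right);
            return null;
        } catch (AssertionError e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(messageFor(new FullScan()));   // check passes, prints null
        System.out.println(messageFor(new IndexScan()));  // message starts with "class "
    }
}
```

Since the `assert` statement only fires when assertions are enabled, the stack trace also suggests that the JVM in question was running with assertions turned on.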

I'm using Hazelcast 5.1:

    <properties>
        <version.hazelcast>5.1</version.hazelcast>
    </properties>
    ...
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast-spring</artifactId>
            <version>${version.hazelcast}</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast</artifactId>
            <version>${version.hazelcast}</version>
        </dependency>
        <dependency>
            <groupId>com.hazelcast</groupId>
            <artifactId>hazelcast-sql</artifactId>
            <version>${version.hazelcast}</version>
        </dependency>

Please note that client and client2system are both IMaps, which I have made available to SQL via:

    @Bean
    public SqlService sqlService(HazelcastInstance hazelcastInstance) {
        SqlService sqlService = hazelcastInstance.getSql();
        Map<String, Class<?>> configMap = Map.of(
                "client", Client.class,
                "client2system", Client2System.class
        );
        configMap.forEach((name, type) -> {
            sqlService.execute(String.format("CREATE MAPPING \"%s\" TYPE IMap OPTIONS ('keyFormat' = 'java', 'keyJavaClass' = 'java.lang.Long', 'valueFormat' = 'java', 'valueJavaClass' = '%s' )", name, type.getName()));
        });
        return sqlService;
    }
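
For reference, this is roughly what statement the bean above sends for each map. The sketch below only reproduces the string formatting, with no Hazelcast calls; the class and method names are hypothetical, and `String.class` stands in for the real value class.

```java
class MappingSql {
    // Same statement template as in the bean above, split across lines for readability.
    private static final String TEMPLATE =
            "CREATE MAPPING \"%s\" TYPE IMap OPTIONS ("
            + "'keyFormat' = 'java', 'keyJavaClass' = 'java.lang.Long', "
            + "'valueFormat' = 'java', 'valueJavaClass' = '%s')";

    // Builds the CREATE MAPPING statement for one map name and value class.
    static String createMapping(String mapName, Class<?> valueType) {
        return String.format(TEMPLATE, mapName, valueType.getName());
    }

    public static void main(String[] args) {
        // String.class is a placeholder value type for this demo only.
        System.out.println(createMapping("client", String.class));
    }
}
```

Printing the statement like this is a cheap way to check the quoting before it is passed to `sqlService.execute`.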

Solution

  • It turns out that the following index in hazelcast.yaml is causing the issue:

    hazelcast:
      map:
        client2system:
          indexes:
            - type: HASH
              attributes:
                - systemId
                - clientCode
    

    When I remove this index, the query returns the desired results, but the lookup on client2system is then no longer indexed and will perform badly (I need the index for performance reasons).

    My impression is that Hazelcast 5.1 has not fully implemented JOIN: the nested loop join requires its right-hand side to be a full scan and fails when the optimizer chooses an index scan instead.

    I raised an issue with Hazelcast for this: https://github.com/hazelcast/hazelcast/issues/22160

    As a workaround, I make two calls to Hazelcast instead of using JOIN:

    @AllArgsConstructor
    @Component
    public class ClientService {
        private final SqlService sqlService;
        private final IMap<Long, Client> clientMap;
    
        private static final String CLIENT_ID_SQL =
                "SELECT clientId " +
                "FROM client2system " +
                "WHERE systemId = ? " +
                "AND clientCode = ?";
    
        public Client findByClientCode(long systemId, String clientCode) {
            try (SqlResult result = sqlService.execute(CLIENT_ID_SQL, systemId, clientCode)) {
                Iterator<SqlRow> iterator = result.iterator();
                if (!iterator.hasNext()) {
                    return null;
                }
                Long clientId = iterator.next().getObject(0);
                Collection<Client> clients = clientMap.values(Predicates.equal("clientId", clientId));
                return clients.isEmpty() ? null : clients.iterator().next();
            }
        }
    }
    

    Update

    Hazelcast has fixed the issue; the fix will be available in the next release, version 5.2.