Search code examples
ignite

How to do AffinityKey join using key as multiple column of two caches/tables?


I am planning to join two caches having multiple columns as key. But I am finding difficulty to write join query. Here is the full code. Please help to write join query using two caches/tables

person value object:

private static class Person implements Serializable {
        private PersonKey personKey;
        private OrgKey orgKey;
        @QuerySqlField
        private String firstName;
        @QuerySqlField
        private String lastName;
        @QueryTextField
        private String resume;
        /** Salary (indexed). */
        @QuerySqlField(index = true)
        private double salary;
        /**
         * Custom cache key to guarantee that person is always collocated with
         * its organization.
         */
        private transient AffinityKey<PersonKey> key;


        Person(Organization org, PersonKey key, String firstName, String lastName, double salary, String resume) {
            // Generate unique ID for this person.
            this.personKey = key;
            this.orgKey = org.key;
            this.firstName = firstName;
            this.lastName = lastName;
            this.resume = resume;
            this.salary = salary;
        }

person key object:

private static class PersonKey implements Serializable {

        @QuerySqlField(index = true)
        private UUID id;
        @QuerySqlField(index = true)
        private String location;

        public PersonKey(UUID id, String location) {
            this.id = id;
            this.location = location;
        }
    }

Organization value object:

private static class Organization implements Serializable {
        /** Organization ID (indexed). */
        private OrgKey key;
        /** Organization name (indexed). */
        @QuerySqlField(index = true)
        private String name;

        public Organization(OrgKey key, String name) {
            this.key = key;
            this.name = name;
        }

organization key class:

private static class OrgKey implements Serializable {

    @QuerySqlField(index = true)
    private int id;
    @QuerySqlField(index = true)
    private String location;
    private UUID orgId;

    public OrgKey(int id, String location, UUID orgId) {
        this.id = id;
        this.location = location;
        this.orgId = orgId;
    }
}

Join query:

IgniteCache<AffinityKey<PersonKey>, Person> cache = Ignition.ignite().cache(PERSON_CACHE);

        String sql = "select * " + "from Persons, Organizations as org " + "where Persons.id = org.orgId";

        QueryCursor<List<?>> cursor = cache.query(new SqlFieldsQuery(sql));

Ignite cache creation:

/** Organizations cache name. */
    private static final String ORG_CACHE = "Organizations";
    /** Persons cache name. */
    private static final String PERSON_CACHE = "Persons";

    CacheConfiguration<OrgKey, Organization> orgCacheCfg = new CacheConfiguration<>(ORG_CACHE);
                orgCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
                orgCacheCfg.setIndexedTypes(OrgKey.class, Organization.class);

                CacheConfiguration<AffinityKey<PersonKey>, Person> personCacheCfg = new CacheConfiguration<>(PERSON_CACHE);
                personCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
                personCacheCfg.setIndexedTypes(AffinityKey.class, Person.class);

Solution

  • Please check this simple example, I do join by two fields in keys:

    package sql;
    
    import java.util.Arrays;
    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.CacheAtomicityMode;
    import org.apache.ignite.cache.query.FieldsQueryCursor;
    import org.apache.ignite.cache.query.SqlFieldsQuery;
    import org.apache.ignite.cache.query.annotations.QuerySqlField;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
    import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
    import org.jetbrains.annotations.NotNull;
    
    public class DoubleJoin {
        public static final String DEFAULT_CACHE_NAME = "DEFAULT";
    
        public static void main(String[] args) throws InterruptedException {
            IgniteConfiguration ignCfg = new IgniteConfiguration();
    
            TcpDiscoverySpi spi = new TcpDiscoverySpi();
            TcpDiscoveryVmIpFinder finder = new TcpDiscoveryVmIpFinder();
            finder.setAddresses(Arrays.asList("127.0.0.1"));
            spi.setIpFinder(finder);
    
            ignCfg.setDiscoverySpi(spi);
    
            Ignite ignite = Ignition.start(ignCfg);
    
            CacheConfiguration cacheCfg = getCacheConfiguration();
    
            final IgniteCache cache = ignite.getOrCreateCache(cacheCfg);
    
            cache.put(new A(1, 2), new AVal(100));
            cache.put(new B(1, 2), new BVal(100));
    
            FieldsQueryCursor cursor = cache.query(new SqlFieldsQuery("SELECT * " +
                "FROM AVal " +
                "INNER JOIN BVal ON AVal.a1=BVal.b1 AND AVal.a2=BVal.b2;"));
    
            for (Object o : cursor) {
                System.out.println(o);
            }
        }
    
        static class A {
            @AffinityKeyMapped
            @QuerySqlField
            int a1;
            @QuerySqlField
            int a2;
    
            public A(int a1, int a2) {
                this.a1 = a1;
                this.a2 = a2;
            }
        }
    
        static class AVal {
            @QuerySqlField
            int aV;
    
            public AVal(int aV) {
                this.aV = aV;
            }
        }
    
        static class B {
            @AffinityKeyMapped
            @QuerySqlField
            int b1;
            @QuerySqlField
            int b2;
    
            public B(int b1, int b2) {
                this.b1 = b1;
                this.b2 = b2;
            }
        }
    
        static class BVal {
            @QuerySqlField
            int bV;
    
            public BVal(int bV) {
                this.bV = bV;
            }
        }
    
        @NotNull private static CacheConfiguration<Integer, Integer> getCacheConfiguration() {
            CacheConfiguration<Integer, Integer> cfg = new CacheConfiguration<>();
    
            cfg.setAtomicityMode(CacheAtomicityMode.TRANSACTIONAL);
            cfg.setName(DEFAULT_CACHE_NAME);
    
            cfg.setIndexedTypes(A.class, AVal.class, B.class, BVal.class);
    
            return cfg;
        }
    }