Search code examples
javaaccumulogeomesa

Using GeoMesa Native API to insert data in accumulo


I am trying to insert and read data from accumulo storage using GeoMesa Native API. I have created a class file to use geomesa accumulo storage natively. Here is my java code :

package org.locationtech.geomesa.api;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class WorkerBeta {
    public static void main(String[] args){
        try {
            DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
            final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
            "aj_v14",
            "localhost:2181",
            "hps",
            "root", "9869547580",
            false,
            dovs,
            new SimpleFeatureView<DomainObject>() {
              AttributeTypeBuilder atb = new AttributeTypeBuilder();
              private List<AttributeDescriptor> attributeDescriptors =
                Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
                    , atb.binding(String.class).buildDescriptor("dId")
                    , atb.binding(Integer.class).buildDescriptor("s")
                    , atb.binding(Integer.class).buildDescriptor("a")
                    , atb.binding(Integer.class).buildDescriptor("e")
                );
              @Override
              public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
                f.setAttribute("rId", domainObject.rideId);
                f.setAttribute("dId", domainObject.deviceId);
                f.setAttribute("s", domainObject.speed);
                f.setAttribute("a", domainObject.angle);
                f.setAttribute("e", domainObject.error);
              }

              @Override
              public List<AttributeDescriptor> getExtraAttributes() {
                return attributeDescriptors;
              }
            }
        );

        //Inserting 
        final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
        final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
        System.out.println(index.insert(
                one,
                gf.createPoint(new Coordinate(-74.0, 34.0)),
                date("2017-03-31T01:15:00.000Z")
            ));

            //Read 
            GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
                .within(-90.0, -180, 90, 180)
                .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
                .build();
            Iterable<DomainObject> results = index.query(q);
            int counter = 0;
            for(DomainObject dm : results){
                counter += 1;
                System.out.println("result counter: " + counter);
                dovs.toBytes(dm);
            }
        }
        catch (Exception ex){
      ex.printStackTrace();
        }
    }
    public static class DomainObject {
      public final int rideId;
      public final String deviceId;
      public final int angle;
      public final int speed;
      public final int error;

      public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
          this.rideId = rideId;
          this.deviceId = deviceId;
          this.angle = angle;
          this.speed = speed;
          this.error = error;
      }
    }
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
        public static final Gson gson = new Gson();
        @Override
        public byte[] toBytes(DomainObject o) {
            return gson.toJson(o).getBytes();
        }
        @Override
        public DomainObject fromBytes(byte[] bytes) {
            return gson.fromJson(new String(bytes), DomainObject.class);
        }
    }
    public static Date date(String s) {
        return Date.from(ZonedDateTime.parse(s).toInstant());
    }
}

Logs for command is :

suresh@hpss-MacBook-Air:~/GeomesaAccumuloNativeClient $ java -cp target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar org.locationtech.geomesa.api.WorkerBeta
WARNING: org.apache.hadoop.metrics.jvm.EventCounter is deprecated. Please use org.apache.hadoop.log.metrics.EventCounter in all the log4j.properties files.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:zookeeper.version=3.4.6-1569965, built on 02/20/2014 09:09 GMT
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:host.name=192.168.1.103
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.version=1.8.0_121
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.vendor=Oracle Corporation
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.home=/Library/Java/JavaVirtualMachines/jdk1.8.0_121.jdk/Contents/Home/jre
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.class.path=target/geomesa-native-api_2.11-1.3.2-SNAPSHOT.jar
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.library.path=/Users/suresh/Library/Java/Extensions:/Library/Java/Extensions:/Network/Library/Java/Extensions:/System/Library/Java/Extensions:/usr/lib/java:.
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.io.tmpdir=/var/folders/yk/h858t8h57nz42t6t4nqmwhcc0000gp/T/
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:java.compiler=<NA>
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.name=Mac OS X
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.arch=x86_64
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:os.version=10.12.3
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.name=suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.home=/Users/suresh
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Client environment:user.dir=/Users/suresh/GeomesaAccumuloNativeClient
17/04/01 15:11:48 INFO zookeeper.ZooKeeper: Initiating client connection, connectString=localhost:2181 sessionTimeout=30000 watcher=org.apache.accumulo.fate.zookeeper.ZooSession$ZooWatcher@73eb439a
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Opening socket connection to server localhost/0:0:0:0:0:0:0:1:2181. Will not attempt to authenticate using SASL (unknown error)
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Socket connection established to localhost/0:0:0:0:0:0:0:1:2181, initiating session
17/04/01 15:11:48 INFO zookeeper.ClientCnxn: Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, sessionid = 0x15aea0c41f601a1, negotiated timeout = 30000
17/04/01 15:11:52 WARN data.AccumuloDataStore: Configured server-side iterators do not match client version - client version: 1.3.2-SNAPSHOT, server version: 1.3.0
50fa12fb-11f8-4776-bb35-95b32da9225d
[]

But when i try to verify the inserted record, I am unable to find any specific entries in created related to data inserted in Tables of accumulo web interface. Here is the screen shot for the accumulo tablesenter image description here. Please correct me if I am missing anything. Tons of thanks in advance.


Solution

  • Likely your insert is not getting flushed to disk. Accumulo uses a batch writer for performance - this will periodically write to disk once its internal buffer is filled. Since you are only inserting a single record, this isn't happening. To fix, you can call close on your GeoMesaIndex instance. This will flush any existing records to disk. You would then need to instantiate a new instance to do your query.