Search code examples
javascalaaccumulo

Executing flush method, for sending mutations to accumulo without closing the writer


I am writing data to accumulo storage natively using Geomesa Native Client. Here is my java code

package org.locationtech.geomesa.api;

import com.google.common.base.Function;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.vividsolutions.jts.geom.Coordinate;
import com.vividsolutions.jts.geom.Geometry;
import com.vividsolutions.jts.geom.GeometryFactory;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.security.Authorizations;
import org.geotools.factory.CommonFactoryFinder;
import org.geotools.feature.AttributeTypeBuilder;
import org.geotools.geometry.jts.JTSFactoryFinder;
import org.junit.Assert;
import org.junit.Test;
import org.locationtech.geomesa.accumulo.data.AccumuloDataStore;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex;
import org.locationtech.geomesa.accumulo.index.AccumuloFeatureIndex$;
import org.locationtech.geomesa.utils.index.IndexMode$;
import org.opengis.feature.simple.SimpleFeature;
import org.opengis.feature.type.AttributeDescriptor;
import org.opengis.filter.FilterFactory2;

import javax.annotation.Nullable;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

public class WorkerBeta {
    public static void main(String[] args){
        try {
            DomainObjectValueSerializer dovs = new DomainObjectValueSerializer();
            final GeoMesaIndex<DomainObject> index = AccumuloGeoMesaIndex.buildWithView(
            "aj_v14",
            "localhost:2181",
            "hps",
            "root", "9869547580",
            false,
            dovs,
            new SimpleFeatureView<DomainObject>() {
              AttributeTypeBuilder atb = new AttributeTypeBuilder();
              private List<AttributeDescriptor> attributeDescriptors =
                Lists.newArrayList(atb.binding(Integer.class).buildDescriptor("rId")
                    , atb.binding(String.class).buildDescriptor("dId")
                    , atb.binding(Integer.class).buildDescriptor("s")
                    , atb.binding(Integer.class).buildDescriptor("a")
                    , atb.binding(Integer.class).buildDescriptor("e")
                );
              @Override
              public void populate(SimpleFeature f, DomainObject domainObject, String id, byte[] payload, Geometry geom, Date dtg) {
                f.setAttribute("rId", domainObject.rideId);
                f.setAttribute("dId", domainObject.deviceId);
                f.setAttribute("s", domainObject.speed);
                f.setAttribute("a", domainObject.angle);
                f.setAttribute("e", domainObject.error);
              }

              @Override
              public List<AttributeDescriptor> getExtraAttributes() {
                return attributeDescriptors;
              }
            }
        );

        //Inserting 
        final DomainObject one = new DomainObject(1, "AJJASsP", 12, 40, 1);
        final GeometryFactory gf = JTSFactoryFinder.getGeometryFactory();
        System.out.println(index.insert(
                one,
                gf.createPoint(new Coordinate(-74.0, 34.0)),
                date("2017-03-31T01:15:00.000Z")
            ));

            //Read 
            GeoMesaQuery q = GeoMesaQuery.GeoMesaQueryBuilder.builder()
                .within(-90.0, -180, 90, 180)
                .during(date("2017-01-01T00:00:00.000Z"), date("2017-04-01T00:00:00.000Z"))
                .build();
            Iterable<DomainObject> results = index.query(q);
            int counter = 0;
            for(DomainObject dm : results){
                counter += 1;
                System.out.println("result counter: " + counter);
                dovs.toBytes(dm);
            }
        }
        catch (Exception ex){
          ex.printStackTrace();
        }
        index.close();
    }
    public static class DomainObject {
      public final int rideId;
      public final String deviceId;
      public final int angle;
      public final int speed;
      public final int error;

      public DomainObject(int rideId, String deviceId, int angle, int speed, int error) {
          this.rideId = rideId;
          this.deviceId = deviceId;
          this.angle = angle;
          this.speed = speed;
          this.error = error;
      }
    }
    public static class DomainObjectValueSerializer implements ValueSerializer<DomainObject> {
        public static final Gson gson = new Gson();
        @Override
        public byte[] toBytes(DomainObject o) {
            return gson.toJson(o).getBytes();
        }
        @Override
        public DomainObject fromBytes(byte[] bytes) {
            return gson.fromJson(new String(bytes), DomainObject.class);
        }
    }
    public static Date date(String s) {
        return Date.from(ZonedDateTime.parse(s).toInstant());
    }
}

The problem with this code is, I need to create index object every time for a new insert request and call index.close() to reflect the same. But I can't execute insert() agin, once index.close() is called. However i will be accepting insert request from queue at very high rate and I don't want to create index object every time. How can i do that?

In short how i can flush writes without calling close().


Solution

  • I created geomesa Client class file to use geomesa natively. Below is the partial implementation of the same which shows how you can flush with AccumuloAppendFeatureWriter without calling to close.

    public class GeomesaClient {
      private AccumuloDataStore ds = null;
      private AccumuloAppendFeatureWriter fw = null;
      private SimpleFeatureSource sfs = null;
      private String tableName = "";
      private FeatureStore fst = null;
      private SimpleFeatureType sft;
    
      public GeomesaClient(Map<String, String> dsConf) throws Exception {
        this.ds = (AccumuloDataStore) DataStoreFinder.getDataStore(dsConf);
        this.tableName = dsConf.get("tableName");
    
        sft = createFeatureType();
        if(!Arrays.asList(this.ds.getTypeNames()).contains(sft.getTypeName())){
          ds.createSchema(sft);
        }
        this.fst = (FeatureStore)sfs;
        this.fw = (AccumuloAppendFeatureWriter) (this.ds.getFeatureWriterAppend(sft.getTypeName(),
            Transaction.AUTO_COMMIT));
        this.sfs = ds.getFeatureSource(sft.getTypeName());
      }
      /*
            Flush with AccumuloAppendFeatureWriter
      */
      public void flush(boolean force) {
        fw.flush();
      }
    }