spring-boot, gemfire, spring-data-gemfire

Updating individual column in Pivotal GemFire


As far as I know, there is no way to update an individual column using a query in GemFire. To update an individual column, I currently get the entire old object, modify the changed value, and store it back. If anyone has implemented anything for updating individual columns, please share.

@Region("tracking")
public class Tracking implements Serializable {

    public String id;
    public String status;
    public String program;
}



@Region("tracking")
public interface TrackingQueryRepository extends CrudRepository<Tracking, String> {
}

I am new to Delta Propagation. I have read the user guide, tried to implement it, and received the exception given below. Can you please share your thoughts on this?

Another.java – Domain class

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import org.springframework.data.annotation.Id;
import org.springframework.data.gemfire.mapping.Region;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.gemstone.gemfire.Delta;
import com.gemstone.gemfire.InvalidDeltaException;


@Region("delta")
public class Another implements Delta, Serializable {

    private static final long serialVersionUID = 1L;

    @Id
    private String anotherId;

    @JsonProperty("anotherProgramId")
    private String anotherProgramId;

    public Another() {
    }

    public Another(String anotherId, String anotherProgramId) {
        this.anotherId = anotherId;
        this.anotherProgramId = anotherProgramId;
    }

    public String getAnotherId() {
        return anotherId;
    }

    public void setAnotherId(String anotherId) {
        this.anotherIdChd = true;
        this.anotherId = anotherId;
    }

    public String getAnotherProgramId() {
        return anotherProgramId;
    }

    public void setAnotherProgramId(String anotherProgramId) {
        this.anotherProgramIdChd = true;
        this.anotherProgramId = anotherProgramId;
    }

    private transient boolean anotherIdChd = false;
    private transient boolean anotherProgramIdChd = false;

    @Override
    public String toString() {
        return "Another [anotherId=" + anotherId + ", anotherProgramId=" + anotherProgramId + "]";
    }

    @Override
    public void fromDelta(DataInput in) throws IOException, InvalidDeltaException {

        if (in.readBoolean()) {
            // Read the change and apply it to the object
            // (readUTF() mirrors the writeUTF() call in toDelta; in.toString() would not read from the stream)
            this.anotherId = in.readUTF();
            System.out.println(" Applied delta to field 'anotherId' = " + this.anotherId);
        }
        if (in.readBoolean()) {
            this.anotherProgramId = in.readUTF();
            System.out.println(" Applied delta to field 'anotherProgramId' = " + this.anotherProgramId);
        }
    }

    @Override
    public boolean hasDelta() {
        return this.anotherIdChd || this.anotherProgramIdChd;

    }

    @Override
    public void toDelta(DataOutput out) throws IOException {
        System.out.println("Extracting delta from " + this.toString());
        out.writeBoolean(anotherIdChd);
        if (anotherIdChd) {
            // Write just the changes into the data stream

            out.writeUTF(this.anotherId);
            // Once the delta information is written, reset the delta status
            // field
            this.anotherIdChd = false;
            System.out.println(" Extracted delta from field 'anotherId' = " + this.anotherId);
        }
        out.writeBoolean(anotherProgramIdChd);
        if (anotherProgramIdChd) {
            out.writeUTF(this.anotherProgramId);
            this.anotherProgramIdChd = false;
            System.out.println(" Extracted delta from field 'anotherProgramId' = " + this.anotherProgramId);
        }

    }

}
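The fromDelta logic has to mirror toDelta exactly: each writeBoolean/writeUTF pair needs a matching readBoolean/readUTF on the reading side. That pairing can be verified with plain java.io and no GemFire dependency at all; the class and field names below are illustrative only:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Stand-alone check of the delta wire format used by toDelta/fromDelta above.
public class DeltaRoundTrip {

    // Mimics toDelta: a "changed" flag per field, followed by the new value if changed.
    static byte[] writeDelta(boolean idChanged, String id,
                             boolean progChanged, String prog) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeBoolean(idChanged);
        if (idChanged) out.writeUTF(id);
        out.writeBoolean(progChanged);
        if (progChanged) out.writeUTF(prog);
        return bos.toByteArray();
    }

    // Mimics fromDelta: readBoolean then readUTF, never in.toString().
    static String[] readDelta(byte[] bytes, String currentId, String currentProg) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        if (in.readBoolean()) currentId = in.readUTF();
        if (in.readBoolean()) currentProg = in.readUTF();
        return new String[] { currentId, currentProg };
    }

    public static void main(String[] args) throws IOException {
        // Only the second field changed; the delta carries one flag byte + one flag byte + one UTF string.
        byte[] delta = writeDelta(false, null, true, "PROG-2");
        String[] applied = readDelta(delta, "ID-1", "PROG-1");
        System.out.println(applied[0] + " " + applied[1]); // prints: ID-1 PROG-2
    }
}
```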

Client-cache.xml

<pdx>
    <pdx-serializer>
        <class-name>com.gemstone.gemfire.pdx.ReflectionBasedAutoSerializer</class-name>
        <parameter name="classes">
            <string>com\.rs\.main\..+</string>
        </parameter>
    </pdx-serializer>
</pdx>

Spring XML Namespace

<util:properties id="gemfire-props">
    <prop key="delta-propagation">true</prop>
</util:properties>

<gfe:client-cache pool-name="serverPool" cache-xml-location="classpath:client-cache.xml" properties-ref="gemfire-props"/>

<gfe:client-region id="delta" pool-name="serverPool" shortcut="PROXY" cloning-enabled="true"/>

Local GemFire instance version: pivotal-gemfire-9.0.1

Region creation (gfsh):

create region --name=delta --type=REPLICATE

Exception:

2017-05-08 22:17:12.370 ERROR 14696 --- [nio-8080-exec-1] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.dao.DataAccessResourceFailureException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another; nested exception is com.gemstone.gemfire.cache.client.ServerOperationException: remote server on 10.148.210.249(:loner):53784:e10627eb: com.gemstone.gemfire.pdx.PdxSerializationException: Could not create an instance of a class com.rs.main.Another] with root cause

java.lang.ClassNotFoundException: com.rs.main.Another
    at org.apache.geode.internal.ClassPathLoader.forName(ClassPathLoader.java:437) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.getCachedClass(InternalDataSerializer.java:4010) ~[na:na]
    at org.apache.geode.pdx.internal.PdxType.getPdxClass(PdxType.java:235) ~[na:na]
    at org.apache.geode.pdx.internal.PdxReaderImpl.basicGetObject(PdxReaderImpl.java:687) ~[na:na]
    at org.apache.geode.pdx.internal.PdxReaderImpl.getObject(PdxReaderImpl.java:682) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.readPdxSerializable(InternalDataSerializer.java:3218) ~[na:na]
    at org.apache.geode.internal.InternalDataSerializer.basicReadObject(InternalDataSerializer.java:3005) ~[na:na]
    at org.apache.geode.DataSerializer.readObject(DataSerializer.java:2897) ~[na:na]
    at org.apache.geode.internal.util.BlobHelper.deserializeBlob(BlobHelper.java:90) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1891) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.deserialize(EntryEventImpl.java:1884) ~[na:na]
    at org.apache.geode.internal.cache.VMCachedDeserializable.getDeserializedValue(VMCachedDeserializable.java:134) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.processDeltaBytes(EntryEventImpl.java:1687) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.setNewValueInRegion(EntryEventImpl.java:1558) ~[na:na]
    at org.apache.geode.internal.cache.EntryEventImpl.putExistingEntry(EntryEventImpl.java:1504) ~[na:na]
    at org.apache.geode.internal.cache.AbstractRegionMap.updateEntry(AbstractRegionMap.java:2959) ~[na:na]
    at org.apache.geode.internal.cache.AbstractRegionMap.basicPut(AbstractRegionMap.java:2782) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.virtualPut(LocalRegion.java:5750) ~[na:na]
    at org.apache.geode.internal.cache.DistributedRegion.virtualPut(DistributedRegion.java:337) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegionDataView.putEntry(LocalRegionDataView.java:151) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.basicUpdate(LocalRegion.java:5730) ~[na:na]
    at org.apache.geode.internal.cache.LocalRegion.basicBridgePut(LocalRegion.java:5374) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.command.Put65.cmdExecute(Put65.java:381) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.BaseCommand.execute(BaseCommand.java:141) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doNormalMsg(ServerConnection.java:776) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.doOneMessage(ServerConnection.java:904) ~[na:na]
    at org.apache.geode.internal.cache.tier.sockets.ServerConnection.run(ServerConnection.java:1160) ~[na:na]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_121]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_121]
    at org.apache.geode.internal.cache.tier.sockets.AcceptorImpl$1$1.run(AcceptorImpl.java:519) ~[na:na]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]

Solution

  • Hi (again) Vigneshwaran-

    Right, so GemFire's query capabilities (via the QueryService) are strictly for running queries (i.e. SELECT statements). There is no equivalent in GemFire OQL for UPDATE or DELETE. GemFire is a Key/Value store with Map-like operations (e.g. get(key), put(key, value), etc.), where you generally work with the entire application domain object. However, a few GemFire features can help you regardless of whether your application is a peer cache (i.e. a member of the cluster) or a cache client. Typically, applications are cache clients using a ClientCache, where the cluster is standalone and clients connect to it much like an RDBMS.

    I would also say that, while the Function service is useful, it is not the only option and can actually add more overhead code-wise.

    It is very typical, as Wes mentions above, to use a PARTITION Region, especially for "transactional" data (NOTE: REPLICATE Regions are more applicable for reference data that changes infrequently).

    Where a Function can help is this: you code the Function to apply the update to the application domain object, passing the "update" in the Function's arguments. To invoke a Function, you use GemFire's FunctionService to get an Execution via one of the targeting methods (e.g. onRegion("tracking")).

    NOTE: the other targeting methods (namely onMember(s) and onServer(s)) are specific to whether your application is a "peer" or a "client", respectively. For instance, you cannot call onMember(s) if your application is a client, since it assumes your application is a "peer". Likewise, you cannot call onServer(s) if your application is a peer, since it assumes your application is a "client". onRegion(..) works whether the application is a peer or a client. While you might think "why not use onRegion all the time?", there are technical advantages to the other forms of targeting depending on your use case (e.g. think server groups and routing). Anyway...

    When the Region is a PARTITION Region, you can also have the Function return true from optimizeForWrite(), which signals that the Function will be updating Region data and should therefore be routed to the PARTITION's primary bucket for the key, when the key is specified using the filtering option described by Wes above.
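    To make this concrete, here is a rough sketch of such a Function (the class name UpdateStatusFunction, the field access, and the "tracking" Region are assumptions based on your domain class; FunctionAdapter, RegionFunctionContext, and PartitionRegionHelper are the GemFire APIs involved):

```java
import com.gemstone.gemfire.cache.Region;
import com.gemstone.gemfire.cache.execute.FunctionAdapter;
import com.gemstone.gemfire.cache.execute.FunctionContext;
import com.gemstone.gemfire.cache.execute.RegionFunctionContext;
import com.gemstone.gemfire.cache.partition.PartitionRegionHelper;

// Hypothetical server-side Function applying a single-field update; a sketch, not a definitive implementation.
public class UpdateStatusFunction extends FunctionAdapter {

    @Override
    public void execute(FunctionContext context) {
        RegionFunctionContext rfc = (RegionFunctionContext) context;
        // Work against the local (primary) data for the filtered keys
        Region<String, Tracking> region = PartitionRegionHelper.getLocalDataForContext(rfc);
        String newStatus = (String) rfc.getArguments();   // the "update" passed as arguments
        for (Object key : rfc.getFilter()) {              // keys supplied via withFilter(..)
            Tracking tracking = region.get(key);
            tracking.status = newStatus;                  // change just the one field
            region.put((String) key, tracking);           // write-back happens on the primary
        }
        context.getResultSender().lastResult(Boolean.TRUE);
    }

    @Override
    public boolean optimizeForWrite() {
        return true; // route execution to the primary bucket for the filtered keys
    }

    @Override
    public String getId() {
        return "UpdateStatusFunction";
    }
}
```

    A client could then invoke it along the lines of FunctionService.onRegion(region).withFilter(Collections.singleton("tracking-1")).withArgs("SHIPPED").execute("UpdateStatusFunction").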

    A PARTITION Region's consistency comes from the fact that all updates are routed to and written on the "primary" first (regardless of which server receives the client's update, which may be a server that does not even host the Region or the data/key in question, i.e. a different shard). After the primary is updated, the data changes are propagated (distributed) to the other nodes in the cluster hosting secondaries for the partitioned/sharded data set. This is the "transactional" consistency that Wes is referring to above.

    NOTE: PARTITION is just another word for SHARDING the data, where the data is spread evenly across the cluster of available nodes. When nodes are added or removed, the data is rebalanced. A PARTITION Region can also have redundancy; the redundant copies are referred to as secondaries. PARTITION Regions help with latency and throughput since the data is divided (into 113 buckets by default), where each bucket has a primary and possibly 1 or more copies (secondaries, for redundancy/HA), thereby improving both read and write throughput.

    Additionally, if the update must stick, you can also set the Function's HA property, which allows retries in the case of failures.

    However, despite all these advantages, you still have to deal with "how-to" update your application domain object in the Function on the server. You also have to deal with "mapping" since there really is no equivalent to ORM in Key/Value stores like GemFire. Of course, this is not that difficult, but perhaps there is a better way.

    There is another feature, called Delta Propagation. Ordinarily, you are always getting and putting the full value in GemFire whenever you make updates.

    NOTE: it is possible to query select fields of an object, in a projection-like fashion, but the result is not a proxy and is not related to the actual object in any way.

    When you take advantage of GemFire's Serialization capabilities, you can leverage Delta Propagation.

    When implementing "Deltas", only the difference in the application domain object is actually serialized and sent over the wire, whether between client and server, or between peers when upholding the redundancy policies. This is completely seamless to you: you get your object (client-side), update it, and put it. GemFire handles sending the "delta" for you.
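    For instance, on the client the update still looks like an ordinary get/modify/put (a sketch; the key and variable names are assumptions, and the "delta" Region is the one from your config, with cloning-enabled="true"):

```java
Region<String, Another> delta = clientCache.getRegion("delta");
Another another = delta.get("key-1");     // full object on the first read
another.setAnotherProgramId("PROG-2");    // the setter flips the transient "changed" flag
delta.put("key-1", another);              // hasDelta() is true, so only the toDelta() bytes go over the wire
```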

    Additionally, when using the client/server topology and PARTITION Regions on the servers in the cluster, you can enable Single-Hop access, which effectively routes the data to the server containing the "primary" bucket, thereby avoiding additional network hops, which will affect your perceived latency per operation.
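    Single-hop is a property of the client Pool; with the GemFire Java API it can be set when the Pool is created (a sketch; the locator host/port and pool name are assumptions, and note that single-hop is enabled by default in recent versions):

```java
Pool pool = PoolManager.createFactory()
    .addLocator("localhost", 10334)
    .setPRSingleHopEnabled(true) // route each operation directly to the server hosting the primary bucket
    .create("serverPool");
```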

    So, between Deltas and Single-Hop, you end up with a pretty performant solution and can still leverage the object-oriented approach, using your application domain object APIs as you would expect.

    Be mindful of the pitfalls of using Deltas, however. Also, regarding your exception: the root cause is a ClassNotFoundException for com.rs.main.Another on the server, because applying a delta requires the server to deserialize the stored value, and therefore to load your domain class. The class needs to be on the servers' classpath (e.g. deployed as a jar).

    Anyway, food for thought. You generally always have more than 1 way to accomplish a task, but the better approach is not always apparent until you measure and assess the desired effect based on your UC/goal.

    Cheers, John