
How to add memcached nodes dynamically with spymemcached


I have a Java application in which a spymemcached client communicates with multiple memcached server nodes.

I want to know whether it is possible to add or remove server nodes at runtime without disturbing all of the existing cache nodes (I understand that some nodes will have to change).

Here is what I know (or understand):

It is possible to set a custom hashing algorithm in DefaultConnectionFactory, which lets us use consistent hashing, or to use the built-in KetamaConnectionFactory directly.

So we should be able to add or remove nodes while affecting only one or a few of the existing nodes.
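To make that expectation concrete, here is a minimal sketch of a consistent-hash ring using only the JDK. It is not spymemcached's Ketama implementation: `HashRing`, `HashRingDemo`, the node names and the 160 points per node are all assumptions chosen for illustration. It counts how many keys change owner when a fourth node joins a three-node ring.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

/** Toy hash ring (hypothetical, not part of spymemcached). */
class HashRing {
  // Assumption: 160 points per node, picked only to smooth the distribution.
  private static final int POINTS_PER_NODE = 160;
  private final TreeMap<Long, String> ring = new TreeMap<>();

  /** Places the node at several pseudo-random positions on the ring. */
  void addNode(String node) {
    for (int i = 0; i < POINTS_PER_NODE; i++) {
      ring.put(hash(node + "-" + i), node);
    }
  }

  /** Returns the owner of the first ring point at or after the key's hash, wrapping around. */
  String nodeFor(String key) {
    Map.Entry<Long, String> e = ring.ceilingEntry(hash(key));
    return (e != null ? e : ring.firstEntry()).getValue();
  }

  /** Unsigned 32-bit value built from the first four bytes of an MD5 digest. */
  static long hash(String s) {
    try {
      byte[] d = MessageDigest.getInstance("MD5").digest(s.getBytes(StandardCharsets.UTF_8));
      return ((long) (d[3] & 0xFF) << 24) | ((d[2] & 0xFF) << 16)
          | ((d[1] & 0xFF) << 8) | (d[0] & 0xFF);
    } catch (NoSuchAlgorithmException ex) {
      throw new IllegalStateException(ex);
    }
  }
}

class HashRingDemo {
  /** Returns {keys that changed owner, keys that changed owner to the new node}. */
  static int[] countMoves(int keyCount) {
    HashRing ring = new HashRing();
    ring.addNode("cache1:11211");
    ring.addNode("cache2:11211");
    ring.addNode("cache3:11211");

    String[] before = new String[keyCount];
    for (int i = 0; i < keyCount; i++) {
      before[i] = ring.nodeFor("key-" + i);
    }

    String newNode = "cache4:11211";
    ring.addNode(newNode);

    int moved = 0;
    int movedToNew = 0;
    for (int i = 0; i < keyCount; i++) {
      String after = ring.nodeFor("key-" + i);
      if (!after.equals(before[i])) {
        moved++;
        if (after.equals(newNode)) {
          movedToNew++;
        }
      }
    }
    return new int[] {moved, movedToNew};
  }

  public static void main(String[] args) {
    int[] r = countMoves(10000);
    System.out.printf("%d of 10000 keys moved, %d of them to the new node%n", r[0], r[1]);
  }
}
```

In this model every key that moves lands on the new node, and the rest keep their owner, which is the property the question relies on. With modulo-based hashing, by contrast, most keys would typically change owner when the node count changes.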

Is it possible using spymemcached?

If it is, then how?

Can anyone please point me in the right direction?


Solution

  • It looks like NodeLocator.updateLocator(List<MemcachedNode> newNodes) should do the job.

    Getting hold of a connected MemcachedNode is harder, though. You have to extend MemcachedClient, MemcachedConnection and DefaultConnectionFactory.

    It is reasonable to want to add or remove nodes through MemcachedClient, so you give it remove(MemcachedNode node) and add(InetSocketAddress nodeAddress) methods.

    To remove a node, disconnect it (see MemcachedConnection.shutdown()), drop it from the collection returned by NodeLocator.getAll(), and call NodeLocator.updateLocator(List<MemcachedNode> newNodes) with the remaining nodes.

    To add a node, connect it via MemcachedConnection.createConnections(final Collection a), merge the result with NodeLocator.getAll(), and call NodeLocator.updateLocator(List<MemcachedNode> newNodes) with the merged list.

    I have never tried this, so it may not work. Good luck!
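The locator side of both steps follows one pattern: copy the current node collection, change the copy, then hand it to the locator. The sketch below models only that pattern with JDK types. `Locator`, `ListLocator` and `NodeSetUpdater` are hypothetical stand-ins for illustration, not spymemcached classes, and plain strings stand in for connected nodes.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in for the two locator methods used by the steps above. */
interface Locator<N> {
  Collection<N> getAll();

  void updateLocator(List<N> newNodes);
}

/** Trivial list-backed locator, used only to exercise the pattern. */
class ListLocator<N> implements Locator<N> {
  private List<N> nodes = new ArrayList<>();

  @Override
  public Collection<N> getAll() {
    // Read-only view: callers must copy before changing anything.
    return Collections.unmodifiableList(nodes);
  }

  @Override
  public void updateLocator(List<N> newNodes) {
    nodes = new ArrayList<>(newNodes);
  }
}

class NodeSetUpdater {
  /** Merges an already connected node with the current nodes and swaps the list in. */
  static <N> void add(Locator<N> locator, N connectedNode) {
    List<N> next = new ArrayList<>(locator.getAll());
    next.add(connectedNode);
    locator.updateLocator(next);
  }

  /** Drops a node and swaps the list in; returns false if the node was not present. */
  static <N> boolean remove(Locator<N> locator, N node) {
    List<N> next = new ArrayList<>(locator.getAll());
    boolean removed = next.remove(node);
    if (removed) {
      locator.updateLocator(next);
    }
    return removed;
  }

  public static void main(String[] args) {
    ListLocator<String> locator = new ListLocator<>();
    add(locator, "cache1:11211");
    add(locator, "cache2:11211");
    remove(locator, "cache1:11211");
    System.out.println(locator.getAll());
  }
}
```

Building a fresh list instead of mutating the one returned by getAll() keeps the update to a single call on the locator. In the real client the connect and disconnect work still has to happen around it.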

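    ExtendableMemcachedConnection.java

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;

    import net.spy.memcached.ConnectionFactory;
    import net.spy.memcached.ConnectionObserver;
    import net.spy.memcached.FailureMode;
    import net.spy.memcached.MemcachedConnection;
    import net.spy.memcached.MemcachedNode;
    import net.spy.memcached.OperationFactory;
    import net.spy.memcached.ops.KeyedOperation;
    import net.spy.memcached.ops.Operation;

    public class ExtendableMemcachedConnection extends MemcachedConnection {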

      protected final OperationFactory opFact;
    
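      /**
       * Construct a memcached connection.
       *
       * @param bufSize   the size of the buffer used for reading from the server
       * @param f         the factory that will provide an operation queue
       * @param a         the addresses of the servers to connect to
       * @param obs       the initial connection observers
       * @param fm        the failure mode to apply
       * @param opfactory the operation factory, kept so operations can be cloned when redistributed
       * @throws java.io.IOException if a connection attempt fails early
       */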
      public ExtendableMemcachedConnection(int bufSize, ConnectionFactory f,
                                           List<InetSocketAddress> a,
                                           Collection<ConnectionObserver> obs,
                                           FailureMode fm, OperationFactory opfactory)
          throws IOException {
        super(bufSize, f, a, obs, fm, opfactory);
        this.opFact = opfactory;
      }
    
      public void add(InetSocketAddress nodeAddress) throws IOException {
        final List<InetSocketAddress> nodeToAdd = new ArrayList<InetSocketAddress>(1);
        nodeToAdd.add(nodeAddress);
        List<MemcachedNode> newNodesList = createConnections(nodeToAdd);
        newNodesList.addAll(getLocator().getAll());
        getLocator().updateLocator(newNodesList);
      }
    
      // The node should be obtained from the locator so that currentNode.equals(node) can return true.
      public void remove(MemcachedNode node) throws IOException {
        List<MemcachedNode> remainingNodes = new ArrayList<MemcachedNode>();
        Collection<Operation> notCompletedOperations = null;
        for (MemcachedNode currentNode : getLocator().getAll()) {
          if (!currentNode.equals(node)) {
            remainingNodes.add(currentNode);
            continue;
          }
          notCompletedOperations = currentNode.destroyInputQueue();
          if (currentNode.getChannel() != null) {
            currentNode.getChannel().close();
            currentNode.setSk(null);
            if (currentNode.getBytesRemainingToWrite() > 0) {
              getLogger().warn("Shut down with %d bytes remaining to write",
                               currentNode.getBytesRemainingToWrite());
            }
            getLogger().debug("Shut down channel %s", currentNode.getChannel());
          }
        }
        if (notCompletedOperations != null) {
          // Drop the node from the locator first, so that redistributed operations
          // are routed to the remaining nodes only.
          getLocator().updateLocator(remainingNodes);
          // Unfortunately, redistributeOperations is private in MemcachedConnection, so it can
          // neither be called nor overridden. The implementation is copied below instead.
          redistributeOperations(notCompletedOperations);
        }
      }
    
      protected void redistributeOperations(Collection<Operation> ops) {
        for (Operation op : ops) {
          if (op.isCancelled() || op.isTimedOut()) {
            continue;
          }
          if (op instanceof KeyedOperation) {
            KeyedOperation ko = (KeyedOperation) op;
            int added = 0;
            for (String k : ko.getKeys()) {
              for (Operation newop : opFact.clone(ko)) {
                addOperation(k, newop);
                added++;
              }
            }
            assert added > 0 : "Didn't add any new operations when redistributing";
          } else {
            // Cancel things that don't have definite targets.
            op.cancel();
          }
        }
      }
    
    
    }
    

    ExtMemcachedClient.java

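      // Delegates to the extended connection; does nothing for any other connection type.
      public void add(InetSocketAddress nodeAddress) throws IOException {
        if (mconn instanceof ExtendableMemcachedConnection) {
          ((ExtendableMemcachedConnection) mconn).add(nodeAddress);
        }
      }

      // Returns true if the removal was delegated to the extended connection.
      public boolean remove(MemcachedNode node) throws IOException {
        if (mconn instanceof ExtendableMemcachedConnection) {
          ((ExtendableMemcachedConnection) mconn).remove(node);
          return true;
        }
        return false;
      }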
    

    ExtMemcachedConnectionFactory.java

      @Override
      public MemcachedConnection createConnection(List<InetSocketAddress> addrs) throws IOException {
        return new ExtendableMemcachedConnection(getReadBufSize(), this, addrs,
                                                 getInitialObservers(), getFailureMode(), getOperationFactory());
      }