How do I implement batch processing while invoking a transform module?


I want to batch process a set of documents using the MarkLogic Java Client API. I followed the documentation and wrote the code below to invoke a JavaScript module.

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.ApplyTransformListener;
import com.marklogic.client.datamovement.ApplyTransformListener.ApplyResult;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.JobTicket;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.query.StructuredQueryBuilder;

public class rest {

    public static void main(String[] args) {
        // port and the "x" credentials are left as placeholders
        DatabaseClient client = DatabaseClientFactory.newClient(
                "localhost", port, "x", "x", DatabaseClientFactory.Authentication.DIGEST);

        final DataMovementManager manager = client.newDataMovementManager();

        // Build query
        final StructuredQueryBuilder query = client
                .newQueryManager()
                .newStructuredQueryBuilder();

        // Specify a server-side transformation module (stored procedure) by name
        ServerTransform transform = new ServerTransform("restone-tsm");
        ApplyTransformListener transformListener = new ApplyTransformListener()
                .withTransform(transform)
                .withApplyResult(ApplyResult.REPLACE) // Transform in place, i.e. rewrite
                .onSuccess(batch -> {})
                .onSkipped(batch -> {})
                .onBatchFailure((batch, throwable) -> {});

        // Apply the transformation only to the documents that match a query,
        // in this case those in the "accounts" collection.
        final QueryBatcher batcher = manager
                .newQueryBatcher(query.collection("accounts"));
        batcher
                .withBatchSize(1000)
                .withThreadCount(16)
                .onUrisReady(transformListener)
                .onQueryFailure(exception -> exception.printStackTrace());
        final JobTicket ticket = manager.startJob(batcher);
        batcher.awaitCompletion();
        manager.stopJob(ticket);
    }
}

As you suggested, I changed my transform module (i.e. restone-tsm):

function harmonize(context, params, content)
{
  var transformed = {};
  transformed.Metadata = { "Source" : "International"};
  transformed.Canonical= {"Future" : "Element"};
  transformed.Source = content;
  xdmp.documentInsert(fn.concat("/transformed/", fn.baseUri(content)), transformed, {collections : "transform"});
};
exports.transform = harmonize;

It executed successfully. You also suggested in a comment that I apply cts.uris in the query batcher, so I looked for that function in StructuredQueryBuilder but did not find one. The code above worked fine without it.

Any help is appreciated

Thanks


Solution

  • Instead of a separate ServerEvaluationCall, use an ApplyTransformListener with your batcher, as described in Applying an In-Database Transformation:

    public static void main(String[] args) {
      // port and the "x" credentials are placeholders, as in the question
      DatabaseClient client = DatabaseClientFactory.newClient
                ("localhost", port, "x", "x",  DatabaseClientFactory.Authentication.DIGEST);

      ServerTransform txform = new ServerTransform("tsm");

      // Select the documents to transform; "accounts" is the collection used in the question
      QueryManager qm = client.newQueryManager();
      StructuredQueryBuilder sqb = qm.newStructuredQueryBuilder();
      StructuredQueryDefinition query = sqb.collection("accounts");

      DataMovementManager dmm = client.newDataMovementManager();
      QueryBatcher batcher = dmm.newQueryBatcher(query);
      batcher.withBatchSize(5)
             .withThreadCount(3)
             .withConsistentSnapshot()
             .onUrisReady(
               new ApplyTransformListener()
                 .withTransform(txform)
                 // per-batch success and failure callbacks are registered
                 // on the listener, not on the QueryBatcher
                 .onSuccess(batch -> {
                   System.out.println(
                       batch.getTimestamp().getTime() +
                       " URIs processed so far: " +
                       batch.getJobResultsSoFar());
                 })
                 .onBatchFailure((batch, throwable) -> {
                   throwable.printStackTrace();
                 }))
             .onQueryFailure(exception -> exception.printStackTrace());

      // start the job; the batcher runs the query and feeds URIs to the listener
      dmm.startJob(batcher);

      batcher.awaitCompletion();
      dmm.stopJob(batcher);
      client.release();
    }
    

    You need to ensure that your transform module has a function that implements the required interface, exports it with the name transform, and is installed on the server.

    Adjust the logic in your transform so that it no longer performs the URIs query itself (the QueryBatcher handles that) and instead transforms the content it is passed:

    function harmonize(context, params, content)
    { 
      var transformed = {};
      transformed.Metadata = { "Source" : "International"};
      transformed.Canonical= {"Future" : "Element"};
      transformed.Source = content;
      xdmp.documentInsert(fn.concat("/transformed", fn.baseUri(content)), transformed, {collections : "transform"});
    };
    exports.transform = harmonize;
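
One way to make a transform like this easier to check outside MarkLogic is to keep the envelope construction in plain functions and leave only the xdmp.documentInsert call server-specific. The following is a minimal sketch under that assumption, not the required shape of a transform module. The helper names buildEnvelope and targetUri are hypothetical and not part of any MarkLogic API, and the typeof guard is there only so the file also loads in a runtime where exports is not defined.

```javascript
// Hypothetical helper: builds the same envelope as the transform above.
function buildEnvelope(source) {
  return {
    Metadata: { Source: "International" },
    Canonical: { Future: "Element" },
    Source: source
  };
}

// Hypothetical helper: derives the target URI from the original document URI.
function targetUri(baseUri) {
  return "/transformed" + String(baseUri);
}

// Transform entry point with the same signature and side effect as above.
// xdmp and fn are MarkLogic server-side globals, so this function only runs
// inside MarkLogic; the two helpers above run anywhere.
function harmonize(context, params, content) {
  xdmp.documentInsert(
    targetUri(fn.baseUri(content)),
    buildEnvelope(content),
    { collections: "transform" }
  );
}

// Assumption: inside MarkLogic the module system provides `exports`.
if (typeof exports !== "undefined") {
  exports.transform = harmonize;
}
```

In a plain JavaScript runtime, targetUri("/accounts/1.json") evaluates to "/transformed/accounts/1.json", which makes it easy to confirm the URI scheme before installing the module on the server.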