
Create a CSV file from MarkLogic using the Java Client API (DMSDK)


I want to create a CSV file for 1.3M records from my MarkLogic database. I tried using CORB for that, but it took more time than I expected. My data looks like this:

{
  "One": {
    "Name": "One",
    "Country": "US"
  },
  "Two": {
    "State": "kentucky"
  },
  "Three": {
    "Element1": "value1",
    "Element2": "value2",
    "Element3": "value3",
    "Element4": "value4",
    so on ...
  }
}

Below are my CORB modules.

selector.sjs

// select every URI in the "data" collection and prepend the total count
var total = cts.uris("", null, cts.collectionQuery("data"));
fn.insertBefore(total, 0, fn.count(total))

transform.sjs (where I am keeping all the elements in an array):

// URI is the external variable CORB binds for each batch item
var name = fn.tokenize(URI, ";");
const node = cts.doc(name);
// collect the child values of each top-level property
var a = node.xpath("/One/*");
var b = node.xpath("/Two/*");
var c = node.xpath("/Three/*");
// join all values plus the URI into one comma-separated row
fn.stringJoin([a, b, c, name], " , ")

My properties file:

THREAD-COUNT=16
BATCH-SIZE=1000
URIS-MODULE=selector.sjs|ADHOC
PROCESS-MODULE=transform.sjs|ADHOC
PROCESS-TASK=com.marklogic.developer.corb.ExportBatchToFileTask
EXPORT-FILE-NAME=Report.csv
PRE-BATCH-TASK=com.marklogic.developer.corb.PreBatchUpdateFileTask
# CSV header row -- I have 16 columns
EXPORT-FILE-TOP-CONTENT=Col1,col2,....col16

It took more than an hour to create the CSV file. Also, to try this on a cluster I would first need to configure a load balancer, whereas the Java Client API distributes the work among all nodes without any load balancer.

How can I implement the same thing with the Java Client API? I know I can trigger the transform module using ServerTransform and ApplyTransformListener.

import com.marklogic.client.DatabaseClient;
import com.marklogic.client.DatabaseClientFactory;
import com.marklogic.client.datamovement.ApplyTransformListener;
import com.marklogic.client.datamovement.DataMovementManager;
import com.marklogic.client.datamovement.QueryBatcher;
import com.marklogic.client.document.ServerTransform;
import com.marklogic.client.query.QueryManager;
import com.marklogic.client.query.StructuredQueryBuilder;
import com.marklogic.client.query.StructuredQueryDefinition;

public static void main(String[] args) {

  // host, port, user, password -- adjust to your environment
  DatabaseClient client = DatabaseClientFactory.newClient(
      "localhost", 8000, "x", "x", DatabaseClientFactory.Authentication.DIGEST);

  // here I am implementing the same logic as the transform module above
  ServerTransform txform = new ServerTransform("tsm");

  QueryManager qm = client.newQueryManager();
  StructuredQueryBuilder sqb = qm.newStructuredQueryBuilder();
  StructuredQueryDefinition query = sqb.collection("data");

  DataMovementManager dmm = client.newDataMovementManager();
  QueryBatcher batcher = dmm.newQueryBatcher(query);
  batcher.withBatchSize(2000)
         .withThreadCount(16)
         .withConsistentSnapshot()
         .onUrisReady(
           // note: ApplyTransformListener applies the transform to the
           // documents in the database (replacing them by default); it does
           // not stream results back to the client
           new ApplyTransformListener().withTransform(txform))
         .onBatchSuccess(batch -> {
                   System.out.println(
                       batch.getTimestamp().getTime() +
                       " documents processed: " +
                       batch.getJobResultsSoFar());
         })
         .onBatchFailure((batch, throwable) -> {
           throwable.printStackTrace();
         });

  // start the job and feed input to the batcher
  dmm.startJob(batcher);

  batcher.awaitCompletion();
  dmm.stopJob(batcher);
  client.release();
}

But how can I write the CSV file header like the one in CORB (i.e. EXPORT-FILE-TOP-CONTENT)? Is there any documentation for producing a CSV file? Which class implements that?

Any help is appreciated

Thanks


Solution

  • Probably the easiest option is ml-gradle's Exporting data to CSV task, which uses the Java Client API and DMSDK under the hood.

    Note that you'll probably want to install a server-side REST transform to extract only the data you want in the CSV output, rather than downloading the entire document contents and then extracting on the Java side; a rough sketch of installing such a transform follows.
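
    As a concrete illustration, here is an untested sketch of installing such a transform from Java as a server-side JavaScript module. The transform name "csv-row", the helper class, and the field handling are hypothetical, based on the sample data in the question:

    import com.marklogic.client.DatabaseClient;
    import com.marklogic.client.admin.TransformExtensionsManager;
    import com.marklogic.client.io.Format;
    import com.marklogic.client.io.StringHandle;

    public class InstallCsvTransform {
      public static void install(DatabaseClient client) {
        // server-side JavaScript REST transform that flattens the three
        // top-level objects of the sample document into one CSV row
        String js =
            "function transform(context, params, content) {\n" +
            "  const doc = content.toObject();\n" +
            "  const vals = o => Object.keys(o || {}).map(k => o[k]);\n" +
            "  const row = vals(doc.One).concat(vals(doc.Two), vals(doc.Three)).join(',');\n" +
            "  context.outputType = 'text/plain';\n" +
            "  const nb = new NodeBuilder();\n" +
            "  nb.addText(row);\n" +
            "  return nb.toNode();\n" +
            "}\n" +
            "exports.transform = transform;\n";
        TransformExtensionsManager tem =
            client.newServerConfigManager().newTransformExtensionsManager();
        tem.writeJavascriptTransform("csv-row",
            new StringHandle(js).withFormat(Format.TEXT));
      }
    }

    Once installed, the transform can be applied during export with something like new ExportToWriterListener(writer).withTransform(new ServerTransform("csv-row")), so only the row text crosses the wire.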

    For a working example of the code required to use DMSDK and create an aggregate CSV (one CSV file for all records), see ExportToWriterListenerTest.testMassExportToWriter. For the sake of SO, here's the key code snippet (with a couple of minor simplifications, including writing the column headers (untested code)):

    try (FileWriter writer = new FileWriter(outputFile)) {
      writer.write("uri,collection,contents\n"); // the header needs its own newline
      writer.flush();
      ExportToWriterListener exportListener = new ExportToWriterListener(writer)
        .withRecordSuffix("\n")
        .withMetadataCategory(DocumentManager.Metadata.COLLECTIONS)
        .onGenerateOutput(
          record -> {
            String uri = record.getUri();
            String collection = record.getMetadata(new DocumentMetadataHandle()).getCollections().iterator().next();
            String contents = record.getContentAs(String.class);
            return uri + "," + collection + "," + contents;
          }
        );
    
      QueryBatcher queryJob =
        moveMgr.newQueryBatcher(query)
          .withThreadCount(5)
          .withBatchSize(10)
          .onUrisReady(exportListener)
          .onQueryFailure( throwable -> throwable.printStackTrace() );
      moveMgr.startJob( queryJob );
      queryJob.awaitCompletion();
      moveMgr.stopJob( queryJob );
    }
    

    However, unless you know your content has no double quotes, newlines, or non-ASCII characters, a CSV library is recommended to make sure your output is properly escaped. To use a CSV library, you can of course follow any tutorial for your library. You don't need to worry about thread safety because ExportToWriterListener runs your listeners in a synchronized block to prevent overlapping writes to the writer. Here's an example of using one CSV library, Jackson CsvMapper; a sketch is below.
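
    For instance, here is an untested sketch of the same export listener built with jackson-dataformat-csv (assumed to be on the classpath); the column names mirror the snippet above, and the helper class is hypothetical:

    import java.io.Writer;
    import java.util.LinkedHashMap;
    import java.util.Map;

    import com.fasterxml.jackson.core.JsonProcessingException;
    import com.fasterxml.jackson.dataformat.csv.CsvMapper;
    import com.fasterxml.jackson.dataformat.csv.CsvSchema;
    import com.marklogic.client.datamovement.ExportToWriterListener;
    import com.marklogic.client.document.DocumentManager;
    import com.marklogic.client.io.DocumentMetadataHandle;

    public class CsvRows {
      static ExportToWriterListener csvListener(Writer writer) {
        CsvMapper csvMapper = new CsvMapper();
        CsvSchema schema = CsvSchema.builder()
            .addColumn("uri").addColumn("collection").addColumn("contents")
            .build(); // the header row is still written once up front, as above
        return new ExportToWriterListener(writer)
            .withMetadataCategory(DocumentManager.Metadata.COLLECTIONS)
            .onGenerateOutput(record -> {
              Map<String, String> row = new LinkedHashMap<>();
              row.put("uri", record.getUri());
              row.put("collection", record.getMetadata(new DocumentMetadataHandle())
                  .getCollections().iterator().next());
              row.put("contents", record.getContentAs(String.class));
              try {
                // CsvMapper escapes embedded commas, quotes, and newlines; the
                // returned string already ends with a line separator, so no
                // withRecordSuffix("\n") is needed
                return csvMapper.writer(schema).writeValueAsString(row);
              } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
              }
            });
      }
    }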

    Please note that you don't have to use ExportToWriterListener... you can use it as a starting point to write your own listener. In particular, since your major concern is performance, you may want to have your listeners write to one file per thread, then post-process to combine the files (see the sketch after this paragraph). It's up to you.
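
    For the per-thread idea, here is a rough, untested sketch of a custom listener that gives each batcher thread its own part file (the file naming is hypothetical, and the per-URI row construction is elided):

    import java.io.FileWriter;
    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.util.concurrent.atomic.AtomicInteger;

    import com.marklogic.client.datamovement.QueryBatch;
    import com.marklogic.client.datamovement.QueryBatchListener;

    public class PerThreadCsvListener implements QueryBatchListener {
      private final AtomicInteger fileCounter = new AtomicInteger();
      // one writer per batcher thread, so no synchronization is needed
      private final ThreadLocal<FileWriter> writers = ThreadLocal.withInitial(() -> {
        try {
          return new FileWriter("report-part-" + fileCounter.getAndIncrement() + ".csv");
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      });

      @Override
      public void processEvent(QueryBatch batch) {
        try {
          FileWriter writer = writers.get();
          for (String uri : batch.getItems()) {
            // fetch/transform the document for this URI and build the real
            // CSV row here; only the URI is written in this sketch
            writer.write(uri + "\n");
          }
          writer.flush();
        } catch (IOException e) {
          throw new UncheckedIOException(e);
        }
      }
    }

    After the job completes, the report-part-*.csv files can be concatenated (with a single header prepended) into the final report.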