Tags: transactions, xquery, marklogic

marklogic delete > insert > cpf action on new document


see UPDATE below!

I have the following issue: we are collecting millions of documents (tweets) into MarkLogic, and on insert we have a CPF job that creates metadata for each document. More precisely, it adds a geotag based on location (if location or coordinates are present).

Now we have a database that has been collecting tweets without the geotagger active. We would like to process the stored tweets with this CPF job by deleting and re-inserting each document that does not yet have a proper metadata geotag element. Then CPF does its job and geotags the "new" document.

We have written the following code to delete and insert the documents, but we get an XDMP-CONFLICTINGUPDATES error. I have been reading about transactions and have tried several things: the ";" trick, wrapping the delete in xdmp:eval, and splitting the delete and insert into two separate function calls via xdmp:spawn.

Still no luck.
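For reference, this is roughly the shape I understand the xdmp:eval attempt should take (a sketch with a hard-coded uri; the isolation option is what actually forces a separate transaction):

xquery version "1.0-ml";

(: sketch: run a delete in its own transaction via eval isolation :)
let $old-uri := "/twitter/403580066367815680.json"
return
  xdmp:eval(
    'declare variable $uri external;
     xdmp:document-delete($uri)',
    (xs:QName("uri"), $old-uri),
    <options xmlns="xdmp:eval">
      <isolation>different-transaction</isolation>
    </options>)

(: caveat: if the calling statement already holds an update lock on $uri,
   a different-transaction eval will deadlock waiting for it :)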

spawn-rename.xqy

xquery version "1.0-ml";

declare namespace j = "http://marklogic.com/xdmp/json/basic";
declare variable $to_process external;

declare function local:document-rename(
   $old-uri as xs:string, $new-uri as xs:string)
  as empty-sequence()
{
    (:xdmp:set-transaction-mode("update"),:)
    xdmp:eval(xdmp:document-delete($old-uri)),
    (:xdmp:commit():)

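    (: reads in this statement still see the pre-delete state of $old-uri :)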
    let $permissions := xdmp:document-get-permissions($old-uri)
    let $collections := xdmp:document-get-collections($old-uri)
    return xdmp:document-insert(
      $new-uri, doc($old-uri),
      if ($permissions) then $permissions
      else xdmp:default-permissions(),
      if ($collections) then $collections
      else xdmp:default-collections(),
      xdmp:document-get-quality($old-uri)
    )
};

for $d in map:keys($to_process)
let $rename := local:document-rename($d, map:get($to_process,$d))
return true()

and to run the job for a specific set of documents we use:

xquery version "1.0-ml";
declare namespace j = "http://marklogic.com/xdmp/json/basic";
declare namespace dikw = 'http://www.example.com/dikw_functions.xqy';
import module namespace json = "http://marklogic.com/xdmp/json" at "/MarkLogic/json/json.xqy";

(: all uris still lacking the geotag metadata element (not used below yet) :)
let $foo := cts:uris((), (),
  cts:not-query(cts:element-query(xs:QName("j:dikwmetadata"),
    cts:element-query(xs:QName("j:data"), cts:and-query(())))))

let $items := cts:uri-match("/twitter/403580066367815680.json") (: any valid uri or set of uris :)

let $map := map:map()

let $f := doc($items[1])
let $id := $f/j:json/j:id/text()
let $oldUri := xdmp:node-uri($f)
let $newUri := fn:concat("/twitter/", $id, ".json")
let $put := map:put($map, $oldUri, $newUri)

let $spawn := xdmp:spawn("/Modules/DIKW/spawn-rename-split.xqy", (xs:QName("to_process"), $map))

return ($oldUri, " - ", $newUri)
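For a whole set of uris, the map would be filled in a loop before the single spawn; a minimal sketch along those lines (reusing the names above):

let $map := map:map()
let $put :=
    for $uri in $items
    let $id := fn:doc($uri)/j:json/j:id/text()
    return map:put($map, $uri, fn:concat("/twitter/", $id, ".json"))
return xdmp:spawn("/Modules/DIKW/spawn-rename-split.xqy",
                  (xs:QName("to_process"), $map))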

Question:

How can I set up the code so that it deletes the documents in the map first, in a separate transaction, and inserts them back later, so CPF can do its geotagging?


UPDATE

OK, so per grtjn's comments (thanks so far!) I tried to rewrite my code like this:

xquery version "1.0-ml";
declare namespace j = "http://marklogic.com/xdmp/json/basic";

let $entries := cts:uri-match("/twitter/*")
let $entry-count := fn:count($entries)

let $transaction-size := 100 (: batch size :)
let $total-transactions := xs:integer(fn:ceiling($entry-count div $transaction-size))

(: set total documents and total transactions so the UI displays progress :)
(: skipped: ticket bookkeeping from the collector example
let $set-total := infodev:ticket-set-total-documents($ticket-id, $entry-count)
let $set-trans := infodev:ticket-set-total-transactions($ticket-id, $total-transactions)
:)

(: create transactions by breaking the document set into maps;
   each map's documents are saved to the db in their own transaction :)
let $transactions :=
    for $i in 1 to $total-transactions
    let $map := map:map()
    let $start := (($i - 1) * $transaction-size) + 1
    let $finish := fn:min(($start - 1 + $transaction-size, $entry-count))
    let $put :=
        for $entry in $entries[$start to $finish]
        (: skipped: atom id lookup from the collector example
        let $id := fn:concat(fn:string($entry/atom:id), ".xml")
        :)
        let $id := fn:doc($entry)/j:json/j:id/text()
        return map:put($map, $id, $entry)
    return $map

(: the callback function for ingest
skipped: let $function := xdmp:function(xs:QName("feed:process-file"))
:)
let $ingestion :=
    for $transaction in $transactions
    return true()
    (: do the spawn here instead?
    xdmp:spawn("/modules/spawn-move.xqy", (xs:QName("to_process"), $transaction)) :)
return $ingestion (: this second return statement seems odd? :)

Now I am puzzled: to get this 'working' I needed to add that last return, which seems not right. Also, I am trying to figure out what exactly happens; if I run the query as is, it returns with a timeout error. I would like to first understand what the transaction actually does. Sorry for my ignorance, but it seems that performing a (relatively simple) task like renaming some docs is not that simple?
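My guess is that the intent is one spawn per batch map, something like this (untested):

let $ingestion :=
    for $transaction in $transactions
    return xdmp:spawn("/modules/spawn-move.xqy",
                      (xs:QName("to_process"), $transaction))
return $ingestion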

For completeness, my spawn-move.xqy here:

xquery version "1.0-ml";

declare namespace j = "http://marklogic.com/xdmp/json/basic";
declare variable $to_process external;


declare function local:document-move(
   $id as xs:string, $uri as xs:string)
  as empty-sequence()
{
    (: insert under the canonical uri first, then remove the old document :)
    let $newUri := fn:concat("/twitter/", $id, ".json")
    let $ins := xdmp:document-insert($newUri, fn:doc($uri))
    return xdmp:document-delete($uri)
};

for $d in map:keys($to_process)
let $move := local:document-move($d, map:get($to_process,$d))
return true()

Solution

  • I suspect you are not actually renaming the documents, but just re-inserting them. The rename function you quote does not anticipate that situation, and does a superfluous document-delete when $old-uri is identical to $new-uri. Add an if around the delete to skip it in that case. Keep everything else, to preserve permissions, collections, quality, and properties. The document-insert function already removes any pre-existing document at the same URI before the actual insert. See also:

    http://docs.marklogic.com/xdmp:document-insert
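
    Something along these lines, for instance (a sketch of the guarded rename, untested):

    declare function local:document-rename(
       $old-uri as xs:string, $new-uri as xs:string)
      as empty-sequence()
    {
        (: only delete for a real rename; deleting and re-inserting the
           same uri in one statement is what raises XDMP-CONFLICTINGUPDATES :)
        if ($old-uri ne $new-uri)
        then xdmp:document-delete($old-uri)
        else (),

        let $permissions := xdmp:document-get-permissions($old-uri)
        let $collections := xdmp:document-get-collections($old-uri)
        return xdmp:document-insert(
          $new-uri, doc($old-uri),
          if ($permissions) then $permissions else xdmp:default-permissions(),
          if ($collections) then $collections else xdmp:default-collections(),
          xdmp:document-get-quality($old-uri)
        )
    };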

    You might also consider adding a bit of logic to do multiple spawns. You would ideally want to re-insert docs in batches of 100 to 500, depending on hardware and forest configuration. There is a nice example of how to calculate 'transactions' in this infostudio collector on GitHub (starting from line 80):

    https://github.com/marklogic/infostudio-plugins/blob/master/collectors/collector-feed.xqy
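
    Boiled down to this thread, the batching could look something like this (a sketch; it builds the id-to-uri maps that spawn-move.xqy expects):

    xquery version "1.0-ml";
    declare namespace j = "http://marklogic.com/xdmp/json/basic";

    let $uris := cts:uris((), (),
      cts:not-query(cts:element-query(xs:QName("j:dikwmetadata"),
        cts:element-query(xs:QName("j:data"), cts:and-query(())))))
    let $batch-size := 250 (: within the suggested 100-500 range :)
    for $i in 1 to xs:integer(fn:ceiling(fn:count($uris) div $batch-size))
    let $map := map:map()
    let $put :=
      for $uri in fn:subsequence($uris, ($i - 1) * $batch-size + 1, $batch-size)
      let $id := fn:doc($uri)/j:json/j:id/text()
      return map:put($map, $id, $uri)
    return xdmp:spawn("/modules/spawn-move.xqy",
                      (xs:QName("to_process"), $map))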

    You can also consider doing the geo-work inside those transactions, instead of delegating it to CPF. But if your geo-lookup involves external calls, which could for instance be slow, then stick with CPF.

    HTH!