See UPDATE below!
I have the following issue: we are collecting millions of documents (tweets) into MarkLogic (ML), and on insert a CPF job creates metadata for each document. More precisely, it adds a geotag based on location (if location or coordinates are present).
Now we have a database that has been collecting tweets without the geotagger active. We would like to process the stored tweets with this CPF job by deleting and re-inserting each document that does not yet have a proper metadata geotag element. CPF then does its job and geotags the "new" document.
We have written the following code to delete and insert the documents, but we get an XDMP-CONFLICTUPDATES error. I have been reading about transactions and have tried several things: the ";" trick, wrapping in xdmp:eval, and splitting the delete and insert into two separate function calls via xdmp:spawn.
Still no luck.
spawn-rename.xqy
xquery version "1.0-ml";
declare namespace j = "http://marklogic.com/xdmp/json/basic";
declare variable $to_process external;
declare function local:document-rename(
  $old-uri as xs:string, $new-uri as xs:string)
  as empty-sequence()
{
  (:xdmp:set-transaction-mode("update"),:)
  xdmp:eval(xdmp:document-delete($old-uri)),
  (:xdmp:commit():)
  let $permissions := xdmp:document-get-permissions($old-uri)
  let $collections := xdmp:document-get-collections($old-uri)
  return xdmp:document-insert(
    $new-uri, doc($old-uri),
    if ($permissions) then $permissions
    else xdmp:default-permissions(),
    if ($collections) then $collections
    else xdmp:default-collections(),
    xdmp:document-get-quality($old-uri)
  )
};
for $d in map:keys($to_process)
let $rename := local:document-rename($d, map:get($to_process,$d))
return true()
And to run the job for a specific set of documents, we use:
xquery version "1.0-ml";
declare namespace j = "http://marklogic.com/xdmp/json/basic";
declare namespace dikw = 'http://www.example.com/dikw_functions.xqy';
import module namespace json = "http://marklogic.com/xdmp/json" at "/MarkLogic/json/json.xqy";
let $foo := cts:uris((),(), cts:not-query(cts:element-query(xs:QName("j:dikwmetadata"), cts:element-query(xs:QName("j:data"), cts:and-query(())))))
let $items := cts:uri-match("/twitter/403580066367815680.json") (:any valid uri or set of uris:)
let $map := map:map()
let $f := doc($items[1])
let $id := $f/j:json/j:id/text()
let $oldUri := xdmp:node-uri($f)
let $newUri := fn:concat("/twitter/", $f/j:json/j:id/text(), ".json")
let $put := map:put($map,$oldUri,$newUri)
let $spawn := xdmp:spawn("/Modules/DIKW/spawn-rename-split.xqy", (xs:QName("to_process"), $map))
return ($oldUri, " - ", $newUri)
Question:
How can I set up the code so that it deletes the documents in the map first, in a separate transaction, and inserts them back later, so CPF can do its geotagging?
UPDATE
OK, so following grtjn's comments (thanks so far!), I tried to rewrite my code like this:
xquery version "1.0-ml";
declare namespace j = "http://marklogic.com/xdmp/json/basic";
let $entries := cts:uri-match("//twitter/*")
let $entry-count := fn:count($entries)
let $transaction-size := 100 (: batch size $max :)
let $total-transactions := ceiling($entry-count div $transaction-size)
(: set total documents and total transactions so UI displays collecting :)
(: skip 84 85
let $set-total := infodev:ticket-set-total-documents($ticket-id, $entry-count)
let $set-trans := infodev:ticket-set-total-transactions($ticket-id,$total-transactions)
:)
(: create transactions by breaking document set into maps
each maps's documents are saved to the db in their own transaction :)
let $transactions :=
for $i at $index in 1 to $total-transactions
let $map := map:map()
let $start := (($i -1) *$transaction-size) + 1
let $finish := min((($start - 1 + $transaction-size),$entry-count))
let $put :=
for $entry in ($entries)[$start to $finish]
(: 96
let $id := fn:concat(fn:string($entry/atom:id),".xml")
:)
let $id := fn:doc($entry)/j:json/j:id/text()
return map:put($map,$id,$entry)
return $map
(: the callback function for ingest
skip 101 let $function := xdmp:function(xs:QName("feed:process-file"))
:)
let $ingestion :=
for $transaction at $index in $transactions
return true()
return $ingestion (: this second return statement seems odd? :)
(: do spawn here? :)
(: xdmp:spawn("/modules/spawn-move.xqy", (xs:QName("to_process"), $map)) :)
Now I am puzzled: to get this 'working' I needed to add the last return, which does not seem right. I am also trying to figure out what exactly happens: if I run the query as is, it fails with a timeout error. I would first like to understand what the transaction actually does. Sorry for my ignorance, but a (relatively simple) task such as renaming some docs turns out not to be that simple?
For completeness, here is my spawn-move.xqy:
xquery version "1.0-ml";
declare namespace j = "http://marklogic.com/xdmp/json/basic";
declare variable $to_process external;
declare function local:document-move(
  $id as xs:string, $doc as xs:string)
  as empty-sequence()
{
  let $newUri := fn:concat("/twitter/", $id, ".json")
  let $ins := xdmp:document-insert($newUri,fn:doc($doc))
  let $del := xdmp:document-delete($doc)
  return true()
};
for $d in map:keys($to_process)
let $move := local:document-move($d, map:get($to_process,$d))
return true()
I suspect you are not actually renaming the documents, but just re-inserting them. The rename function you quote does not anticipate that situation and does a superfluous document-delete if $old-uri is identical to $new-uri. Deleting and inserting the same URI within one transaction is exactly what raises XDMP-CONFLICTUPDATES. Add an if around the delete to skip it in that case. Keep everything else to preserve permissions, collections, quality, and properties. The document-insert function already replaces any pre-existing document at the same URI, so no separate delete is needed. See also:
http://docs.marklogic.com/xdmp:document-insert
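A minimal sketch of the spawned module with that change, assuming the same `$to_process` map (old URI to new URI) as in the question and the legacy five-argument `xdmp:document-insert` signature the question already uses. The delete, plus a copy of the non-system properties, only runs for a real rename. It also swaps the bare `if ($collections)` test for `fn:exists`, because the effective boolean value of a sequence of several strings raises an error when a document is in more than one collection.

```xquery
xquery version "1.0-ml";

declare namespace prop = "http://marklogic.com/xdmp/property";

(: Map of old URI -> new URI, passed in via xdmp:spawn as in the question. :)
declare variable $to_process as map:map external;

declare function local:document-rename(
  $old-uri as xs:string,
  $new-uri as xs:string
) as empty-sequence()
{
  let $permissions := xdmp:document-get-permissions($old-uri)
  let $collections := xdmp:document-get-collections($old-uri)
  return (
    (: When $new-uri equals $old-uri this simply replaces the document
       (its properties document is kept), so no delete is required. :)
    xdmp:document-insert(
      $new-uri, fn:doc($old-uri),
      if (fn:exists($permissions)) then $permissions
      else xdmp:default-permissions(),
      if (fn:exists($collections)) then $collections
      else xdmp:default-collections(),
      xdmp:document-get-quality($old-uri)
    ),
    (: Only a real rename needs the delete and the property copy.
       Deleting the URI being inserted would cause XDMP-CONFLICTUPDATES. :)
    if ($old-uri ne $new-uri) then (
      xdmp:document-delete($old-uri),
      xdmp:document-set-properties(
        $new-uri,
        xdmp:document-properties($old-uri)/prop:properties/*
          [fn:namespace-uri(.) ne "http://marklogic.com/xdmp/property"]
      )
    )
    else ()
  )
};

for $old-uri in map:keys($to_process)
return local:document-rename($old-uri, map:get($to_process, $old-uri))
```

To have CPF simply reprocess documents in place, spawn this module with a map in which every key and its value are the same URI.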
You might also consider adding a bit of logic to do multiple spawns. Ideally you would re-insert docs in batches of 100 to 500, depending on hardware and forest configuration. There is a nice example of how to calculate 'transactions' in this Info Studio collector on GitHub (starting from line 80):
https://github.com/marklogic/infostudio-plugins/blob/master/collectors/collector-feed.xqy
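A rough sketch of that batching idea, assuming the URI lexicon is enabled (the question already relies on `cts:uris`), a batch size of 100, and a hypothetical module path `/Modules/DIKW/spawn-rename.xqy` for a module that re-inserts every URI in `to_process`, such as the question's rename module with the `if` around the delete. It reuses the question's `cts:not-query` to select only documents without geotag metadata and spawns one task per batch, so each batch commits in its own transaction. Only URIs are collected in the spawning request; calling `fn:doc` on every entry there, as the UPDATE code does, likely contributes to the timeout.

```xquery
xquery version "1.0-ml";

declare namespace j = "http://marklogic.com/xdmp/json/basic";

(: Hypothetical path of the re-insert module; adjust to your setup. :)
declare variable $module as xs:string := "/Modules/DIKW/spawn-rename.xqy";

(: Documents per spawned task, i.e. per transaction. :)
declare variable $batch-size as xs:integer := 100;

(: URIs of documents without j:dikwmetadata/j:data (query from the question).
   Only the URI lexicon is read here, no documents are fetched. :)
let $uris :=
  cts:uris((), (),
    cts:not-query(
      cts:element-query(xs:QName("j:dikwmetadata"),
        cts:element-query(xs:QName("j:data"), cts:and-query(())))))
let $batch-count := xs:integer(fn:ceiling(fn:count($uris) div $batch-size))
for $i in 1 to $batch-count
let $batch := fn:subsequence($uris, ($i - 1) * $batch-size + 1, $batch-size)
(: Map each URI to itself: re-insert in place so CPF processes it again. :)
let $map := map:new(for $uri in $batch return map:entry($uri, $uri))
return xdmp:spawn($module, (xs:QName("to_process"), $map))
```

With millions of documents this queues thousands of tasks, and the Task Server queue has a limited size, so you may need to run the spawning query over slices of the URI list rather than all at once.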
You can also consider doing the geo-work inside those transactions, instead of delegating it to CPF. But if your geo-lookup involves external calls, which could for instance be slow, then stick with CPF.
HTH!