Tags: java, sparql, graphdb, linked-data, rdf4j

Is it possible to batch a SELECT query with SPARQL and RDF4J?


I am working with a fairly large dataset (around 500 million triples) stored in GraphDB Free, running on my local development machine.

I want to process the dataset with RDF4J and need to SELECT more or less the whole dataset. As a test, I just SELECT the desired tuples. The code runs fine for the first million tuples; after that it gets really slow because GraphDB keeps allocating more RAM.

Is it possible to run a SELECT query on a very big dataset and get the results in batches?

Basically I just want to iterate through some selected triples, so GraphDB should not need that much RAM. I can see that I already receive data in RDF4J before the query finishes, since it only crashes (HeapSpaceError) after about 1.4 million tuples have been read. Unfortunately, GraphDB somehow doesn't free the memory of the tuples that have already been read. Am I missing something?

Thanks a lot for your help.

PS: I have already set GraphDB's usable heap space to 20 GB.

The RDF4J (Java) code looks like this:

package ch.test;


import org.eclipse.rdf4j.query.*;
import org.eclipse.rdf4j.repository.RepositoryConnection;
import org.eclipse.rdf4j.repository.http.HTTPRepository;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;

public class RDF2RDF {

    public static void main(String[] args) {
        System.out.println("Running RDF2RDF");

        HTTPRepository sourceRepo = new HTTPRepository("http://localhost:7200/repositories/datatraining");
        try {
            String path = new File("").getAbsolutePath();
            String sparqlCommand= Files.readString(Paths.get(path + "/src/main/resources/sparql/select.sparql"), StandardCharsets.ISO_8859_1);

            int chunkSize = 10000;
            int positionInChunk = 0;
            long loadedTuples = 0;

            // try-with-resources closes the connection when the loop is done
            try (RepositoryConnection sourceConnection = sourceRepo.getConnection()) {
                TupleQuery query = sourceConnection.prepareTupleQuery(sparqlCommand);

                try (TupleQueryResult result = query.evaluate()) {
                    for (BindingSet solution : result) {
                        loadedTuples++;
                        positionInChunk++;

                        // report progress once per chunk
                        if (positionInChunk >= chunkSize) {
                            System.out.println("Got " + loadedTuples + " Tuples");
                            positionInChunk = 0;
                        }
                    }
                }
            }

        } catch (IOException err) {
            err.printStackTrace();
        }
    }
}

select.sparql:

PREFIX XXX_meta_schema: <http://schema.XXX.ch/meta/>
PREFIX XXX_post_schema: <http://schema.XXX.ch/post/>
PREFIX XXX_post_tech_schema: <http://schema.XXX.ch/post/tech/>

PREFIX XXX_geo_schema: <http://schema.XXX.ch/geo/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX XXX_raw_schema: <http://schema.XXX.ch/raw/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

SELECT * WHERE {

    BIND(<http://data.XXX.ch/raw/Table/XXX.csv> as ?table).

    ?row XXX_raw_schema:isDefinedBy ?table.

    ?cellStreetAdress XXX_raw_schema:isDefinedBy ?row;
        XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Objektadresse>;
        rdf:value ?valueStreetAdress.

    ?cellOrt XXX_raw_schema:isDefinedBy ?row;
        XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Ort>;
        rdf:value ?valueOrt.

    ?cellPlz XXX_raw_schema:isDefinedBy ?row;
        XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/PLZ>;
        rdf:value ?valuePLZ.

    BIND (URI(concat("http://data.XXX.ch/post/tech/Adress/", MD5(STR(?cellStreetAdress)))) as ?iri_tech_Adress).
}

My solution: using a subselect statement that gets all "rows" first.

PREFIX mobi_post_schema: <http://schema.mobi.ch/post/>
PREFIX mobi_post_tech_schema: <http://schema.mobi.ch/post/tech/>

PREFIX mobi_geo_schema: <http://schema.mobi.ch/geo/>
PREFIX rdfs: <http://www.w3.org/2000/01/rdf-schema#>
PREFIX mobi_raw_schema: <http://schema.mobi.ch/raw/>
PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>

SELECT * WHERE {

    {
        SELECT ?row WHERE
        {
            BIND(<http://data.mobi.ch/raw/Table/Gebaeudeobjekte_August2020_ARA_Post.csv> as ?table).

            ?row mobi_raw_schema:isDefinedBy ?table.
        }
    }


    ?cellStreetAdress mobi_raw_schema:isDefinedBy ?row;
        mobi_raw_schema:ofColumn <http://data.mobi.ch/raw/Column/Objektadresse>;
        rdf:value ?valueStreetAdress.

    ?cellOrt mobi_raw_schema:isDefinedBy ?row;
        mobi_raw_schema:ofColumn <http://data.mobi.ch/raw/Column/Ort>;
        rdf:value ?valueOrt.

    ?cellPlz mobi_raw_schema:isDefinedBy ?row;
        mobi_raw_schema:ofColumn <http://data.mobi.ch/raw/Column/PLZ>;
        rdf:value ?valuePLZ.

    BIND (URI(concat("http://data.mobi.ch/post/tech/Adress/", MD5(STR(?cellStreetAdress)))) as ?iri_tech_Adress).
}

Solution

  • I can't immediately say why the given query is so costly, memory-wise, for GraphDB Free to execute, but in general a lot depends on the shape and size of your dataset. Of course, running a query that retrieves basically the entire database is not necessarily a wise thing to do in the first place.

    Having said that, there are a couple of things you can try. Using LIMIT and OFFSET as a pagination mechanism is one.
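
    As a rough sketch of the LIMIT/OFFSET idea, the helper below appends the paging modifiers to a base query and keeps requesting pages until one comes back short. The class `PagedQuery`, its methods, and the `runPage` callback are hypothetical names for illustration, not part of RDF4J. It also assumes the base query has no ORDER BY, LIMIT, or OFFSET of its own. The ORDER BY is there because SPARQL does not guarantee a result order without it, so pages could otherwise overlap or skip solutions.

    ```java
    import java.util.function.ToIntFunction;

    // Hypothetical helper, not part of RDF4J: pages through a SELECT query.
    final class PagedQuery {

        // Appends ORDER BY / LIMIT / OFFSET to a query without solution modifiers.
        static String page(String baseQuery, String orderVar, int limit, long offset) {
            return baseQuery.trim()
                    + "\nORDER BY ?" + orderVar
                    + "\nLIMIT " + limit
                    + "\nOFFSET " + offset;
        }

        // Requests page after page until a page has fewer than `limit` rows.
        // `runPage` (an assumption) evaluates the query string it receives,
        // processes the rows, and returns how many rows it saw.
        static long forEachPage(String baseQuery, String orderVar, int limit,
                                ToIntFunction<String> runPage) {
            if (limit <= 0) {
                throw new IllegalArgumentException("limit must be positive");
            }
            long offset = 0;
            while (true) {
                int rows = runPage.applyAsInt(page(baseQuery, orderVar, limit, offset));
                offset += rows;
                if (rows < limit) {
                    return offset; // total number of rows processed
                }
            }
        }
    }
    ```

    With RDF4J, `runPage` could wrap `sourceConnection.prepareTupleQuery(q).evaluate()` in a try-with-resources block and count the solutions it iterates over. Keep in mind that ORDER BY can itself be expensive on a large result, so this is not guaranteed to lower memory use on the server.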

    Another option you could try is to split your query in two: one query retrieves all identifiers of resources you're interested in, and then you iterate over those and for each do a separate query to get the details (attributes and relations) for that particular resource.

    In your example, you could split on ?row, so you'd first do a query to get all rows for the given table:

    SELECT ?row WHERE {
        VALUES ?table { <http://data.XXX.ch/raw/Table/XXX.csv> }
        ?row XXX_raw_schema:isDefinedBy ?table.
    }
    

    And then you iterate over that result, injecting each returned value for ?row into the query that retrieves details:

    SELECT * WHERE {
        VALUES ?row { <http://data.XXX.ch/raw/Table/XXX.csv#row1> }
    
        ?cellStreetAdress XXX_raw_schema:isDefinedBy ?row;
            XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Objektadresse>;
            rdf:value ?valueStreetAdress.
    
        ?cellOrt XXX_raw_schema:isDefinedBy ?row;
            XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Ort>;
            rdf:value ?valueOrt.
    
        ?cellPlz XXX_raw_schema:isDefinedBy ?row;
            XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/PLZ>;
            rdf:value ?valuePLZ.
    
        BIND (URI(concat("http://data.XXX.ch/post/tech/Adress/", MD5(STR(?cellStreetAdress)))) as ?iri_tech_Adress).
    }
    

    In Java code, something like this:

    
    // query for all rows of the table
    String sparqlCommand1 = "PREFIX XXX_raw_schema: <http://schema.XXX.ch/raw/>\n"
                        + "SELECT ?row WHERE {\n"
                        + "    VALUES ?table { <http://data.XXX.ch/raw/Table/XXX.csv> }\n"
                        + "    ?row XXX_raw_schema:isDefinedBy ?table.\n"
                        + "}";
    
    // query for details of each row. Value of row will be injected via the API
    String sparqlCommand2 = "PREFIX XXX_raw_schema: <http://schema.XXX.ch/raw/>\n"
                        + "PREFIX rdf: <http://www.w3.org/1999/02/22-rdf-syntax-ns#>\n"
                        + "SELECT * WHERE { \n"
                        + "    ?cellStreetAdress XXX_raw_schema:isDefinedBy ?row;\n"
                        + "        XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Objektadresse>;\n"
                        + "        rdf:value ?valueStreetAdress.\n"
                        + "    ?cellOrt XXX_raw_schema:isDefinedBy ?row;\n"
                        + "        XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/Ort>;\n"
                        + "        rdf:value ?valueOrt.\n"
                        + "    ?cellPlz XXX_raw_schema:isDefinedBy ?row;\n"
                        + "        XXX_raw_schema:ofColumn <http://data.XXX.ch/raw/Column/PLZ>;\n"
                        + "        rdf:value ?valuePLZ.\n"
                        + "    BIND (URI(concat(\"http://data.XXX.ch/post/tech/Adress/\", MD5(STR(?cellStreetAdress)))) as ?iri_tech_Adress).\n"
                        + "}";
    
    try (RepositoryConnection sourceConnection = sourceRepo.getConnection()) {
        TupleQuery rowQuery = sourceConnection.prepareTupleQuery(sparqlCommand1);
        TupleQuery detailsQuery = sourceConnection.prepareTupleQuery(sparqlCommand2);
    
        try (TupleQueryResult rows = rowQuery.evaluate()) {
            for (BindingSet solution : rows) {
                // inject the current row identifier
                detailsQuery.setBinding("row", solution.getValue("row"));
    
                // execute the details query for the row and do something with
                // the result; try-with-resources closes each result promptly
                try (TupleQueryResult details = detailsQuery.evaluate()) {
                    details.forEach(System.out::println);
                }
            }
        }
    }
    

    You're doing more queries this way, of course (N+1, where N is the number of rows), but each individual query result is only a small chunk, and is probably easier for GraphDB Free (as well as your own application) to manage.
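
    If N+1 round trips turn out to be too many, a possible middle ground is to fetch the details for several rows per query by listing their IRIs in a VALUES clause. The sketch below only builds those clauses; `RowBatches` and `valuesClauses` are hypothetical names, not RDF4J API, and it assumes the row IRIs are valid and need no escaping. Each clause would be placed at the start of the WHERE block of the details query, instead of binding ?row through setBinding.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    // Hypothetical helper, not part of RDF4J: groups row IRIs into VALUES clauses.
    final class RowBatches {

        // Returns one "VALUES ?row { <iri1> <iri2> ... }" clause per batch of IRIs.
        static List<String> valuesClauses(List<String> rowIris, int batchSize) {
            if (batchSize <= 0) {
                throw new IllegalArgumentException("batchSize must be positive");
            }
            List<String> clauses = new ArrayList<>();
            for (int start = 0; start < rowIris.size(); start += batchSize) {
                int end = Math.min(start + batchSize, rowIris.size());
                StringBuilder clause = new StringBuilder("VALUES ?row {");
                for (String iri : rowIris.subList(start, end)) {
                    clause.append(" <").append(iri).append('>');
                }
                clauses.add(clause.append(" }").toString());
            }
            return clauses;
        }
    }
    ```

    The batch size is a tuning knob: larger batches mean fewer queries but bigger individual results, so it is worth trying a few values against your own data.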