Tags: docker, docker-compose, cassandra, datastax-enterprise, datastax-node-driver

Apache Cassandra: Cannot import values of type `map<smallint, blob>` using CSV files


The problem

I have defined a table in Apache Cassandra with a column of type map<smallint, blob>, and I can't find a way to insert records into it via a CSV file because of a parsing error thrown by the CQL shell. I am able to insert records manually (via queries), but the amount of data that has to be inserted is too large for individual queries or batch requests.

Development environment

I have configured an Apache Cassandra cluster (3 nodes) using Docker Compose (Docker Desktop 4.19.0) with the DataStax image "datastax/dse-server:6.8.34-ubi7".
I am a Windows user (Windows 11 Home 22H2), therefore I am using the WSL 2 backend configured with 8 GB of RAM and 4 virtual processors (via the .wslconfig file).

Database schema

I defined my database schema using the following CQL commands:

Keyspace

CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': '3' };  
CONSISTENCY QUORUM;  

Table

CREATE TABLE IF NOT EXISTS test (  
col_1 blob,  
col_2 smallint,  
col_3 map<smallint, blob>,  
col_4 map<smallint, tinyint>,  
PRIMARY KEY ((col_1), col_2)  
) WITH CLUSTERING ORDER BY (col_2 ASC);

What I have to do

I have to insert ~2K rows into the table, each of them containing ~4K entries in each map<x,y> field.

Note: I read the docs and I am aware that this is feasible. As for the disk space occupied, I will perform further performance analysis later on; for now, disk usage is not a concern.

First failed attempt

I initially tried to load the data using batch statements and prepared statements, via a script written in TypeScript using the DataStax Node.js Driver for Apache Cassandra® (v4.6).
This method failed immediately because of timeout errors thrown by the driver.
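
For context, here is a minimal sketch of this kind of batch load with the Node.js driver (the contact point, data center name, and single sample row are illustrative assumptions, not the original script):

import { Client } from 'cassandra-driver';

const client = new Client({
  contactPoints: ['127.0.0.1'],
  localDataCenter: 'dc1',   // assumption: must match the cluster's data center name
  keyspace: 'ks',
});

const insert = 'INSERT INTO test (col_1, col_2, col_3, col_4) VALUES (?, ?, ?, ?)';

async function load(): Promise<void> {
  await client.connect();
  // A single prepared batch; with ~4K entries per map per row, batches like
  // this grow very large, which is consistent with the timeouts described above.
  const statements = [
    {
      query: insert,
      params: [
        Buffer.from('a269374a6d82a6caea0143ed627ce414', 'hex'),  // blob
        1,                                                       // smallint
        new Map([[0, Buffer.alloc(20)]]),                        // map<smallint, blob>
        new Map([[0, 0]]),                                       // map<smallint, tinyint>
      ],
    },
  ];
  await client.batch(statements, { prepare: true });
  await client.shutdown();
}

load().catch(console.error);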

Second failed attempt (where the real problem lies)

I created a trivial one-line CSV file whose content is:

0xa269374a6d82a6caea0143ed627ce414,1,{0: 0x0000000000000000000000000000000000000000},{0: 0}

I copied the CSV file into the filesystem of one of the Docker containers:

docker cp <path_to_csv_file> <node_1>:/opt/dse

I opened a shell on one of my Cassandra nodes with the following command:

docker exec -it <node_1> bash

Then, from cqlsh, I launched the command to import the CSV file into the table:

COPY ks.test (col_1,col_2,col_3,col_4) FROM './file.csv' WITH HEADER = FALSE;

The following error is thrown:

Failed to import 1 rows: ParseError - Failed to parse {0: 0x0000000000000000000000000000000000000000} : unhashable type: 'bytearray',  given up without retries  
Failed to process 1 rows; failed rows written to import_ks_test.err  

At this point I immediately thought that I was writing the blob value incorrectly, even though the first column, which is also a blob, is parsed without any problem.
I wasted plenty of time changing the way I wrote the blob value in the CSV file, so in the end I decided to "manually" insert (via a query) the row I intended to import from the CSV file.
Once I did that, I asked Cassandra to export the table as a CSV, in order to see how it would represent the critical map value, and the result, ironically, was the same file I had written in the first place. As I expected, Cassandra couldn't import its own exported file.

Third failed attempt

I downloaded and installed the DataStax Bulk Loader (DSBulk), then I launched the following command (inside the ./bin folder, where the executable is located) to import the CSV file:

./dsbulk load -url file.csv -k ks -t test -h '127.0.0.1' -header false

The command printed the following error to the console:

At least 1 record does not match the provided schema.mapping or schema.query.  
Please check that the connector configuration and the schema configuration are correct.  
total | failed | rows/s | p50ms | p99ms | p999ms | batches  
    1 |      1 |      0 |  0,00 |  0,00 |   0,00 |    0,00  
Operation LOAD_20230515-095755-727000 completed with 1 errors in less than one second.  

So I opened the generated log file, which reports the following error:

Resource: file:/C:/Users/Daniel/Desktop/dsbulk-1.10.0/bin/file.csv  
Position: 1  
Source: 0xa269374a6d82a6caea0143ed627ce414,1,{0: 0x0000000000000000000000000000000000000000},{0: 0}\u000a  
com.datastax.oss.dsbulk.workflow.commons.schema.InvalidMappingException: Could not map field 2 to variable col_3; conversion from Java type java.lang.String to CQL type Map(SMALLINT => BLOB, not frozen) failed for   raw value: {0: 0x0000000000000000000000000000000000000000}.  
[at ...]  
Caused by: java.lang.IllegalArgumentException: Could not parse '{0: 0x0000000000000000000000000000000000000000}' as Json  
[at ...]  
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('x' (code 120)): was expecting comma to separate Object entries  
at [Source: (String)"{0: 0x0000000000000000000000000000000000000000}"; line: 1, column: 7]
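
In other words, DSBulk hands each map field to a JSON parser, and neither the unquoted numeric key nor the unquoted 0x... blob value is valid JSON. An illustrative check (any JSON parser behaves the same way):

// The map fields taken from the CSV are parsed as JSON documents.
JSON.parse('{"0": "0x0000000000000000000000000000000000000000"}');  // OK
JSON.parse('{0: 0x0000000000000000000000000000000000000000}');      // throws SyntaxError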

My question

Is this a known problem that occurs with map collections that contain a blob data type?
Are there other solutions for loading such a large amount of data? I am thinking of writing a Python script that runs multiple batched requests across multiple threads, abandoning the use of CSV files.
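
As an aside, instead of a separate Python script with threads, the Node.js driver already provides a helper for this kind of load: executeConcurrent runs many individual inserts with a bounded number of requests in flight, avoiding both CSV files and large batches. A minimal sketch (the row generator, contact point, data center name, and concurrency level are illustrative assumptions):

import { Client, concurrent } from 'cassandra-driver';

const client = new Client({
  contactPoints: ['127.0.0.1'],
  localDataCenter: 'dc1',      // assumption: adjust to the cluster's data center
  keyspace: 'ks',
});

const insert = 'INSERT INTO test (col_1, col_2, col_3, col_4) VALUES (?, ?, ?, ?)';

// Illustrative row generator: replace with the real ~2K rows / ~4K map entries.
function* rows(): Generator<any[]> {
  for (let i = 0; i < 2000; i++) {
    yield [
      Buffer.from(i.toString(16).padStart(32, '0'), 'hex'),  // blob
      1,                                                     // smallint
      new Map([[0, Buffer.alloc(20)]]),                      // map<smallint, blob>
      new Map([[0, 0]]),                                     // map<smallint, tinyint>
    ];
  }
}

async function load(): Promise<void> {
  await client.connect();
  // Individual inserts with at most 32 requests in flight at a time.
  await concurrent.executeConcurrent(client, insert, Array.from(rows()), {
    concurrencyLevel: 32,
  });
  await client.shutdown();
}

load().catch(console.error);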

I thank in advance anyone who can help me with this problem.


Solution

  • The issue is the format of the file: you are specifying JSON for the map values, but in JSON the keys must be strings, i.e. enclosed in quotes; numeric keys are not permitted. The value part doesn't need quotes, depending on the type, although I added them anyway. (A small script for generating correctly quoted CSV lines is sketched after this walkthrough.)

    Create a quick test container:

    docker run -e DS_LICENSE=accept --name my-dse -d datastax/dse-server:6.8.32
    

    I also copied DSBulk into the container.

    Using cqlsh, add the schema (changed to RF 1 since it is a single container):

    CREATE KEYSPACE IF NOT EXISTS ks WITH REPLICATION = { 'class': 'SimpleStrategy', 'replication_factor': '1' };  
    
    
    CREATE TABLE IF NOT EXISTS test (  
    col_1 blob,  
    col_2 smallint,  
    col_3 map<smallint, blob>,  
    col_4 map<smallint, tinyint>,  
    PRIMARY KEY ((col_1), col_2)  
    ) WITH CLUSTERING ORDER BY (col_2 ASC);
    

    Edit file.csv:

    0xa269374a6d82a6caea0143ed627ce414,1,{"0": "0x0000000000000000000000000000000000000000"},{"0": "0"}
    

    Load it:

    dse@d8587ff4853e:~/dsbulk-1.10.0/bin$ ./dsbulk load -url ~/file.csv -k ks -t test -header false
    Operation directory: /opt/dse/dsbulk-1.10.0/bin/logs/LOAD_20230531-164854-470879
    total | failed | rows/s | p50ms | p99ms | p999ms | batches
        1 |      0 |      3 | 35.00 | 35.13 |  35.13 |    1.00
    Operation LOAD_20230531-164854-470879 completed successfully in less than one second.
    Checkpoints for the current operation were written to checkpoint.csv.
    To resume the current operation, re-run it with the same settings, and add the following command line flag:
    --dsbulk.log.checkpoint.file=/opt/dse/dsbulk-1.10.0/bin/logs/LOAD_20230531-164854-470879/checkpoint.csv
    

    Query the table:

    cqlsh> select * from ks.test;
    
     col_1                              | col_2 | col_3                                           | col_4
    ------------------------------------+-------+-------------------------------------------------+--------
     0xa269374a6d82a6caea0143ed627ce414 |     1 | {0: 0x0000000000000000000000000000000000000000} | {0: 0}
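
    If the real CSV is generated programmatically, the simplest way to get the map columns right is to build them as JSON, which forces the keys to be quoted strings. A minimal TypeScript sketch (illustrative assumptions only: the helper name and sample values are not from the original setup):

    import { writeFileSync } from 'fs';

    // Render a CQL map column as the JSON object DSBulk expects:
    // numeric smallint keys become quoted JSON strings.
    function mapColumn(entries: Map<number, string>): string {
      const obj: Record<string, string> = {};
      for (const [k, v] of entries) {
        obj[String(k)] = v;            // "0" instead of 0
      }
      return JSON.stringify(obj);      // e.g. {"0":"0x0000..."}
    }

    const col1 = '0xa269374a6d82a6caea0143ed627ce414';               // blob
    const col2 = '1';                                                // smallint
    const col3 = mapColumn(new Map([[0, '0x' + '00'.repeat(20)]]));  // map<smallint, blob>
    const col4 = mapColumn(new Map([[0, '0']]));                     // map<smallint, tinyint>

    // With a single entry per map this reproduces the line loaded above.
    // Maps with several entries contain commas and quotes, so those fields
    // must then be quoted/escaped to match the CSV settings DSBulk runs with.
    writeFileSync('file.csv', [col1, col2, col3, col4].join(',') + '\n');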