Tags: python, cassandra, datastax-astra

Problems inserting a new entry in Astra Cassandra


I am doing a migration from Cassandra on an AWS machine to Astra Cassandra and there are some problems:

I cannot insert new data into Astra Cassandra when a column is around 2 million characters and 1.77 MB in size (and I have bigger data to insert, around 20 million characters). Does anyone know how to address the problem?

I am inserting it via a Python app (cassandra-driver==3.17.0) and this is the error stack I get:

start.sh[5625]: [2022-07-12 15:14:39,336] 
INFO in db_ops: error = Error from server: code=1500
[Replica(s) failed to execute write] 
message="Operation failed - received 0 responses and 2 failures: UNKNOWN from 0.0.0.125:7000, UNKNOWN from 0.0.0.181:7000" 
info={'consistency': 'LOCAL_QUORUM', 'required_responses': 2, 'received_responses': 0, 'failures': 2}

If I use half of those characters, it works.
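
For context, the size figures above can be checked like this (the payload_size helper is only for illustration, it is not part of the app):

import json

def payload_size(data):
    """Character count and approximate MB of the JSON string bound to the "data" text column."""
    text = json.dumps(data)
    return len(text), len(text.encode("utf-8")) / (1024 * 1024)

# e.g. payload_size(kwargs["data"]) is roughly (2_000_000, 1.77) for the row that fails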

New Astra Cassandra CQL Console table description:

token@cqlsh> describe mykeyspace.series;

CREATE TABLE mykeyspace.series (
    type text,
    name text,
    as_of timestamp,
    data text,
    hash text,
    PRIMARY KEY ((type, name, as_of))
) WITH additional_write_policy = '99PERCENTILE'
    AND bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.UnifiedCompactionStrategy'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair = 'BLOCKING'
    AND speculative_retry = '99PERCENTILE';

Old Cassandra table description:

ansible@cqlsh> describe mykeyspace.series;

CREATE TABLE mykeyspace.series (
    type text,
    name text,
    as_of timestamp,
    data text,
    hash text,
    PRIMARY KEY ((type, name, as_of))
) WITH bloom_filter_fp_chance = 0.01
    AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
    AND comment = ''
    AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
    AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
    AND crc_check_chance = 1.0
    AND dclocal_read_repair_chance = 0.1
    AND default_time_to_live = 0
    AND gc_grace_seconds = 864000
    AND max_index_interval = 2048
    AND memtable_flush_period_in_ms = 0
    AND min_index_interval = 128
    AND read_repair_chance = 0.0
    AND speculative_retry = '99PERCENTILE';

Data sample:

{"type": "OP", "name": "book", "as_of": "2022-03-17", "data": [{"year": 2022, "month": 3, "day": 17, "hour": 0, "quarter": 1, "week": 11, "wk_year": 2022, "is_peak": 0, "value": 1.28056854009628e-08}, .... ], "hash": "84421b8d934b06488e1ac464bd46e83ccd2beea5eb2f9f2c52428b706a9b2a10"}

where this JSON contains 27,000 entries inside the data array like:

{"year": 2022, "month": 3, "day": 17, "hour": 0, "quarter": 1, "week": 11, "wk_year": 2022, "is_peak": 0, "value": 1.28056854009628e-08}

Python part of the code:

def insert_to_table(self, table_name, **kwargs):
    try:
        ...
        elif table_name == "series":
            self.session.execute(
                self.session.prepare("INSERT INTO series (type, name, as_of, data, hash) VALUES (?, ?, ?, ?, ?)"),
                (
                    kwargs["type"],
                    kwargs["name"],
                    kwargs["as_of"],
                    kwargs["data"],
                    kwargs["hash"],
                ),
            )
        return True
    except Exception as error:
        current_app.logger.error('src/db/db_ops.py insert_to_table() table_name = %s error = %s', table_name, error)
        return False

Big thanks!


Solution

  • Finally, the short-term solution that made the data migration from an in-house Cassandra DB to Astra Cassandra (DataStax) possible was to use compression (zlib).

    So the "data" field of each entry is compressed in Python with zlib and then stored in Cassandra, to reduce the size of the entries.

        import json
        import zlib

        def insert_to_table(self, table_name, **kwargs):
            try:
                if table_name == 'series_as_of':
                    ...
                elif table_name == 'series':
                    # serialize the "data" list to JSON, compress it with zlib and
                    # store the compressed bytes as a latin1-decoded string in the
                    # existing text column
                    list_of_bytes = bytes(json.dumps(kwargs["data"]), 'latin1')
                    compressed_data = zlib.compress(list_of_bytes)
                    a = str(compressed_data, 'latin1')

                    self.session.execute(
                        self.session.prepare(INSERT_SERIES_QUERY),
                        (
                            kwargs["type"],
                            kwargs["name"],
                            kwargs["as_of"],
                            a,
                            kwargs["hash"],
                        ),
                    )
        ....

    Then, when reading the entries, a decompression step is needed:

        def get_series(self, query_type, **kwargs):
            try:
                if query_type == "data":
                    execution = self.session.execute_async(
                        self.session.prepare(GET_SERIES_DATA_QUERY),
                        (kwargs["type"], kwargs["name"], kwargs["as_of"]),
                        timeout=30,
                    )
                    decompressed = None
                    a = None
                    for row in execution.result():
                        # reverse the write path: latin1 string -> compressed bytes -> JSON text
                        my_bytes = row[0].encode('latin1')
                        decompressed = zlib.decompress(my_bytes)
                        a = str(decompressed, encoding="latin1")
                    return a
        ...
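
    As a sanity check, here is a minimal standalone sketch (independent of the driver) verifying that the JSON -> zlib -> latin1-string round trip used above is lossless; the repeated entry is only a hypothetical stand-in for the real "data" array:

        import json
        import zlib

        # hypothetical payload standing in for the real "data" array (~27,000 entries)
        original = json.dumps([{"year": 2022, "month": 3, "day": 17, "hour": 0,
                                "quarter": 1, "week": 11, "wk_year": 2022,
                                "is_peak": 0, "value": 1.28056854009628e-08}] * 27000)

        # write path: JSON text -> bytes -> zlib -> latin1 string (what the text column stores)
        stored = str(zlib.compress(bytes(original, 'latin1')), 'latin1')

        # read path: latin1 string -> bytes -> zlib decompress -> JSON text
        restored = str(zlib.decompress(stored.encode('latin1')), encoding='latin1')

        assert restored == original
        print(len(original), len(stored))  # character counts before and after compression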

    The data looks like this now in Astra Cassandra:

    token@cqlsh> select * from series where type = 'OP' and name = 'ZTP_PFM_H_LUX_PHY' and as_of = '2022-09-30';
    
     type | name              | as_of                           | data                                                                                                                                                                                                                                                                                                 | hash
    ------+-------------------+---------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------
       OP | ZTP_PFM_H_LUX_PHY | 2022-09-30 00:00:00.000000+0000 | x\x9c-\x8dÉ\nÂ@\x10D\x7fEúl`z\x16Mü\x80\x90\x83â\x1c\x14\F\x86îYÈÅ\x05\x92\x8b\x88ÿnÂx*ªÞ\x83\x82\x8f\x83ñýJ\x0e6\x0b\x07{ë`9å\x83îÿår°Þ¶;ßùíñämw.\x02\rþ\x99\x8b!\x85\x94\x95h*%\n\x8a4ÒL®·¹õ4ôÅÓÙ¨\x10\të \x99H\x04¡\x8cf6¹!\x95\x02'\x93"Jb\x1dë\x84ÈT¯U\x90\x19\x11W8\x1dp£\x8d\x83/ü\x00Ó<0\x99 | 4f53cda18c2baa0c0354bb5f9a3ecbe5ed12ab4d8e11ba873c2f11161202b945
    
    (1 rows)
    

    The long-term solution is to rearrange the data, as @Stefano Lottini said; one possible direction is sketched below.
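
    This is a sketch only, with a hypothetical series_points table and column names: store one small row per time point instead of one huge JSON blob, so that no single write carries megabytes of data. The partition key stays the same and each entry of the old "data" array becomes its own clustered row:

        from datetime import datetime

        # hypothetical table: one row per entry of the old "data" array
        CREATE_SERIES_POINTS = """
        CREATE TABLE IF NOT EXISTS mykeyspace.series_points (
            type text,
            name text,
            as_of timestamp,
            point_ts timestamp,
            is_peak int,
            value double,
            PRIMARY KEY ((type, name, as_of), point_ts)
        )
        """

        INSERT_SERIES_POINT = """
        INSERT INTO mykeyspace.series_points (type, name, as_of, point_ts, is_peak, value)
        VALUES (?, ?, ?, ?, ?, ?)
        """

        def insert_series_points(session, type_, name, as_of, entries):
            """Insert every entry of the old "data" array as its own small row."""
            session.execute(CREATE_SERIES_POINTS)
            prepared = session.prepare(INSERT_SERIES_POINT)
            for e in entries:
                point_ts = datetime(e["year"], e["month"], e["day"], e["hour"])
                session.execute(prepared, (type_, name, as_of, point_ts,
                                           e["is_peak"], e["value"]))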