Search code examples
python cassandra

Cassandra throws error('required argument is not an integer') when I set the consistency level to quorum


Here is my table's definition.

-- Chat messages table in the chat_data keyspace.
-- Partition key: (broadcaster_id, year_month) -- a composite of both columns.
-- Clustering columns: timestamp, then message_id, both stored in ascending order.
CREATE TABLE chat_data.twitch_chat_by_broadcaster_and_timestamp (
    broadcaster_id int,
    year_month int,
    timestamp bigint,
    message_id uuid,
    message text,
    -- The inner parentheses group the two partition-key columns; the remaining
    -- columns are clustering columns.
    PRIMARY KEY ((broadcaster_id, year_month), timestamp, message_id)
) WITH CLUSTERING ORDER BY (timestamp ASC, message_id ASC)

Here is my code to insert.

import logging
import auth.secrets as secrets
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.policies import DCAwareRoundRobinPolicy, TokenAwarePolicy
from cassandra.query import BatchStatement, BatchType, tuple_factory
from datetime_helpers import get_month, get_next_month

class DatabaseConnection:
    """Wrap a Cassandra driver session connected to a single keyspace.

    The session is created in ``__init__`` and reused by ``insert_chats`` to
    write chat messages in unlogged batches.
    """

    def __init__(self, keyspace) -> None:
        """Connect to the cluster and open a session on ``keyspace``.

        Credentials and the cloud configuration are read from the
        ``auth.secrets`` helper module.
        """
        auth_provider = PlainTextAuthProvider(
            secrets.get_astra_client_id(), secrets.get_astra_secret()
        )
        # Token-aware routing layered on top of a DC-aware round-robin policy.
        load_balancing_policy = TokenAwarePolicy(DCAwareRoundRobinPolicy())
        cluster = Cluster(
            cloud=secrets.get_astra_cloud_config(),
            auth_provider=auth_provider,
            load_balancing_policy=load_balancing_policy,
        )
        # Only the session is kept; the Cluster object is not stored on self.
        self.session = cluster.connect(keyspace)

    def insert_chats(self, messages) -> bool:
        """Insert ``messages`` in one unlogged batch.

        Each element of ``messages`` is unpacked as
        ``(broadcaster_id, timestamp, message_id, message)``. The value bound
        to the ``year_month`` column is computed with ``get_month(timestamp)``.

        Returns:
            True if the batch executed without raising, False if any
            exception was raised (the exception is logged, not propagated).
        """
        logging.info(f"Inserting {len(messages)} message")

        # NOTE(review): consistency_level is passed as the string "QUORUM".
        # The driver's consistency levels are integer constants
        # (cassandra.ConsistencyLevel.QUORUM); passing a string here is what
        # produces "required argument is not an integer" when the batch is
        # executed.
        batch = BatchStatement(
            consistency_level="QUORUM", batch_type=BatchType.UNLOGGED
        )

        # The INSERT is prepared on every call to insert_chats.
        statement = self.session.prepare(
            """
            INSERT INTO twitch_chat_by_broadcaster_and_timestamp (broadcaster_id, year_month, timestamp, message_id, message)
            VALUES (?, ?, ?, ?, ?)
            """
        )

        for m in messages:
            broadcaster_id, timestamp, message_id, message = m
            # The second bound value fills the year_month column.
            month = get_month(timestamp)
            batch.add(
                statement, (broadcaster_id, month, timestamp, message_id, message)
            )

        try:
            self.session.execute(batch)
            logging.info("Messages inserted successfully")
            return True
        except Exception as e:
            # Every failure is reported to the caller only as a False return.
            logging.error(f"Exception: {e}")
            return False

execute throws

Exception: ('Unable to complete the operation against any hosts', {<Host: <astra datastax machine #1>: error('required argument is not an integer'), <Host: <astra datastax machine #2>: error('required argument is not an integer'), <Host: <astra datastax machine #3>>: error('required argument is not an integer')}

I printed out the types I'm passing in the batch.add statement and confirmed they are correct. I eventually got the database to accept my insertion by removing consistency_level="QUORUM" from the BatchStatement. It now reads batch = BatchStatement(batch_type=BatchType.UNLOGGED)

Any ideas why I'm only getting the exception when I try to set the consistency level to quorum? Also, what's going on under the hood that's causing this exception?


Solution

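  • You are setting the consistency level to a string value instead of the enum member ConsistencyLevel.QUORUM. The members of that enum are integers (https://github.com/datastax/python-driver/blob/master/cassandra/__init__.py#L29), and the driver writes the consistency level into the request as an integer, which is why the string "QUORUM" fails with 'required argument is not an integer' when the batch is executed. Import the enum with `from cassandra import ConsistencyLevel` and pass it instead: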

    # ConsistencyLevel lives in the top-level ``cassandra`` package and is not
    # among the imports shown in the question, so it must be imported.
    from cassandra import ConsistencyLevel

    # Pass the integer enum member, not the string "QUORUM".
    batch = BatchStatement(
        consistency_level=ConsistencyLevel.QUORUM, batch_type=BatchType.UNLOGGED
    )