Tags: python, python-3.x, cassandra, nosql, datastax-python-driver

How do I configure and execute BatchStatement in Cassandra correctly?


In my Python 3.8 application, I query a Cassandra database through the DataStax Python Driver 3.24.

I have several CQL operations that I am trying to execute in a single request via BatchStatement, following the official documentation. Unfortunately, my code raises the following error:

"errorMessage": "retry_policy should implement cassandra.policies.RetryPolicy"
"errorType": "ValueError"

As you can see from my code, I set a value for the retry_policy attribute of BatchStatement. Even so, my code raises the error shown above. What kind of value does retry_policy expect? What is causing this conflict?

Code Snippet:

from cassandra.cluster import Cluster, ExecutionProfile, EXEC_PROFILE_DEFAULT
from cassandra.auth import PlainTextAuthProvider
from cassandra.policies import DCAwareRoundRobinPolicy
from cassandra import ConsistencyLevel
from cassandra.query import dict_factory
from cassandra.query import BatchStatement, SimpleStatement
from cassandra.policies import RetryPolicy


auth_provider = PlainTextAuthProvider(username=db_username, password=db_password)
default_profile = ExecutionProfile(
   load_balancing_policy=DCAwareRoundRobinPolicy(local_dc=db_local_dc),
   consistency_level=ConsistencyLevel.LOCAL_QUORUM,
   request_timeout=60,
   row_factory=dict_factory
)
cluster = Cluster(
   db_host,
   auth_provider=auth_provider,
   port=db_port,
   protocol_version=4,
   connect_timeout=60,
   idle_heartbeat_interval=0,
   execution_profiles={EXEC_PROFILE_DEFAULT: default_profile}
)
session = cluster.connect()

name_1, name_2, name_3 = "Bob", "Jack", "Alex"
age_1, age_2, age_3 = 25, 30, 18

cql_statement = "INSERT INTO users (name, age) VALUES (%s, %s)"

batch = BatchStatement(retry_policy=RetryPolicy)
batch.add(SimpleStatement(cql_statement, (name_1, age_1)))
batch.add(SimpleStatement(cql_statement, (name_2, age_2)))
batch.add(SimpleStatement(cql_statement, (name_3, age_3)))
session.execute(batch)

Solution

  • Well, I finally found the error.

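    I removed the retry_policy argument from the BatchStatement. My actual mistake was passing the CQL parameters to the SimpleStatement constructor. As far as I can tell, the second positional parameter of SimpleStatement is retry_policy, so the parameter tuple was treated as a retry policy and rejected. The parameters belong in the second argument of batch.add() instead.

    Here is a working code snippet: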

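    # BatchType must be imported in addition to the imports in the question.
    from cassandra.query import BatchType

    ...
    batch = BatchStatement(batch_type=BatchType.UNLOGGED)
    # Pass the parameters to batch.add(), not to the SimpleStatement constructor.
    batch.add(SimpleStatement(cql_statement), (name_1, age_1))
    batch.add(SimpleStatement(cql_statement), (name_2, age_2))
    batch.add(SimpleStatement(cql_statement), (name_3, age_3))
    session.execute(batch)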
    

    EDITED:

    In the end I abandoned BatchStatement because of the comments left at the bottom of this post. Please read them! CQL batches are not the same as RDBMS batches. They are not a performance optimization; they exist to apply atomic updates to a denormalized record spread across multiple tables.
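
For inserts like the ones in the question, which all go to a single table and do not need atomicity, one alternative to a batch is to send each row as its own request using a prepared statement. The sketch below is only an illustration under that assumption. The function name insert_users is hypothetical, and session is expected to be a connected driver session like the one created in the question.

```python
def insert_users(session, rows):
    """Insert each (name, age) pair as a separate request instead of a batch."""
    # Prepared statements use '?' placeholders rather than '%s'.
    insert = session.prepare("INSERT INTO users (name, age) VALUES (?, ?)")
    for name, age in rows:
        session.execute(insert, (name, age))


# Usage, given a connected session:
# insert_users(session, [("Bob", 25), ("Jack", 30), ("Alex", 18)])
```

If throughput matters, the driver's cassandra.concurrent.execute_concurrent_with_args helper can run the same prepared statement over many parameter tuples in parallel; whether that is worthwhile depends on the workload.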