Cassandra Pycassa connection pool, how to use properly?


To make my Cassandra inserts faster I'm using multithreading. It's working OK, but adding more threads doesn't make any difference, so I suspect I'm not generating more connections. I think maybe I should be using pool.execute(f, *args, **kwargs), but I don't know how to use it, and the documentation is quite scanty. Here's my code so far:

import connect_to_ks_bp
from connect_to_ks_bp import ks_refs
import time
import pycassa
from datetime import datetime 
import json
import threadpool
pool = threadpool.ThreadPool(20)
count = 1
bench = open("benchCassp20_100000.txt", "w")

def process_tasks(lines):

    #let threadpool format your requests into a list
    requests = threadpool.makeRequests(insert_into_cfs, lines)

    #insert the requests into the threadpool
    for req in requests:
        pool.putRequest(req) 

    pool.wait()

def read(file):
    """read data from json and insert into keyspace"""
    json_data=open(file)
    lines = []
    for line in json_data:
        lines.append(line)
    print len(lines)
    process_tasks(lines)


def insert_into_cfs(line):
    global count
    count +=1
    if count > 5000:
        bench.write(str(datetime.now())+"\n")
        count = 1
    #print count
    #print kspool.checkedout()
    """
    user_tweet_cf = pycassa.ColumnFamily(kspool, 'UserTweet')
    user_name_cf = pycassa.ColumnFamily(kspool, 'UserName')
    tweet_cf = pycassa.ColumnFamily(kspool, 'Tweet')
    user_follower_cf = pycassa.ColumnFamily(kspool, 'UserFollower')
    """
    tweet_data = json.loads(line)
    """Format the tweet time as an epoch seconds int value"""
    tweet_time = time.strptime(tweet_data['created_at'],"%a, %d %b %Y %H:%M:%S +0000")
    tweet_time  = int(time.mktime(tweet_time))

    new_user_tweet(tweet_data['from_user_id'],tweet_time,tweet_data['id'])
    new_user_name(tweet_data['from_user_id'],tweet_data['from_user_name'])
    new_tweet(tweet_data['id'],tweet_data['text'],tweet_data['to_user_id'])

    if tweet_data['to_user_id'] != 0:
        new_user_follower(tweet_data['from_user_id'],tweet_data['to_user_id'])


"""The 4 functions below carry out the inserts into specific column families"""
def new_user_tweet(from_user_id,tweet_time,id):
    ks_refs.user_tweet_cf.insert(from_user_id,{(tweet_time): id})

def new_user_name(from_user_id,user_name):
    ks_refs.user_name_cf.insert(from_user_id,{'username': user_name})

def new_tweet(id,text,to_user_id):
    ks_refs.tweet_cf.insert(id,{
    'text': text
    ,'to_user_id': to_user_id
    })  

def new_user_follower(from_user_id,to_user_id):
    ks_refs.user_follower_cf.insert(from_user_id,{to_user_id: 0})   

if __name__ == '__main__':
    read('tweets.json')

This is the other file, connect_to_ks_bp.py:

import pycassa
from pycassa.pool import ConnectionPool
from pycassa.columnfamily import ColumnFamily

"""This is a static class I set up to hold the global database connection stuff,
I only want to connect once and then the various insert functions will use these fields a lot"""
class ks_refs():
    pool = ConnectionPool('TweetsKS',use_threadlocal = True,max_overflow = -1)

    @classmethod
    def cf_connect(cls, column_family):
        cf = pycassa.ColumnFamily(cls.pool, column_family)
        return cf

ks_refs.user_name_cfo = ks_refs.cf_connect('UserName')
ks_refs.user_tweet_cfo = ks_refs.cf_connect('UserTweet')
ks_refs.tweet_cfo = ks_refs.cf_connect('Tweet')
ks_refs.user_follower_cfo = ks_refs.cf_connect('UserFollower')

#trying out a batch mutator, which is supposed to increase performance
ks_refs.user_name_cf = ks_refs.user_name_cfo.batch(queue_size=10000)
ks_refs.user_tweet_cf = ks_refs.user_tweet_cfo.batch(queue_size=10000)
ks_refs.tweet_cf = ks_refs.tweet_cfo.batch(queue_size=10000)
ks_refs.user_follower_cf = ks_refs.user_follower_cfo.batch(queue_size=10000)

Solution

  • A few thoughts:

    • Batch sizes of 10,000 are way too large. Try 100.
    • Use the pool_size parameter to make your ConnectionPool at least as large as the number of threads. The default is 5. Pool overflow should only be used when the number of active threads may vary over time, not when you have a fixed number of threads, because overflow results in a lot of unnecessary opening and closing of connections, which is a fairly expensive process.
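
A minimal sketch of the two suggestions above, assuming the keyspace and column family names from the question and a Cassandra node on pycassa's default localhost address. The helper names connect and flush are hypothetical. The flush step is an addition to the advice: a pycassa batch only sends on its own once its queue fills, so anything still queued when the input runs out needs an explicit send().

```python
NUM_THREADS = 20   # keep equal to the size passed to threadpool.ThreadPool(...)
BATCH_SIZE = 100   # instead of the 10000 used in the question


def connect(keyspace='TweetsKS', num_threads=NUM_THREADS, batch_size=BATCH_SIZE):
    """Return (pool, batches), with one pooled connection per worker thread."""
    # Imported here so the constants above can be reused without pycassa installed.
    from pycassa.pool import ConnectionPool
    from pycassa.columnfamily import ColumnFamily

    pool = ConnectionPool(keyspace,
                          pool_size=num_threads,  # the default is only 5
                          max_overflow=0,         # fixed thread count, so no overflow
                          use_threadlocal=True)
    names = ('UserName', 'UserTweet', 'Tweet', 'UserFollower')
    # One batch mutator per column family, keyed by column family name.
    batches = dict((name, ColumnFamily(pool, name).batch(queue_size=batch_size))
                   for name in names)
    return pool, batches


def flush(batches):
    """Send whatever is still queued in each batch."""
    for batch in batches.values():
        batch.send()
```

Calling flush(batches) once after pool.wait() returns would push out the final partial batches.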

    After you've resolved those issues, look into these:

    • I'm not familiar with the threadpool library that you're using. Take the insertions to Cassandra out of the picture and make sure that you still see performance increase as you increase the number of threads.
    • Python itself has a limit to how many threads may be useful due to the GIL. It shouldn't normally max out at 20, but it might if you're doing something CPU intensive or something that requires a lot of Python interpretation. The test that I described in my previous point will cover this as well. It may be the case that you should consider using the multiprocessing module, but you would need some code changes to handle that (namely, not sharing ConnectionPools, CFs, or much of anything else between processes).
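
The test described in the last two points could be sketched roughly as follows. It keeps the JSON parsing and timestamp conversion from insert_into_cfs, drops the Cassandra writes, and times the run at several thread counts. The function names and the synthetic input are made up for illustration, and it uses the standard library's thread-backed pool rather than the threadpool package from the question.

```python
import json
import time
from multiprocessing.pool import ThreadPool  # thread-backed pool from the stdlib


def sample_lines(n):
    """Build n synthetic tweet lines (made-up data shaped like the question's input)."""
    tweet = {'created_at': 'Mon, 01 Oct 2012 12:00:00 +0000',
             'from_user_id': 1, 'from_user_name': 'alice',
             'id': 2, 'text': 'hello', 'to_user_id': 0}
    return [json.dumps(tweet)] * n


def parse_only(line):
    """The per-line work of insert_into_cfs with the Cassandra writes removed."""
    tweet_data = json.loads(line)
    tweet_time = time.strptime(tweet_data['created_at'],
                               "%a, %d %b %Y %H:%M:%S +0000")
    return int(time.mktime(tweet_time))


def time_run(lines, num_threads):
    """Return the seconds taken to process all lines with num_threads workers."""
    workers = ThreadPool(num_threads)
    try:
        start = time.time()
        workers.map(parse_only, lines)
        return time.time() - start
    finally:
        workers.close()
        workers.join()


if __name__ == '__main__':
    lines = sample_lines(20000)
    parse_only(lines[0])  # warm-up call in the main thread before starting workers
    for n in (1, 5, 10, 20):
        print('%2d threads: %.2fs' % (n, time_run(lines, n)))
```

If the timings stop improving well before 20 threads, the bottleneck is in the Python-side work rather than in Cassandra or the connection pool.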