Tags: python, classification, pyspark, naive-bayes

NaiveBayes model training with separate training set and labels using pyspark


So, I am trying to train a naive Bayes classifier. I went to a lot of trouble preprocessing the data and have now produced two things:

  1. Training set: an RDD of sparse vectors;
  2. Labels: a corresponding list of labels (0 or 1), one for every vector.

I need to run something like this:

# Train a naive Bayes model.
model = NaiveBayes.train(training, 1.0)

but "training" is a dataset derived from running:

def parseLine(line):
    parts = line.split(',')
    label = float(parts[0])
    features = Vectors.dense([float(x) for x in parts[1].split(' ')])
    return LabeledPoint(label, features)

data = sc.textFile('data/mllib/sample_naive_bayes_data.txt').map(parseLine)

based on the documentation for python here. My question is: given that I don't want to load the data from a txt file, and that I have already created the training set as an RDD of sparse vectors plus a corresponding list of labels, how can I run naive Bayes?

Here is part of my code:

import collections

from pyspark.mllib.linalg import SparseVector

# Function
def featurize(tokens_kv, dictionary):
    """
    :param tokens_kv: dict mapping each word to its tf-idf score
    :param dictionary: list of n words
    :return: SparseVector of size n
    """

    # MUST sort tokens_kv by key
    tokens_kv = collections.OrderedDict(sorted(tokens_kv.items()))

    vector_size = len(dictionary)
    non_zero_indexes = []
    index_tfidf_values = []

    for key, value in tokens_kv.iteritems():
        index = 0
        for word in dictionary:
            if key == word:
                non_zero_indexes.append(index)
                index_tfidf_values.append(value)
            index += 1

    print non_zero_indexes
    print index_tfidf_values

    return SparseVector(vector_size, non_zero_indexes, index_tfidf_values)

# Feature Extraction
Training_Set_Vectors = (TFsIDFs_Vector_Weights_RDDs
                        .map(lambda tokens: featurize(tokens, Dictionary_BV.value))
                        .cache())
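
The nested loop above rescans the whole dictionary once per token, so each call costs O(len(tokens_kv) × len(dictionary)). A sketch of a faster variant, assuming the dictionary words are unique: precompute a word-to-index map once, broadcast it the same way as Dictionary_BV, and look each index up directly (featurize_fast, word_index, and Word_Index_BV are illustrative names, not from the original code):

from pyspark.mllib.linalg import SparseVector

def featurize_fast(tokens_kv, word_index):
    """
    :param tokens_kv: dict mapping each word to its tf-idf score
    :param word_index: dict mapping each dictionary word to its column index
    :return: SparseVector of size len(word_index)
    """
    # SparseVector expects indices in ascending order, so sort the
    # (index, value) pairs by index before constructing the vector.
    pairs = sorted((word_index[w], v) for w, v in tokens_kv.items()
                   if w in word_index)
    return SparseVector(len(word_index),
                        [i for i, _ in pairs],
                        [v for _, v in pairs])

# Built once on the driver and broadcast, mirroring Dictionary_BV:
# word_index = dict((w, i) for i, w in enumerate(dictionary))
# Word_Index_BV = sc.broadcast(word_index)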

... and labels is just a list of 1s and 0s. I understand that I may need to use LabeledPoint somehow, but I am confused as to how: the features live in an RDD while the labels are a plain Python list. I am hoping for something as simple as a way to create LabeledPoint objects[i] combining sparse-vectors[i] with corresponding-labels[i]... any ideas?
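
For reference, a LabeledPoint just pairs a float label with a feature vector, and the vector may be sparse, so no dense conversion is needed; a minimal standalone sketch:

from pyspark.mllib.linalg import SparseVector
from pyspark.mllib.regression import LabeledPoint

# LabeledPoint(label, features); a SparseVector works directly as features.
point = LabeledPoint(1.0, SparseVector(4, [0, 3], [0.25, 0.75]))
print point.label     # 1.0
print point.features  # SparseVector(4, {0: 0.25, 3: 0.75})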


Solution

  • I was able to solve this by first collecting the SparseVector RDD, effectively converting it to a list. Then I ran a function that constructed a list of LabeledPoint objects:

    def final_form_4_training(SVs, labels):
        """
        :param SVs: List of Sparse vectors.
        :param labels: List of labels
        :return: list of LabeledPoint objects
        """
    
        to_train = []
        for i in range(len(labels)):
            to_train.append(LabeledPoint(labels[i], SVs[i]))
        return to_train
    
    # Feature Extraction
    Training_Set_Vectors = (TFsIDFs_Vector_Weights_RDDs
                            .map(lambda tokens: featurize(tokens, Dictionary_BV.value))
                            .collect())
    
    raw_input("Generate the LabeledPoint parameter... ")
    labelled_training_set = sc.parallelize(final_form_4_training(Training_Set_Vectors, training_labels))
    
    raw_input("Train the model... ")
    model = NaiveBayes.train(labelled_training_set, 1.0)
    

    However, this assumes that the RDD maintains its order (which I am not disturbing) throughout the processing pipeline. I also hate the part where I had to collect everything on the master. Any better ideas?
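
    One way to avoid the collect, sketched below and reusing the names from the question (features_rdd and labels_rdd are illustrative names): keep the featurized vectors as an RDD (.cache() instead of .collect()), parallelize the labels into the same number of partitions, and pair the two with RDD.zip. Note that zip requires both RDDs to have the same number of partitions and the same element count per partition; if the counts don't line up, zipWithIndex() followed by a keyed join achieves the same pairing at the cost of a shuffle.

    from pyspark.mllib.classification import NaiveBayes
    from pyspark.mllib.regression import LabeledPoint

    # Keep the features distributed instead of collecting them to the driver.
    features_rdd = (TFsIDFs_Vector_Weights_RDDs
                    .map(lambda tokens: featurize(tokens, Dictionary_BV.value))
                    .cache())

    # Match the partitioning so that zip can pair elements positionally.
    labels_rdd = sc.parallelize(training_labels,
                                features_rdd.getNumPartitions())

    # zip yields (features[i], labels[i]) pairs; flip them into LabeledPoints.
    labelled_training_set = (features_rdd
                             .zip(labels_rdd)
                             .map(lambda (features, label): LabeledPoint(label, features)))

    model = NaiveBayes.train(labelled_training_set, 1.0)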