Tags: python, apache-spark, pyspark, recommendation-engine, bigdata

PySpark similarities retrieved by IndexedRowMatrix().columnSimilarities() are not accessible: INFO ExternalSorter: Thread * spilling in-memory map


When I run the code:

from pyspark import SparkContext
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating
from random import random
import os
from scipy.sparse import csc_matrix
import pandas as pd
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

from pyspark.sql import SQLContext

sc = SparkContext()
sqlContext = SQLContext(sc)

# Load (user_id, item_id, value) triples and remap both id columns to contiguous indices
df = pd.read_csv("/Users/Andre/Code/blitsy-analytics/R_D/Data/cust_item_counts.csv", header=None)
customer_map = {x[1]: x[0] for x in enumerate(df[0].unique())}
item_map = {x[1]: x[0] for x in enumerate(df[1].unique())}
df[0] = df[0].map(lambda x: customer_map[x])
df[1] = df[1].map(lambda x: item_map[x])
#matrix = csc_matrix((df[2], (df[0], df[1])), shape=(max(df[0])+1, max(df[1])+1))

# Build a distributed matrix from the triples and compute item-item column similarities
entries = sc.parallelize(df.apply(lambda x: tuple(x), axis=1).values)
mat = CoordinateMatrix(entries).toIndexedRowMatrix()
sim = mat.columnSimilarities()
sim.entries.map(lambda x: x).first()

I get thrown into a loop of threads spilling to disk:

> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 294
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 293
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 292
> 16/04/01 12:09:25 INFO ContextCleaner: Cleaned accumulator 291
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 108 spilling in-memory map of 137.6 MB to disk (1 time so far)
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 112 spilling in-memory map of 158.1 MB to disk (1 time so far)
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 114 spilling in-memory map of 154.2 MB to disk (1 time so far)
> 16/04/01 12:09:42 INFO ExternalSorter: Thread 113 spilling in-memory map of 143.4 MB to disk (1 time so far)

This doesn't happen with the matrix 'mat' itself, which returns its first row entry without issue.

Is this to do with memory management, or with the columnSimilarities() function itself?

The 'sim' matrix has ~86,000 rows and columns.

My dataset is a list of tuples (user_id, item_id, value). I remap the user_id and item_id values to contiguous indices between 0 and the number of unique users/items, so that an id of 800,000 doesn't force a matrix that large.

There are 800,000 entries of this type. The matrix in the variable 'mat' holds the tuple's value at the coordinates (user_id, item_id); I have verified that this is the case.

The matrix 'mat' has ~41,000 users and ~86,000 items. columnSimilarities() compares every pair of items (columns), which is why the result has dimensions 86k x 86k.
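For reference, a quick sanity check on the dimensions looks like this (just a sketch; each call triggers a small Spark job over 'mat'):

# Confirm the input matrix is ~41,000 users (rows) x ~86,000 items (columns).
# columnSimilarities() compares columns pairwise, so 'sim' is ~86,000 x ~86,000,
# stored sparsely as a CoordinateMatrix of MatrixEntry objects.
print(mat.numRows(), mat.numCols())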

This was all done in the PySpark shell (./bin/pyspark).


Solution

  • As discussed in the comments, the issue is that you have a lot of data that is not well partitioned for your cluster configuration. That's why it is spilling to disk.

    You'll need to give your application more memory and/or increase the number of data partitions.
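    As a rough sketch of both ideas (the 8g and 200 figures below are placeholder assumptions, not tuned values): start the shell with more memory, and spread the data over more partitions before building the matrix.

    # Sketch only: tune memory and partition count to your data and machine.
    # Start the shell with more memory, e.g.:
    #   ./bin/pyspark --driver-memory 8g

    # Spread the 800,000 entries over more partitions so columnSimilarities()
    # shuffles many small blocks instead of a few large in-memory maps.
    entries = sc.parallelize(df.apply(lambda x: tuple(x), axis=1).values, numSlices=200)
    mat = CoordinateMatrix(entries).toIndexedRowMatrix()
    sim = mat.columnSimilarities()

    # Inspect a single entry rather than collecting the whole 86k x 86k result.
    sim.entries.first()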