
Pickled object (model data) running into memory issue in Spark Streaming


I have been trying to apply a pickled scikit-learn model to predict on streaming data. Initially the model was almost 1 GB, and I thought shrinking it might take care of the issue, so I re-pickled the object with a different protocol and compression, which reduced it to 60 MB.

Each record in the input stream is a JSON document, and a prediction is applied to 3 of its keys.

Pickle object creation:

Previously:

joblib.dump(pipeline, 'itemc_nb.pkl') 

Current:

joblib.dump(pipeline, 'itemc_nb.pkl', compress=1, protocol=-1)
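
A quick sanity check on the reduction (a minimal sketch; `protocol=-1` simply selects the highest pickle protocol available):

    import os

    # confirm the on-disk reduction after the dump above
    size_mb = os.path.getsize('itemc_nb.pkl') / (1024.0 * 1024.0)
    print('compressed model size: %.1f MB' % size_mb)  # ~60 MB after compression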

Another theory I tested is memory consumption on the edge node where the streaming script runs. At full load it sits at about 70% of the node's 22 GB of RAM.
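
To put a number on that from inside the streaming script, something like the following can log memory use on the edge node (a sketch assuming the psutil package, which is not part of the original setup):

    import psutil

    def log_edge_node_memory():
        # system-wide view of the 22 GB edge node's RAM
        vm = psutil.virtual_memory()
        print('memory used: %.1f GB of %.1f GB (%.0f%%)' % (
            vm.used / 1e9, vm.total / 1e9, vm.percent))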

Another thought is that the model may be getting loaded repeatedly and never garbage collected. How could I ensure it is loaded only once?

    model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))
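
One pattern that would address this is caching the model at module level, so each Python worker process unpickles it at most once; a minimal sketch (the `get_model` helper is hypothetical and not part of the original script):

    import os
    from sklearn.externals import joblib

    _model = None  # module-level cache: one copy per Python worker process

    def get_model():
        global _model
        if _model is None:  # unpickle lazily, on first call only
            _model = joblib.load(os.path.join(__location__, 'itemc_nb.pkl'))
        return _model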

The function that evaluates the input string is below. There could be inefficiencies here that contribute to the problem as well.

import re
from nltk.corpus import stopwords

stop_words = set(stopwords.words('english'))  # built once; calling stopwords.words() per word is very slow

def predict_result(text):
    ret_val = ''
    try:
        if text is not None and isinstance(text, (str, unicode)):
            text = text.strip().lower()
            text = ''.join([c for c in text if not c.isdigit()])  # strip digits
            text = ' '.join(text.split())                          # collapse whitespace
            text = ' '.join([w for w in text.split() if w not in stop_words])
            text = [text]  # the pipeline expects an iterable of documents
            if re.match(r"^([a-z]|[0-9])\b", text[0]):  # single-character first token
                return 'non-relevant'
            elif text[0] in ('n/a', 'na', '.', 'nada', 'no', 'xx', ''):  # cleaning list
                return 'non-relevant'
            elif not text[0]:
                return 'non-relevant'
            else:
                prediction = model.predict(text)
                ret_val = cat_dict.get(prediction[0], 'No key found')
    except (AttributeError, KeyError):
        ret_val = 'error'

    return ret_val
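
For context, the function is applied to the 3 JSON keys roughly like this (a sketch; `json_df` and the column names `key1`/`key2`/`key3` are hypothetical placeholders, since the question does not name the keys):

    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    predict_udf = udf(predict_result, StringType())

    # hypothetical column names standing in for the 3 JSON keys
    tagged = (json_df
              .withColumn('key1_tag', predict_udf(json_df['key1']))
              .withColumn('key2_tag', predict_udf(json_df['key2']))
              .withColumn('key3_tag', predict_udf(json_df['key3'])))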

At this point I am looking for opinions.

Exception encountered while processing data:
An error occurred while calling o394689.insertInto.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 366.0 failed 1 times, most recent failure: Lost task 0.0 in stage 366.0 (TID 366, localhost): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/worker.py", line 98, in main
    command = pickleSer._read_with_length(infile)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/serializers.py", line 442, in loads
    return pickle.loads(obj)
  File "/usr/hdp/current/spark-client/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 700, in subimport
    __import__(name)
  File "/tmp/spark-9e6c86f3-4d80-4bef-833e-e5a225d2824f/userFiles-1784ee88-ee98-467d-9abd-f017cccecf49/streaming_models.zip/itemc/itemc_tagger.py", line 14, in <module>
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 578, in load
    obj = _unpickle(fobj, filename, mmap_mode)
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 508, in _unpickle
    obj = unpickler.load()
  File "/opt/rh/python27/root/usr/lib64/python2.7/pickle.py", line 864, in load
    dispatch[key](self)
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 341, in load_build
    self.stack.append(array_wrapper.read(self))
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 184, in read
    array = self.read_array(unpickler)
  File "/u/users/svcerpp/virtualenvs/spark_kernel/lib/python2.7/site-packages/sklearn/externals/joblib/numpy_pickle.py", line 130, in read_array
    array = unpickler.np.empty(count, dtype=self.dtype)
MemoryError: (MemoryError(), <function subimport at 0x7f1d4f353050>, ('itemc.itemc_tagger',))

    at org.apache.spark.api.python.PythonRunner$$anon$1.read(PythonRDD.scala:166)
    at org.apache.spark.api.python.PythonRunner$$anon$1.<init>(PythonRDD.scala:207)
    at org.apache.spark.api.python.PythonRunner.compute(PythonRDD.scala:125)
    at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:405)
    at org.apache.spark.sql.execution.BatchPythonEvaluation$$anonfun$doExecute$1.apply(python.scala:370)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$22.apply(RDD.scala:717)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:313)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:277)
    ... (the MapPartitionsRDD.compute / computeOrReadCheckpoint / iterator frames repeat 22 more times) ...
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:96)
    at org.apache.spark.rdd.CoalescedRDD$$anonfun$compute$1.apply(CoalescedRDD.scala:95)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.org$apache$spark$sql$hive$execution$InsertIntoHiveTable$$writeToFile$1(InsertIntoHiveTable.scala:170)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)
    at org.apache.spark.sql.hive.execution.InsertIntoHiveTable$$anonfun$saveAsHiveFile$3.apply(InsertIntoHiveTable.scala:150)

Solution

  • This was a memory error caused by a large .pkl file being loaded against live streaming data. I tackled it by deploying a compressed model:

    joblib.dump(pipeline, 'item.pkl', compress=1, protocol=-1)

    The model size went from 1 GB to 60 MB.
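
    Loading is unchanged; joblib detects the compression transparently (a minimal sketch using the same file name):

        from sklearn.externals import joblib

        model = joblib.load('item.pkl')  # decompression is handled automatically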