
Exception("storageLevel must be of type pyspark.StorageLevel")


Hi, I am trying to integrate Flume with PySpark but I am getting the error above.

[screenshot of the stack trace showing the exception]

This is the code

from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import sys
from operator import add
from pyspark.sql import functions
hostname= sys.argv[1]
port= sys.argv[2]
renga = SparkConf().setMaster("yarn-client").setAppName("karthik")
amma= SparkContext(conf=renga)
appa=StreamingContext(amma,30)
rajagopal= FlumeUtils.createPollingStream(appa,hostname,port)
rajagopal.persist(StorageLevel.MEMORY_ONLY)
mohan= rajagopal.map(lambda m:m[1])
kowsi= mohan.flatMap(lambda fm : fm.split(" ")[6] == "deparment")
ujesh= kowsi.map(lambda m : (m.split(" ")[6].split("/")[1],1))
balaji=ujesh.reduceByKey(add)

balaji.saveAsTextFiles("xxxxx/user/shashankbh/jarvis/data/flume/conf")

appa.start()
appa.awaitTermination()

Thanks in advance

Regards, Renganathan


Solution

  • The function signature of FlumeUtils.createPollingStream is

    FlumeUtils.createPollingStream(
      ssc,
      addresses,
      storageLevel=StorageLevel(True, True, False, False, 2), 
      maxBatchSize=1000, 
      parallelism=5,
      bodyDecoder=<function utf8_decoder at 0x7efe1daea488>)
    

    That means the third positional argument is the storage level. In your code, `port` ends up in that position, and it is a string read from `sys.argv[2]`, not a valid `pyspark.StorageLevel`.

    That is exactly what the stack trace is telling you. Learning to read stack traces is a valuable skill for any programmer.