Hi, I am trying to integrate Flume with PySpark but I am getting an error.
This is the code:
from pyspark import SparkConf,SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils
import sys
from operator import add
from pyspark.sql import functions
hostname= sys.argv[1]
port= sys.argv[2]
renga = SparkConf().setMaster("yarn-client").setAppName("karthik")
amma= SparkContext(conf=renga)
appa=StreamingContext(amma,30)
rajagopal= FlumeUtils.createPollingStream(appa,hostname,port)
rajagopal.persist(StorageLevel.MEMORY_ONLY)
mohan= rajagopal.map(lambda m:m[1])
kowsi= mohan.flatMap(lambda fm : fm.split(" ")[6] == "deparment")
ujesh= kowsi.map(lambda m : (m.split(" ")[6].split("/")[1],1))
balaji=ujesh.reduceByKey(add)
balaji.saveAsTextFiles("xxxxx/user/shashankbh/jarvis/data/flume/conf")
appa.start()
appa.awaitTermination()
Thanks in advance
Regards, Renganathan
The function signature of FlumeUtils.createPollingStream is:
FlumeUtils.createPollingStream(
ssc,
addresses,
storageLevel=StorageLevel(True, True, False, False, 2),
maxBatchSize=1000,
parallelism=5,
bodyDecoder=<function utf8_decoder at 0x7efe1daea488>)
That means the third positional argument is the storage level. In your code you pass port, which is a string taken from sys.argv[2] and therefore not a valid storage level.
That is what the stacktrace is telling you. Learning to read stacktraces is a valuable skill for any programmer.
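As a sketch of the fix: createPollingStream takes the addresses as its second argument, a list of (host, port) pairs, with the port as an int. The hostname and port values here are hypothetical stand-ins for your sys.argv inputs; the Spark call itself is shown commented out since it needs a running cluster and Flume sink.

```python
# Hypothetical stand-ins for sys.argv[1] and sys.argv[2]
argv = ["example-host", "9999"]

hostname = argv[0]
port = int(argv[1])             # the port must be an int, not a string

# createPollingStream expects a list of (host, port) tuples,
# not host and port as separate positional arguments
addresses = [(hostname, port)]

# With your StreamingContext `appa`, the corrected call would look like:
# from pyspark.streaming.flume import FlumeUtils
# rajagopal = FlumeUtils.createPollingStream(appa, addresses)
```

Note that you would also need `from pyspark import StorageLevel` for the `persist(StorageLevel.MEMORY_ONLY)` line in your code.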