I have a requirement where I am reading Order Dictionary from an Rest-API as given below:-
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947736C'), ('_cd', '1809:718727065')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947737C'), ('_cd', '1809:718727067')])
My requirement is to read Ordered Dictionary and write data in JSON format in Multiprocessing to a JSON file. But my code is not working properly, it is not writing data in JSON format to my target file. Please suggest.
Code is given below:-
from multiprocessing import Pool
from collections import OrderedDict
import simplejson as json
rr = OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])
f = open('iitp222.json', "a")
def write_data(args):
f.write(args + '\n')
###Get the results and display them using the ResultsReader.
if __name__ == "__main__":
for result in rr:
print result
p = Pool()
result = p.map(write_data, json.dumps(result))
p.close()
p.join()
f.close()
I am able to solve my problem through following code
#------------Import Lib-----------------------#
import splunklib.results as results
from collections import OrderedDict
import splunklib.client as client
import simplejson as json, sys
from datetime import datetime
import multiprocessing as mp
fn=sys.argv[1]
HOST = "restapi.xxxx.com"
PORT = 443
#----Capturing Current Hour & Min--------------#
Cur_min1 = datetime.now().strftime('%M')
Cur_min = int(Cur_min1)/2
#----Evaluating time to flip different users --- #
if int(Cur_min) % 4 == 0:
USERNAME = "xxxxxx"
PASSWORD = "yyyyyy"
elif int(Cur_min) % 4 == 1:
USERNAME = "xxxxxx"
PASSWORD = "yyyyyy"
elif int(Cur_min) % 4 == 2:
USERNAME = "kuuuu1"
PASSWORD = "yyyyyy"
else:
USERNAME = "xxxx"
PASSWORD = "yyyyyy"
# Create a Service instance and log in
try:
service = client.connect(
host=HOST,
port=PORT,
username=USERNAME,
password=PASSWORD)
rr = results.ResultsReader(service.jobs.export("search index=xxx host=yyyyyy* sourcetype=xxxx splunk_server=idx* earliest=-2m@m"))
f1=open(fn, 'w')
f1.close()
except:
os.system("python /home/xxx/MS_TeamsNotification.py 'Unable to connect Splunk Rest-API'")
exit()
###Get the results and display them using the ResultsReader.
def worker(arg, q):
res = json.dumps(arg)
q.put(res)
return res
def listener(q):
'''listens for messages on the q, writes to file. '''
with open(fn, 'w') as f:
while 1:
m = q.get()
if m == 'kill':
break
f.write(str(m) + '\n')
f.flush()
def main():
#must use Manager queue here, or will not work
manager = mp.Manager()
q = manager.Queue()
pool = mp.Pool(mp.cpu_count() + 2)
#put listener to work first
watcher = pool.apply_async(listener, (q,))
#fire off workers
jobs = []
for result in rr:
if isinstance(result, dict):
job = pool.apply_async(worker, (result, q))
jobs.append(job)
assert rr.is_preview == False
# collect results from the workers through the pool result queue
for job in jobs:
job.get()
#now we are done, kill the listener
q.put('kill')
pool.close()
pool.join()
if __name__ == "__main__":
main()