Search code examples
pythonjsonpython-3.xmultiprocessingpool

Python Multiprocessing - Writing to a JSON file


I have a requirement to read OrderedDict records from a REST API; they look like this:

OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'), ('_cd', '1809:718727061')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947735C'), ('_cd', '1809:718727063')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947736C'), ('_cd', '1809:718727065')])
OrderedDict([('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947737C'), ('_cd', '1809:718727067')])

I need to read these OrderedDicts and write each one, in JSON format, to a JSON file using multiprocessing. However, my code is not working: it does not write the data to the target file in JSON format. Any suggestions?

The code is given below:


from multiprocessing import Pool
from collections import OrderedDict
import simplejson as json

# Sample record, shaped like one result returned by the REST API.
# NOTE(review): this is a single OrderedDict, so iterating it yields its
# keys ('_bkt', '_cd'), not records.
rr = OrderedDict([
    ('_bkt', 'ii~1809~C76785F7-95DF-4D1D-A5E7-A2202947734C'),
    ('_cd', '1809:718727061'),
])

# Output file, opened in append mode at import time, i.e. before any Pool
# is created, so worker processes do not open it themselves.
f = open('iitp222.json', "a")

def write_data(args):
    """Append ``args`` followed by a newline to the module-level file ``f``.

    NOTE(review): this is meant to run inside a Pool worker. The write goes
    into that process's own copy of ``f``'s buffer and nothing here flushes
    it, so the data may never reach the file on disk.
    """
    line = args + '\n'
    f.write(line)

# Script entry point: serialize each item of `rr` and hand it to a Pool
# whose workers call write_data().
if __name__ == "__main__":
    for result in rr:
            # NOTE(review): iterating an OrderedDict yields its keys, so
            # `result` is '_bkt' and then '_cd', not a whole record.
            print result
            # A fresh Pool is created and torn down for every item.
            p = Pool()
            # NOTE(review): json.dumps() returns a str, and Pool.map iterates
            # a str character by character, so each write_data call receives
            # one character ('"', '_', 'b', ...) rather than a JSON document.
            result = p.map(write_data, json.dumps(result))
            p.close()
            p.join()
    f.close()

Solution

  • I was able to solve my problem with the following code:

    #------------Import Lib-----------------------#
    import multiprocessing as mp
    import subprocess
    import sys
    from collections import OrderedDict
    from datetime import datetime

    import simplejson as json
    import splunklib.client as client
    import splunklib.results as results
    
    # Path of the JSON-lines output file (first command-line argument). Kept
    # at module level because listener() reads it inside a pool worker.
    fn = sys.argv[1]
    HOST = "restapi.xxxx.com"
    PORT = 443

    #----Capturing the current two-minute slot--------------#
    # Floor division keeps this an int on both Python 2 and Python 3
    # (minutes 0-1 -> 0, 2-3 -> 1, ...).
    Cur_min = datetime.now().minute // 2

    #----Evaluating time to flip different users --- #
    # Rotate through four accounts, switching every two minutes.
    # NOTE(review): credentials are hard-coded in source; consider loading
    # them from the environment or a protected config file instead.
    _CREDENTIALS = (
        ("xxxxxx", "yyyyyy"),
        ("xxxxxx", "yyyyyy"),
        ("kuuuu1", "yyyyyy"),
        ("xxxx", "yyyyyy"),
    )
    USERNAME, PASSWORD = _CREDENTIALS[Cur_min % 4]

    if __name__ == "__main__":
        # Connect and truncate the output only in the launching process. With
        # the 'spawn' start method (the default on Windows and macOS), pool
        # workers re-import this module. Without this guard, each worker would
        # open its own Splunk connection and truncate `fn` while the listener
        # is writing to it.
        try:
            # Create a Service instance, log in, and start the export search.
            service = client.connect(
                host=HOST,
                port=PORT,
                username=USERNAME,
                password=PASSWORD)
            rr = results.ResultsReader(service.jobs.export("search index=xxx host=yyyyyy* sourcetype=xxxx splunk_server=idx* earliest=-2m@m"))
        except Exception:
            # Report the failure through the Teams helper script. Then exit
            # non-zero so whatever launched this run can see that it failed.
            subprocess.call(["python", "/home/xxx/MS_TeamsNotification.py",
                             "Unable to connect Splunk Rest-API"])
            sys.exit(1)

        # Start from an empty output file. This is kept outside the try so a
        # file error is not misreported as a Splunk connection failure.
        with open(fn, 'w'):
            pass
    
    ### Read the results with the ResultsReader and write them to the output file.
    
    def worker(arg, q):
        """Encode one result record as a JSON string and publish it on ``q``.

        The same string is also returned, so it is available through the
        pool's AsyncResult as well as through the listener's queue.
        """
        line = json.dumps(arg)
        q.put(line)
        return line
    
    def listener(q):
        """Drain messages from ``q`` into the output file ``fn``.

        Each message is written on its own line and flushed right away.
        Reading stops when the sentinel string 'kill' arrives; the sentinel
        itself is not written.
        """
        with open(fn, 'w') as out:
            # Two-argument iter() keeps calling q.get() until it returns 'kill'.
            for message in iter(q.get, 'kill'):
                out.write(str(message) + '\n')
                out.flush()
    
    def main():
        """Serialize every dict result in ``rr`` to JSON and write it to ``fn``.

        The work is fanned out over a process pool. One long-running
        ``listener`` task owns the output file. One ``worker`` task per result
        encodes it and pushes the JSON string onto a shared Manager queue.

        Raises:
            RuntimeError: if ``rr.is_preview`` is set after iteration.
            Any exception raised inside a ``worker`` or the ``listener`` task is
            re-raised here through ``AsyncResult.get()``.
        """
        # Must use a Manager queue: a plain mp.Queue cannot be passed as an
        # argument to pool tasks.
        manager = mp.Manager()
        q = manager.Queue()
        # The listener occupies one worker until it sees 'kill', so the pool
        # needs further workers for the encode tasks.
        pool = mp.Pool(mp.cpu_count() + 2)
        try:
            # Start the listener first so it is ready to drain the queue.
            watcher = pool.apply_async(listener, (q,))
            try:
                jobs = []
                for result in rr:
                    # Only dict items are treated as records; anything else
                    # the reader yields is skipped.
                    if isinstance(result, dict):
                        jobs.append(pool.apply_async(worker, (result, q)))
                # Explicit check instead of `assert`, which `python -O` strips.
                if rr.is_preview:
                    raise RuntimeError("expected final search results, got a preview")
                # Wait for every worker; this re-raises the first worker failure.
                for job in jobs:
                    job.get()
            finally:
                # Always stop the listener, even after an error above.
                # Otherwise it would block on q.get() forever and
                # pool.join() would never return.
                q.put('kill')
            # Re-raise listener failures (e.g. `fn` could not be opened), which
            # would otherwise be dropped silently.
            watcher.get()
        finally:
            pool.close()
            pool.join()
            manager.shutdown()
    
    if __name__ == "__main__":
        # Run only when executed as a script; skipped when the module is
        # merely imported (for example by pool workers under 'spawn').
        main()