Tags: python, logging, google-cloud-platform, cloud, bucket

How to send logs to Cloud Logging from Python while copying files to a GCP bucket


I have written a Python script that copies files from the local filesystem to a GCP bucket and captures log information.

The gsutil rsync command works fine and the files are copied to the corresponding target folders.

However, the log entries do not appear in the GCP Logs Viewer. The sample script is given below. Please suggest.

## python3 /home/sant/multiprocessing_gs.py
from multiprocessing import Pool
from subprocess import Popen, PIPE, TimeoutExpired, run, CalledProcessError
import os
import sys
import logging as lg
import google.cloud.logging as gcl
from google.cloud.logging.handlers import CloudLoggingHandler

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "/home/sant/key.json"
ftp_path1 = "/home/sant"
GCS_DATA_INGEST_BUCKET_URL = "dev2-ingest-manual"


class GcsMover:
    def __init__(self):
        self.folder_list = ["raw_amr", "osr_data"]
        self.logger = self.create_logger()

    def create_logger(self, log_name="Root_Logger", log_level=lg.INFO):
        try:
            log_format = lg.Formatter("%(levelname)s %(asctime)s - %(message)s")
            client = gcl.Client()
            log_handler = CloudLoggingHandler(client)
            log_handler.setFormatter(log_format)
            logger = lg.getLogger(log_name)
            logger.setLevel(log_level)
            logger.addHandler(log_handler)
            return logger
        except Exception as e:
            sys.exit("WARNING - Invalid cloud logging")

    def execute_jobs(self, cmd):
        try:
            gs_sp = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE, shell=True)
            print(f"starting process with Pid {str(gs_sp.pid)} for command {cmd}")
            self.logger.info(f"starting process with Pid {str(gs_sp.pid)} for command {cmd}")
            sp_out, sp_err = gs_sp.communicate(timeout=int(3600))
        except OSError:
            self.logger.error(f"Processing aborted for Pid {str(gs_sp.pid)}")
        except TimeoutExpired:
            gs_sp.kill()
            self.logger.error(f"Processing aborted for Pid {str(gs_sp.pid)}")
        else:
            if gs_sp.returncode:
                self.logger.error(f"Failure due to {sp_err} for Pid {str(gs_sp.pid)} and command {cmd}")
            else:
                print(f"Loading successful for Pid {str(gs_sp.pid)}")
                self.logger.info(f"Loading successful for Pid {str(gs_sp.pid)}")

    def move_files(self):
        command_list = []
        for folder in self.folder_list:
            gs_command = f"gsutil -m rsync -r {ftp_path1}/{folder} gs://{GCS_DATA_INGEST_BUCKET_URL}/{folder}"
            command_list.append(gs_command)
        pool = Pool(processes=2, maxtasksperchild=1)
        pool.map(self.execute_jobs, iterable=command_list)
        pool.close()
        pool.join()


def main():
    gsu = GcsMover()
    gsu.move_files()


if __name__ == "__main__":
    main()

Solution

  • Google Cloud Storage can log the actions taken on objects, as described in the documentation. You may need to enable Data Access audit logs for Cloud Storage in your project, since they are not enabled by default.

    Since your script uses rsync, it performs a few actions on GCS (the details are in the command's source code). As an overview, it checks whether each object already exists in the bucket (by listing the bucket); if it does, it compares the hash of the local file with the remote one, and it uploads the file only if it has changed or did not exist before.

    All of those actions are recorded in the Data Access audit logs, which you can view in the console (a small Python sketch for querying them programmatically is included at the end of this answer).

    If you also want to keep local logs (in case a local error is not logged in the cloud), you can change the executed command by appending a redirect to a log file, as shown here and in the Python sketch below:

    gsutil -m rsync -r /source/path gs://bucket/folder &> /path/to/log
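
    If you want to build that redirect into the script itself, here is a minimal sketch (the log directory and helper name are assumptions for illustration, not part of the original code). Note that &> is a bash shortcut; since Popen(shell=True) usually runs /bin/sh, the sketch uses the portable form "> file 2>&1". Also, once gsutil's output goes to a file, the sp_err value returned by communicate() will be empty, so errors should be checked in the log file instead.

    # Sketch only: build each rsync command so gsutil's output is also kept locally.
    # LOCAL_LOG_DIR and build_rsync_command are hypothetical names for illustration;
    # the defaults match the paths used in the question.
    import os

    LOCAL_LOG_DIR = "/home/sant/gsutil_logs"

    def build_rsync_command(folder, source_root="/home/sant", bucket="dev2-ingest-manual"):
        os.makedirs(LOCAL_LOG_DIR, exist_ok=True)
        log_file = os.path.join(LOCAL_LOG_DIR, f"rsync_{folder}.log")
        # "> file 2>&1" sends both stdout and stderr of gsutil to the log file
        # (the POSIX-portable equivalent of bash's "&>").
        return (f"gsutil -m rsync -r {source_root}/{folder} "
                f"gs://{bucket}/{folder} > {log_file} 2>&1")

    # In move_files(), the loop would then become:
    #     for folder in self.folder_list:
    #         command_list.append(build_rsync_command(folder))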
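
    If you prefer to check for those Data Access entries from Python rather than the console, a sketch along these lines can list them using the same google-cloud-logging client the question already uses. The project ID in the filter is a placeholder, and the filter itself is only a starting point:

    # Sketch only: list Data Access audit log entries for GCS buckets.
    # Replace "my-project" with your project ID.
    import google.cloud.logging as gcl

    client = gcl.Client()
    log_filter = (
        'logName="projects/my-project/logs/cloudaudit.googleapis.com%2Fdata_access" '
        'AND resource.type="gcs_bucket"'
    )

    # list_entries returns an iterator over the matching LogEntry objects,
    # newest first when ordered by DESCENDING.
    for i, entry in enumerate(client.list_entries(filter_=log_filter, order_by=gcl.DESCENDING)):
        print(entry.timestamp, entry.log_name)
        if i >= 9:  # only show the first few entries
            break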