Tags: amazon-s3, airflow, slack, airflow-scheduler

Make Airflow read from S3 and post to Slack?


I have a requirement where I want my Airflow job to read a file from S3 and post its contents to Slack.

Background

Currently, the Airflow job has an S3 key sensor that waits for a file to land in an S3 location; if the file doesn't appear within the stipulated time, the task fails and pushes an error message to Slack.
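A simplified sketch of such a setup (Airflow 1.10-style import paths; bucket, key, connection IDs and the Slack channel are placeholder values):

```python
from datetime import datetime

from airflow import DAG
from airflow.hooks.slack_hook import SlackHook
from airflow.sensors.s3_key_sensor import S3KeySensor


def notify_slack_failure(context):
    # on_failure_callback: push an error message to Slack when the sensor
    # times out (connection ID and channel below are made up)
    SlackHook(slack_conn_id='slack_default').call(
        'chat.postMessage',
        {'channel': '#alerts',
         'text': 'Input file never arrived: {}'.format(context['task_instance_key_str'])})


with DAG('s3_to_slack',
         start_date=datetime(2019, 1, 1),
         schedule_interval='@daily') as dag:

    wait_for_input_file = S3KeySensor(
        task_id='wait_for_input_file',
        bucket_name='my-bucket',
        bucket_key='incoming/data.csv',
        aws_conn_id='aws_default',
        timeout=60 * 60,          # fail if the file has not appeared within an hour
        poke_interval=60,
        on_failure_callback=notify_slack_failure)
```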

What needs to be done now

If the Airflow job succeeds, it needs to check another S3 location and, if a file exists there, push its contents to Slack.

Is this use case possible with Airflow?


Solution

  • You have already figured out that the first step of your workflow has to be an S3KeySensor.

    As for the subsequent steps, depending on what you mean by "..it needs to check another S3 location and if a file exists there..", you can go about it in the following way:

    1. Step 1

      a. If the file at the other S3 location is also expected to appear there after some time, then of course you will need another S3KeySensor.

      b. Otherwise, if the file is expected to already be there (or possibly not be there at all, but without needing to wait for it to appear), you can check for its presence using the check_for_key(..) function of S3Hook. This check can be done within the python_callable of a simple PythonOperator, or of any other custom operator that you are using for step 2.

    2. Step 2

      By now it is ascertained that the second file is present in the expected location (otherwise we wouldn't have come this far). Now you just need to read the contents of this file using the read_key(..) function and then push the contents to Slack using the call(..) function of SlackHook. You might be tempted to use SlackAPIPostOperator (which you can, of course), but reading the file from S3 and sending its contents to Slack should still be clubbed into a single task. So you are better off doing these things in a generic PythonOperator, employing the same hooks that the native operators use. See the sketch after this list.
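
Putting the pieces together, a minimal sketch under the assumptions above (Airflow 1.10-style import paths; bucket names, keys, connection IDs and the Slack channel are placeholders to adapt):

```python
# Sketch only: a single PythonOperator that checks the second S3 location,
# reads the file if present, and posts its contents to Slack (steps 1b and 2
# clubbed into one task). All resource names below are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.hooks.S3_hook import S3Hook
from airflow.hooks.slack_hook import SlackHook
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.s3_key_sensor import S3KeySensor


def post_s3_file_to_slack():
    s3 = S3Hook(aws_conn_id='aws_default')
    bucket, key = 'my-bucket', 'reports/summary.txt'

    # Step 1b: check for the file with check_for_key(..); do nothing if absent
    if not s3.check_for_key(key, bucket_name=bucket):
        return

    # Step 2: read the contents with read_key(..) and push them to Slack
    contents = s3.read_key(key, bucket_name=bucket)
    SlackHook(slack_conn_id='slack_default').call(
        'chat.postMessage',
        {'channel': '#reports', 'text': contents})


with DAG('s3_to_slack',
         start_date=datetime(2019, 1, 1),
         schedule_interval='@daily') as dag:

    # Step 1: the sensor you already have, waiting for the first file
    wait_for_input_file = S3KeySensor(
        task_id='wait_for_input_file',
        bucket_name='my-bucket',
        bucket_key='incoming/data.csv',
        aws_conn_id='aws_default',
        timeout=60 * 60,
        poke_interval=60)

    # Steps 1b + 2: check the second location, read the file, post to Slack
    push_report_to_slack = PythonOperator(
        task_id='push_report_to_slack',
        python_callable=post_s3_file_to_slack)

    wait_for_input_file >> push_report_to_slack
```

If the second file also needs to be waited upon (step 1a above), put a second S3KeySensor in front of push_report_to_slack instead of relying on the check_for_key(..) guard.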