Search code examples
pythonairflowairflow-xcom

Use airflow for 2 task and i have a problem for run


i would like use airflow for 2 tasks : The first is a request in openweathermap and i created json every 1 mn file and the second use the 20 last files and create un new file with : temperature key-value, city key-value and pression key-value. I would like run the first task and after the second task. I don't know why i have an error. I think, i have a problem with my code and maybe it's necessary use xcom. Can you help me ? thank you so much :-)

import requests
import json
import datetime
import os
import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago


my_dag = DAG(
    dag_id='eval_airflow',
    description="recup_data and transform data to csv",
    schedule_interval='*/1 * * * *',
    default_args={
        'owner': 'airflow',
        'start_date': days_ago(0),
    }#,
    #catchup=False
)

# définition de la fonction de récupération des données depuis OpenWeatherMap
def recup_data():
    filepath = '/app/raw_files'
    # création du dossier  '/app/raw_files' de destination des fichiers résultats des requêtes
    if os.path.exists(filepath) == False:
            os.makedirs(filepath, mode = 511, exist_ok= True)
    # positionnement dans le dossier '/app/raw_files'
    os.chdir(filepath)
    # création de la liste des villes pour lesquelles les données météo vont être demandées
    villes = ["paris", "london","washington"]
    cities = {}
    for ville in villes:
            r = requests.get(f"https://api.openweathermap.org/data/2.5/weather?q={ville}&appid=0eb6409c528ceeabc733ad3b07a67b58")
            cities[ville] = r.json()
# obtenir l'heure et la date actuelles
    now = datetime.datetime.now()
# créer un nom de fichier basé sur l'heure et la date
    filename = f"{now.year}-{now.month}-{now.day} {now.hour}:{now.minute}.json"
# ouvrir le fichier en mode écriture
    with open(filename, 'w') as file:
    # écrire les données au format JSON dans le fichier
        json.dump(cities, file)
        r.status_code
    return r.status_code
    


def transform_data_into_csv(n_files=None, filename='data.csv'):
    parent_folder = '/app/raw_files'
    files = sorted(os.listdir(parent_folder), reverse=True)[:20]
    if n_files:
        files = files[:n_files]
    dfs = []
    print('dfs', dfs)
    for f in files:
        with open(os.path.join(parent_folder, f), 'r') as file:
            data_temp = json.load(file)
        for data_city in data_temp:
            dfs.append(
               {
                    'temperature': data_temp[data_city]['main']['temp'],
                    'city': data_temp [data_city]['name'],
                    'pression': data_temp[data_city]['main']['pressure'],
                    'date': f.split('.')[0]
                }
            )
    df = pd.DataFrame(dfs)
    df.to_csv(os.path.join('/app/clean_data', filename), index=False)


task1 = PythonOperator(
    task_id='task1_recup_data',
    python_callable=recup_data,
    dag=my_dag
)

task2 = PythonOperator(
    task_id='task2_Transform_data_into_csv',
    python_callable=transform_data_into_csv,
    dag=my_dag
)

task1 >> task2

and my error :

      File "/opt/airflow/dags/tache1.py", line 58, in transform_data_into_csv
    data_temp = json.load(file)
  File "/usr/local/lib/python3.6/json/__init__.py", line 299, in load
    parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)
  File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
    return _default_decoder.decode(s)
  File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
    obj, end = self.raw_decode(s, idx=_w(s, 0).end())
  File "/usr/local/lib/python3.6/json/decoder.py", line 357, in raw_decode
    raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
[2023-01-12 16:52:03,300] {taskinstance.py:1551} INFO - Marking task as FAILED. dag_id=eval_***, task_id=task2_Transform_data_into_csv, execution_date=20230112T165100, start_date=20230112T165203, end_date=20230112T165203
[2023-01-12 16:52:03,334] {local_task_job.py:151} INFO - Task exited with return code 1

Solution

  • Under the assumption that the files exist and are valid JSON. The issue might be encoding.

    Try this:

    def transform_data_into_csv(n_files=None, filename='data.csv'):
        parent_folder = '/app/raw_files'
        files = sorted(os.listdir(parent_folder), reverse=True)[:20]
        if n_files:
            files = files[:n_files]
        dfs = []
        print('dfs', dfs)
        for f in files:
            with open(os.path.join(parent_folder, f, encoding="utf-8"), 'r') as file:
                data_temp = json.load(file)
            for data_city in data_temp:
                dfs.append(
                   {
                        'temperature': data_temp[data_city]['main']['temp'],
                        'city': data_temp [data_city]['name'],
                        'pression': data_temp[data_city]['main']['pressure'],
                        'date': f.split('.')[0]
                    }
                )
        df = pd.DataFrame(dfs)
        df.to_csv(os.path.join('/app/clean_data', filename), index=False)
    

    A good idea is to check if the file contains data before trying to deserialize it. So this can be added before you open the file f:

    import os
    if os.stat(f).st_size > 0:
        ...
    

    In your recup_data it could also be an idea to check if you have an empty JSON object before writing to file, to avoid creating an empty file.

    if bool(cities):
        <write file>