i would like use airflow for 2 tasks : The first is a request in openweathermap and i created json every 1 mn file and the second use the 20 last files and create un new file with : temperature key-value, city key-value and pression key-value. I would like run the first task and after the second task. I don't know why i have an error. I think, i have a problem with my code and maybe it's necessary use xcom. Can you help me ? thank you so much :-)
import requests
import json
import datetime
import os
import pandas as pd
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
my_dag = DAG(
dag_id='eval_airflow',
description="recup_data and transform data to csv",
schedule_interval='*/1 * * * *',
default_args={
'owner': 'airflow',
'start_date': days_ago(0),
}#,
#catchup=False
)
# définition de la fonction de récupération des données depuis OpenWeatherMap
def recup_data():
filepath = '/app/raw_files'
# création du dossier '/app/raw_files' de destination des fichiers résultats des requêtes
if os.path.exists(filepath) == False:
os.makedirs(filepath, mode = 511, exist_ok= True)
# positionnement dans le dossier '/app/raw_files'
os.chdir(filepath)
# création de la liste des villes pour lesquelles les données météo vont être demandées
villes = ["paris", "london","washington"]
cities = {}
for ville in villes:
r = requests.get(f"https://api.openweathermap.org/data/2.5/weather?q={ville}&appid=0eb6409c528ceeabc733ad3b07a67b58")
cities[ville] = r.json()
# obtenir l'heure et la date actuelles
now = datetime.datetime.now()
# créer un nom de fichier basé sur l'heure et la date
filename = f"{now.year}-{now.month}-{now.day} {now.hour}:{now.minute}.json"
# ouvrir le fichier en mode écriture
with open(filename, 'w') as file:
# écrire les données au format JSON dans le fichier
json.dump(cities, file)
r.status_code
return r.status_code
def transform_data_into_csv(n_files=None, filename='data.csv'):
parent_folder = '/app/raw_files'
files = sorted(os.listdir(parent_folder), reverse=True)[:20]
if n_files:
files = files[:n_files]
dfs = []
print('dfs', dfs)
for f in files:
with open(os.path.join(parent_folder, f), 'r') as file:
data_temp = json.load(file)
for data_city in data_temp:
dfs.append(
{
'temperature': data_temp[data_city]['main']['temp'],
'city': data_temp [data_city]['name'],
'pression': data_temp[data_city]['main']['pressure'],
'date': f.split('.')[0]
}
)
df = pd.DataFrame(dfs)
df.to_csv(os.path.join('/app/clean_data', filename), index=False)
task1 = PythonOperator(
task_id='task1_recup_data',
python_callable=recup_data,
dag=my_dag
)
task2 = PythonOperator(
task_id='task2_Transform_data_into_csv',
python_callable=transform_data_into_csv,
dag=my_dag
)
task1 >> task2
and my error :
File "/opt/airflow/dags/tache1.py", line 58, in transform_data_into_csv
data_temp = json.load(file)
File "/usr/local/lib/python3.6/json/__init__.py", line 299, in load
parse_constant=parse_constant, object_pairs_hook=object_pairs_hook, **kw)
File "/usr/local/lib/python3.6/json/__init__.py", line 354, in loads
return _default_decoder.decode(s)
File "/usr/local/lib/python3.6/json/decoder.py", line 339, in decode
obj, end = self.raw_decode(s, idx=_w(s, 0).end())
File "/usr/local/lib/python3.6/json/decoder.py", line 357, in raw_decode
raise JSONDecodeError("Expecting value", s, err.value) from None
json.decoder.JSONDecodeError: Expecting value: line 1 column 1 (char 0)
[2023-01-12 16:52:03,300] {taskinstance.py:1551} INFO - Marking task as FAILED. dag_id=eval_***, task_id=task2_Transform_data_into_csv, execution_date=20230112T165100, start_date=20230112T165203, end_date=20230112T165203
[2023-01-12 16:52:03,334] {local_task_job.py:151} INFO - Task exited with return code 1
Under the assumption that the files exist and are valid JSON. The issue might be encoding.
Try this:
def transform_data_into_csv(n_files=None, filename='data.csv'):
parent_folder = '/app/raw_files'
files = sorted(os.listdir(parent_folder), reverse=True)[:20]
if n_files:
files = files[:n_files]
dfs = []
print('dfs', dfs)
for f in files:
with open(os.path.join(parent_folder, f, encoding="utf-8"), 'r') as file:
data_temp = json.load(file)
for data_city in data_temp:
dfs.append(
{
'temperature': data_temp[data_city]['main']['temp'],
'city': data_temp [data_city]['name'],
'pression': data_temp[data_city]['main']['pressure'],
'date': f.split('.')[0]
}
)
df = pd.DataFrame(dfs)
df.to_csv(os.path.join('/app/clean_data', filename), index=False)
A good idea is to check if the file contains data before trying to deserialize it. So this can be added before you open the file f
:
import os
if os.stat(f).st_size > 0:
...
In your recup_data
it could also be an idea to check if you have an empty JSON object before writing to file, to avoid creating an empty file.
if bool(cities):
<write file>