When I save a model I want to create some folders, then download a few things to those folders.
Folders need to be created for the downloads, obviously, downloads can happen in parallel.
HEre's what I have:
@receiver(post_save, sender=Game)
def after_game_model_save(sender, instance, created, **kwargs):
logger.info("after_game_model_save")
task = None
task_id = uuid4()
tasks_chain = chain(create_game_folder.s(instance.id))
if created:
tasks_chain |= download_game.s(instance.id, instance.download_link).set(
task_id=str(task_id)
)
else:
if instance.tracker.has_changed("screenshots_urls"):
tasks_group = group(
[
download_game.s(instance.id, instance.download_link).set(
task_id=str(task_id)
),
download_screenshots.s(instance.id),
]
)
if instance.tracker.has_changed("download_link"):
tasks_group = group(
[
download_game_update.s(instance.id, instance.download_link).set(
task_id=str(task_id)
),
download_screenshots.s(instance.id),
]
)
tasks_chain |= tasks_group
tasks_chain.delay()
try:
task_obj = Task.objects.get(game=instance)
task_obj.task_id = str(task_id)
task_obj.save()
except Task.DoesNotExist:
Task.objects.create(game=instance, task_id=str(task_id))
I'm getting the error
TypeError: download_game() takes 2 positional arguments but 3 were given
If I interpret the examples correctly, the result of the first chained task get's sent as an argument to the second task? How can I chain tasks so they're executed in order without worrying about this?
The functions return nothing. So I guess right now I end up with something like download_game.s(instance.id, instance.download_link, None)
Update:
@shared_task()
def download_game(game_id, download_link):
game_folder = Path(settings.GAMES_FOLDER / str(game_id))
tmp_folder = game_folder / 'tmp'
logger.info(f"Downloading {download_link} to {tmp_folder}")
sleep(10)
with open(tmp_folder/'test.txt', 'w') as fp:
fp.write("##########################")
fp.write("##########################")
fp.write("##########################")
fp.write("##########################")
fp.write("##########################")
logger.info(f"Downloaded {download_link} to {tmp_folder}")
You should use .si
, if you don't want to use the previous task's output. This internally sets immutable=True
in the celery signature, which is the main class that deals with serialisation of messages along with its metadata.
tasks_chain = chain(create_game_folder.si(instance.id, immutable = True))
if created:
tasks_chain |= download_game.si(instance.id, instance.download_link).set(
task_id=str(task_id)
)