Search code examples

Airflow scheduler failure

I have followed this tutorial in attempt to build an airflow cluster on localhost with my own DAGs. When I ran airflow scheduler after having set executor = CeleryExecutor in the config file, I received the following traceback:

Traceback (most recent call last):

File "/home/yurii/Tools/anaconda3/bin/airflow", line 28, in args.func(args)

File"/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/bin/", line 839, in scheduler

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/", line 200, in run self._execute()

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/", line 1309, in _execute self._execute_helper(processor_manager)

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/", line 1441, in _execute_helper self.executor.heartbeat()

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/", line 124, in heartbeat self.execute_async(key, command=command, queue=queue)

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/airflow/executors/", line 80, in execute_async args=[command], queue=queue)

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/", line 573, in apply_async **dict(self._get_exec_options(), **options)

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/", line 354, in send_task reply_to=reply_to or self.oid, **options

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/celery/app/", line 310, in publish_task **kwargs

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/", line 172, in publish routing_key, mandatory, immediate, exchange, declare)

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/", line 449, in _ensured return fun(*args, **kwargs)

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/kombu/", line 188, in _publish mandatory=mandatory, immediate=immediate,

File "/home/yurii/Tools/anaconda3/lib/python3.6/site-packages/librabbitmq/", line 122, in basic_publish mandatory or False, immediate or False,

TypeError: an integer is required (got type NoneType)

Some additional information:

  • I am using Airflow 1.8.0 along with Celery 3.1.25 and RabbitMQ 3.5.7 as a broker and backend, but also tried Airflow 1.9.0 with Celery 4.2.
  • Airflow with sequential executor works without any problems.
  • `airflow test "dag_name" "task_name" "exec_date" runs succeessfully.

I am new to Airflow/Celery/RabbitMQ/SQL, so any help would be appreciated!


  • It seems you are using librabbitmq as amqp broker which is not recommended by celery core team. Use py-amqp as the rabbitmq broker and you should get rid of this error.