五、Celery 高階用法【20200912】
阿新 • • 發佈:2020-09-13
1、通過Celery,獲取原生的RabbitMQ連結進行操作
案例:生產者與消費者
# 生產者 from django_celery_project import celery_app conn = celery_app.broker_connection() with conn.channel() as channel: producer = Producer(channel) from kombu import Exchange, Queue task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks') producer.publish( {'hello': 'world'}, retry=True, exchange=task_queue.exchange, routing_key=task_queue.routing_key, declare=[task_queue], # declares exchange, queue and binds. ) # 消費者 def callback(body, message): print(body) message.ack()from django_celery_project import celery_app conn = celery_app.broker_connection() task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks') with conn.channel() as channel: consumer = conn.Consumer(queues=task_queue, channel=channel) consumer.register_callback(callback) with consumer: conn.drain_events(timeout=1)
2、通過Celery獲取連結,實現獲取佇列大小
獲取佇列大小的作用,就是在刪除佇列之前一定在判斷佇列還有沒有資料,有資料的話,不能被刪除
from django_celery_project import celery_app broker_connection = celery_app.broker_connection() # tasks 是佇列名字 print(broker_connection.channel().basic_get('tasks', no_ack=False).delivery_info) print(broker_connection.channel().basic_get('tasks', no_ack=False).headers)
執行結果
{'delivery_tag': 1, 'redelivered': True, 'exchange': 'tasks', 'routing_key': 'tasks', 'message_count': 5}<== message_count 就是佇列的大小,記得再加上1,才是總的數量
{'content_type': 'application/json', 'content_encoding': 'utf-8', 'application_headers': {}, 'delivery_mode': 2, 'priority': 0}
3、通過Celery獲取連結,刪除佇列和交換介面
from django_celery_project import celery_app broker_connection = celery_app.broker_connection() broker_connection.channel().exchange_delete('tasks') # broker_connection.channel().queue_delete('tasks')