1. 程式人生 > 實用技巧 >五、Celery 高階用法【20200912】

五、Celery 高階用法【20200912】

1、通過Celery,獲取原生的RabbitMQ連結進行操作

案例:生產者與消費者

# 生產者
from django_celery_project import celery_app
conn = celery_app.broker_connection()
with conn.channel() as channel:
    producer = Producer(channel)
    from kombu import Exchange, Queue
    task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks
') producer.publish( {'hello': 'world'}, retry=True, exchange=task_queue.exchange, routing_key=task_queue.routing_key, declare=[task_queue], # declares exchange, queue and binds. ) # 消費者 def callback(body, message): print(body) message.ack()
from django_celery_project import celery_app conn = celery_app.broker_connection() task_queue = Queue('tasks', Exchange('tasks'), routing_key='tasks') with conn.channel() as channel: consumer = conn.Consumer(queues=task_queue, channel=channel) consumer.register_callback(callback) with consumer: conn.drain_events(timeout
=1)

2、通過Celery獲取連結,實現獲取佇列大小

獲取佇列大小的作用,就是在刪除佇列之前一定在判斷佇列還有沒有資料,有資料的話,不能被刪除
    from django_celery_project import celery_app
    broker_connection = celery_app.broker_connection()
    # tasks 是佇列名字
    print(broker_connection.channel().basic_get('tasks', no_ack=False).delivery_info)
    print(broker_connection.channel().basic_get('tasks', no_ack=False).headers)

執行結果

{'delivery_tag': 1, 'redelivered': True, 'exchange': 'tasks', 'routing_key': 'tasks', 'message_count': 5}<== message_count 就是佇列的大小,記得再加上1,才是總的數量
{'content_type': 'application/json', 'content_encoding': 'utf-8', 'application_headers': {}, 'delivery_mode': 2, 'priority': 0}

3、通過Celery獲取連結,刪除佇列和交換介面

    from django_celery_project import celery_app
    broker_connection = celery_app.broker_connection()
    broker_connection.channel().exchange_delete('tasks') # 
    broker_connection.channel().queue_delete('tasks')