
celery-rabbitmq installation and deployment


I: Installing Python

1. Download the Python 3 source
wget https://www.python.org/ftp/python/3.5.0/Python-3.5.0.tgz
2. Extract it
tar xf Python-3.5.0.tgz
3. Enter the extracted source directory
cd Python-3.5.0
4. Configure, build, and install
./configure --prefix=/usr/local/python3
make && make install
5. Create symbolic links
ln -s /usr/local/python3/bin/python3 /usr/bin/python3
ln -s /usr/local/python3/bin/pip3 /usr/bin/pip3
6. Install celery
pip3 install celery
ln -s /usr/local/python3/bin/celery /usr/bin/celery

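II: Installing and using RabbitMQ
1. Download the RabbitMQ package
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.0/rabbitmq-server-3.5.0-1.noarch.rpm
2. Download the Erlang dependency
wget http://www.rabbitmq.com/releases/erlang/erlang-19.0.4-1.el7.centos.x86_64.rpm
3. Install (the Erlang package first, then rabbitmq-server)
rpm -ivh xxx.rpm
4. Configure
The rabbitmqctl commands in 1) to 3) need a running node, so start the service (step 5) before running them.

1) Add a user and set its password: rabbitmqctl add_user admin 123456

2) Grant permissions (give the admin user full permissions on the virtual host "/"): rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

3) Change the user's role (assign the administrator tag): rabbitmqctl set_user_tags admin administrator

4) Add the following to /etc/rabbitmq/rabbitmq.config:
[{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["admin"]}]}].
Note: loopback_users lists the users that may connect only through the loopback interface. If admin has to connect from other hosts, use {loopback_users, []} instead.

5. Start, check, and stop the service

service rabbitmq-server start
service rabbitmq-server status
service rabbitmq-server stop
6. Enable the management (monitoring) UI
rabbitmq-plugins enable rabbitmq_management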


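III: Installing monitoring
1. Install flower
pip3 install flower
2. Start flower
Suppose flower is started on server2; flower listens on port 5555 by default. Replace <password> and <host> with the RabbitMQ password and address.

celery flower --broker=amqp://admin:<password>@<host>:5672//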
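After starting the services it can help to confirm that the ports are actually reachable. The following is a minimal sketch using only the standard library; `port_open` is a hypothetical helper name, and `127.0.0.1` is a placeholder for the host running RabbitMQ (5672) and flower (5555).

```python
import socket


def port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


if __name__ == "__main__":
    # "127.0.0.1" is a placeholder; substitute the real server address.
    for name, port in (("rabbitmq", 5672), ("flower", 5555)):
        state = "open" if port_open("127.0.0.1", port) else "closed"
        print(name, port, state)
```

A successful TCP connection only shows that something is listening; it does not prove that the credentials or the vhost are correct.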

Command to start a worker
celery -A center worker --loglevel=debug


MQ configuration
RABBITMQ_HOSTS = "172.16.1.8"
RABBITMQ_PORT = 5672
RABBITMQ_VHOST = '/'
RABBITMQ_USER = 'admin'
RABBITMQ_PWD = 'admin'
BROKER_URL = 'amqp://%s:%s@%s:%d/%s' % (RABBITMQ_USER, RABBITMQ_PWD, RABBITMQ_HOSTS, RABBITMQ_PORT, RABBITMQ_VHOST)
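If the password contains characters such as `@` or `/`, plain `%` formatting produces a URL that is split in the wrong place. A minimal sketch, assuming a hypothetical helper `build_broker_url` (not part of Celery), percent-encodes the user name and password with the standard library and leaves the vhost as written:

```python
from urllib.parse import quote


def build_broker_url(user, password, host, port, vhost="/"):
    """Hypothetical helper: assemble an AMQP URL with escaped credentials."""
    # safe="" makes quote() escape every reserved character, including "/" and "@".
    return "amqp://%s:%s@%s:%d/%s" % (
        quote(user, safe=""),
        quote(password, safe=""),
        host,
        port,
        vhost,
    )


# Same values as the configuration above.
BROKER_URL = build_broker_url("admin", "admin", "172.16.1.8", 5672, "/")
print(BROKER_URL)
```

With the values above the result is identical to the hand-formatted URL; the encoding only changes the output when the credentials contain reserved characters.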



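Starting services in the background
celery multi start w1 -A celery_app worker --loglevel=debug

Start RabbitMQ: rabbitmq-server -detached
Stop RabbitMQ: rabbitmqctl stop
If several instances run on one machine, add -n after rabbitmqctl to specify the node name.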


Create a user: rabbitmqctl add_user xxx pwd
Delete a user: rabbitmqctl delete_user xxx
List users: rabbitmqctl list_users
Change a password: rabbitmqctl change_password {username} {newpassword}
Set user roles: rabbitmqctl set_user_tags {username} {tag ...}
Tag can be administrator, monitoring, or management


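Setting permissions: set_permissions [-p vhostpath] {user} {conf} {write} {read}
vhostpath
The path of the virtual host.
user
The user name.
conf
A regular expression matching the resources the user is allowed to configure.
write
A regular expression matching the resources the user is allowed to write to.
read
A regular expression matching the resources the user is allowed to read from.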
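To get a feel for what the three patterns cover, they can be tried against sample resource names. This is only an illustration with Python's `re` module; RabbitMQ evaluates the patterns with Erlang's regex engine, so the patterns here are anchored explicitly to avoid relying on engine-specific behaviour. The helper `allowed` and the `celery-` prefix are assumptions made for the example.

```python
import re


def allowed(pattern, resource_name):
    """Illustrative check: does a permission regex cover a resource name?"""
    return re.search(pattern, resource_name) is not None


# ".*" covers every resource name.
print(allowed(".*", "task1"))
# "^$" matches only the empty name, so it grants nothing for real resources.
print(allowed("^$", "task1"))
# Hypothetical pattern limiting a user to resources with a "celery-" prefix.
print(allowed("^celery-.*$", "celery-results"))
print(allowed("^celery-.*$", "billing"))
```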

celery -A worker worker --loglevel=debug -n workerA.%h -Q for_task_1,for_task_2 -P gevent -c 1

celery -A celery_app worker --loglevel=info -P gevent -c 1

celery -A celery_app worker --loglevel=debug

Priority configuration
from kombu import Exchange, Queue

CELERY_QUEUES = (
    Queue('task1', Exchange('task1', type='direct'), routing_key='task1',
          consumer_arguments={'x-priority': 0}),
    Queue('task2', Exchange('task2', type='direct'), routing_key='task2',
          consumer_arguments={'x-priority': 10}),
)

from kombu import Queue
from kombu import Exchange

task_queues = (
    Queue('priority_low', exchange=Exchange('priority', type='direct'), routing_key='priority_low'),
    Queue('priority_high', exchange=Exchange('priority', type='direct'), routing_key='priority_high'),
)

task_routes = ([
    ('tasks.add', {'queue': 'priority_low'}),
    ('tasks.multiply', {'queue': 'priority_high'}),
],)
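The effect of a route table like this can be pictured as a first-match lookup from task name to queue. The sketch below is a plain-Python illustration, not Celery's router; `resolve_queue` is a hypothetical helper, and the fallback name `celery` is Celery's default queue name.

```python
from fnmatch import fnmatchcase

ROUTES = [
    ("tasks.add", {"queue": "priority_low"}),
    ("tasks.multiply", {"queue": "priority_high"}),
]


def resolve_queue(task_name, routes=ROUTES, default="celery"):
    """Illustrative lookup: the first matching pattern wins, else the default."""
    for pattern, options in routes:
        # fnmatchcase also handles glob patterns such as "tasks.*".
        if fnmatchcase(task_name, pattern):
            return options["queue"]
    return default


print(resolve_queue("tasks.add"))
print(resolve_queue("tasks.multiply"))
print(resolve_queue("tasks.unrouted"))
```

A route can also be overridden for a single call by passing the `queue` argument to `apply_async`.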

You can limit the rate of a task so that only 10 tasks of that type are processed per minute:
celeryconfig.py:

CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}
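To see what a value such as `'10/m'` means in practice, it can be converted into the average spacing between task starts. This is an illustrative parser written for this note, not Celery's own code; `RATE_UNITS` and `min_interval_seconds` are hypothetical names.

```python
# Seconds per unit suffix: per second, per minute, per hour.
RATE_UNITS = {"s": 1, "m": 60, "h": 3600}


def min_interval_seconds(rate_limit):
    """Illustrative: average seconds between task starts for strings like '10/m'."""
    count, _, unit = str(rate_limit).partition("/")
    # A bare number without a suffix is treated as tasks per second.
    return RATE_UNITS[unit or "s"] / float(count)


print(min_interval_seconds("10/m"))   # one task every 6 seconds on average
print(min_interval_seconds("2/s"))
print(min_interval_seconds("100/h"))
```

Keep in mind that the limit applies per worker instance, so several workers together can exceed 10 tasks per minute.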
Drawbacks and caveats of using RabbitMQ as the result backend:
RabbitMQ Result Backend
The RabbitMQ result backend (amqp) is special as it does not actually store the states, but rather sends them as messages. This is an important difference as it means that a result can only be retrieved once; If you have two processes waiting for the same result, one of the processes will never receive the result!

Even with that limitation, it is an excellent choice if you need to receive state changes in real-time. Using messaging means the client does not have to poll for new states.

There are several other pitfalls you should be aware of when using the RabbitMQ result backend:

Every new task creates a new queue on the server, with thousands of tasks the broker may be overloaded with queues and this will affect performance in negative ways. If you’re using RabbitMQ then each queue will be a separate Erlang process, so if you’re planning to keep many results simultaneously you may have to increase the Erlang process limit, and the maximum number of file descriptors your OS allows.
Old results will be cleaned automatically, based on the CELERY_TASK_RESULT_EXPIRES setting. By default this is set to expire after 1 day: if you have a very busy cluster you should lower this value.
For a list of options supported by the RabbitMQ result backend, please see AMQP backend settings.
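On a busy cluster the expiry window mentioned above can be shortened in celeryconfig.py. A minimal sketch, assuming one hour is acceptable for the workload (the value is illustrative, not a recommendation); the setting accepts either a number of seconds or a `timedelta`:

```python
from datetime import timedelta

# Illustrative value: expire stored results after one hour instead of one day.
CELERY_TASK_RESULT_EXPIRES = timedelta(hours=1)

print(CELERY_TASK_RESULT_EXPIRES.total_seconds())
```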

Drawbacks and caveats of using a traditional database as the result backend:
Database Result Backend
Keeping state in the database can be convenient for many, especially for web applications with a database already in place, but it also comes with limitations.

Polling the database for new states is expensive, and so you should increase the polling intervals of operations such as result.get().

Some databases use a default transaction isolation level that is not suitable for polling tables for changes.

In MySQL the default transaction isolation level is REPEATABLE-READ, which means the transaction will not see changes by other transactions until the transaction is committed. It is recommended that you change to the READ-COMMITTED isolation level.


Queue priority testing is not finished yet.


1. Reference blog posts
https://blog.csdn.net/weixin_40475396/article/details/80439781


https://github.com/celery/celery/issues/2635

Checking the Erlang version
erl -version

Building Erlang from source
1. Download the source: http://erlang.org/download/otp_src_R16B03.tar.gz
2. ./configure --prefix=/usr/local/erlang --enable-hipe --enable-threads --enable-smp-support --enable-kernel-poll --without-javac
3. make && make install

** Celery can only prioritize messages within a single queue; it cannot prioritize one queue over another.
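Priority within a single queue can be pictured with a toy model: messages with a higher priority number are delivered first, and messages with equal priority keep their publishing order. The class below is a plain-Python illustration built on `heapq`, not RabbitMQ's implementation; `PriorityQueueModel` and the message names are made up for the example.

```python
import heapq
import itertools


class PriorityQueueModel:
    """Toy model of one priority-enabled queue (illustration only)."""

    def __init__(self):
        self._heap = []
        # The counter keeps FIFO order among messages of equal priority.
        self._seq = itertools.count()

    def publish(self, body, priority=0):
        # heapq is a min-heap, so the priority is negated to pop the highest first.
        heapq.heappush(self._heap, (-priority, next(self._seq), body))

    def consume(self):
        return heapq.heappop(self._heap)[2]


queue = PriorityQueueModel()
queue.publish("report", priority=1)
queue.publish("alert", priority=9)
queue.publish("cleanup", priority=1)
print([queue.consume() for _ in range(3)])  # ['alert', 'report', 'cleanup']
```

The model covers one queue only, which matches the limitation noted above: nothing in it orders messages across two different queues.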
