在Docker容器中配置和執行Django + Celery
閱讀完這篇博文後,你將能夠使用Django,PostgreSQL,Redis和RabbitMQ配置Celery,然後在Docker容器中執行它們。
今天,你將學習如何建立一個分散式任務處理系統來快速建立原型。你將使用Django,PostgreSQL,Redis和RabbitMQ配置Celery,然後在Docker容器中執行它們。閱讀本教程前你需要一些Docker的實踐知識,你可以在我以前的帖子中找到這些知識。
Django是一個非常著名的Python Web框架,Celery是一個分散式任務佇列。您將使用PostgreSQL作為常規資料庫來儲存作業,RabbitMQ作為訊息代理,並將Redis作為任務儲存後端。
動機
當你構建一個Web應用程式時,你遲早要實現某種離線任務處理。
例如:
Alice希望將她的貓的照片從.jpg轉換為.png,或者從她的.jpg檔案集中建立.pdf。 在一個HTTP請求中執行這些任務中的任何一個都需要很長時間才能執行完,並且會給Web伺服器帶來不必要的負擔 - 這意味著我們無法同時處理其他請求。 常見的解決方案是在後臺執行任務 - 通常在另一臺機器上 - 並輪詢結果。
離線任務處理的簡單設定可能如下所示:
1. Alice上傳圖片。
2. Web伺服器安排worker的任務。
3. worker獲得任務並轉換照片。
4. worker建立一些任務的處理結果(在這種情況下,結果是轉換後的照片)。
5. Web瀏覽器查詢結果。
6. Web瀏覽器從伺服器獲取結果。
這個設定看起來很清晰,但它有一個嚴重的缺陷 - 它不能很好地擴充套件。如果Alice有很多貓的圖片,一臺伺服器不足以一次性處理它們呢?或者,如果還有其他一些非常大的工作,並且所有其他工作都被它阻塞?她是否在乎是否所有圖片都一次性處理?如果處理在某些時候失敗會怎麼樣?
坦率地說,有一種解決方案不會在每次獲得更大圖片時都會讓機器宕機。在Web伺服器和worker之間你需要的一些東西:broker。Web伺服器將通過與broker進行通訊來安排新任務,broker將與worker通訊以實際執行這些任務。你可能還想緩衝任務,如果失敗則重試,並監視其中有多少任務已處理。
你將不得不為不同優先順序的任務建立佇列,或者為適合不同型別工作的任務建立佇列。
所有這些都可以通過使用Celery–一個開源的分散式任務佇列來大大簡化。
如何建立Celery
Celery包括:
- 任務,如你在應用中定義的
- 將任務路由到worker和佇列的broker
- 做實際工作的worker
- 儲存後端
你可以在這裡觀看對Celery更深入的介紹或直接跳到Celery的入門指南
你的設定
從標準的Django專案結構開始。它可以使用django-admin建立,通過在shell中執行:
$ django-admin startproject myproject
這建立了一個專案結構:
.
└── myproject
├── manage.py
└── myproject
├── __init__.py
├── settings.py
├── urls.py
└── wsgi.py
在本教程結束時,它看起來像這樣:
.
├── Dockerfile
├── docker-compose.yml
├── myproject
│ ├── manage.py
│ └── myproject
│ ├── celeryconf.py
│ ├── __init__.py
│ ├── models.py
│ ├── serializers.py
│ ├── settings.py
│ ├── tasks.py
│ ├── urls.py
│ ├── views.py
│ └── wsgi.py
├── requirements.txt
├── run_celery.sh
└── run_web.sh
建立Docker容器
由於我們正在使用Docker 1.12,因此我們需要一個正確的Dockerfile
檔案來指定如何構建映象。
自定義容器
Dockerfile
# use base python image with python 2.7
FROM python:2.7
# add requirements.txt to the image
ADD requirements.txt /app/requirements.txt
# set working directory to /app/
WORKDIR /app/
# install python dependencies
RUN pip install -r requirements.txt
# create unprivileged user
RUN adduser --disabled-password --gecos '' myuser
我們的python依賴是:
requirements.txt
Django==1.9.8
celery==3.1.20
djangorestframework==3.3.1
psycopg2==2.5.3
redis==2.10.5
我已經凍結了版本的依賴關係(指定了版本號),以確保你將有一個能正常工作的設定。如果你願意,你可以更新它們中的任何一個版本,但不保證能工作。
選擇服務的映象
現在我們只需要設定RabbitMQ,PostgreSQL和Redis。由於Docker推出了它們的官方庫,我儘可能使用官方映象。但是,這些可能有時會被打破。當發生這種情況時,你將不得不使用別的東西(非官方映象)。
這裡是我為這個專案測試並選擇的映象:
使用docker-compose設定多容器應用程式
現在你將使用docker-compose將你自己的容器與我們在上一節中選擇的容器組合起來。
docker-compose.yml
version: '2'
services:
# PostgreSQL database
db:
image: postgres:9.4
hostname: db
environment:
- POSTGRES_USER=postgres
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=postgres
ports:
- "5432:5432"
# Redis
redis:
image: redis:2.8.19
hostname: redis
# RabbitMQ
rabbit:
hostname: rabbit
image: rabbitmq:3.6.0
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=mypass
ports:
- "5672:5672" # we forward this port because it's useful for debugging
- "15672:15672" # here, we can access rabbitmq management plugin
# Django web server
web:
build:
context: .
dockerfile: Dockerfile
hostname: web
command: ./run_web.sh
volumes:
- .:/app # mount current directory inside container
ports:
- "8000:8000"
# set up links so that web knows about db, rabbit and redis
links:
- db
- rabbit
- redis
depends_on:
- db
# Celery worker
worker:
build:
context: .
dockerfile: Dockerfile
command: ./run_celery.sh
volumes:
- .:/app
links:
- db
- rabbit
- redis
depends_on:
- rabbit
配置Web伺服器和worker
你可能已經注意到,worker和web伺服器都會執行一些啟動指令碼。在這裡他們是(確保它們是可執行的):
run_web.sh
#!/bin/sh
# wait for PSQL server to start
sleep 10
cd myproject
# prepare init migration
su -m myuser -c "python manage.py makemigrations myproject"
# migrate db, so we have the latest db schema
su -m myuser -c "python manage.py migrate"
# start development server on public ip interface, on port 8000
su -m myuser -c "python manage.py runserver 0.0.0.0:8000"
run_celery.sh
#!/bin/sh
# wait for RabbitMQ server to start
sleep 10
cd myproject
# run Celery worker for our project myproject with Celery configuration stored in Celeryconf
su -m myuser -c "celery worker -A myproject.celeryconf -Q default -n [email protected]%h"
第一個指令碼 - run_web.sh - 將遷移資料庫並在埠8000上啟動Django開發伺服器。
第二個指令碼 run_celery.sh 將啟動一個Celery worker監聽佇列預設值。
在這個階段,這些指令碼將無法正常工作,因為我們還沒有配置它們。我們的應用程式仍然不知道我們想使用PostgreSQL作為資料庫,或者在哪裡找到它(在某個容器中)。我們還必須配置Redis和RabbitMQ。
但在開始之前,有一些有用的Celery設定可以使您的系統更好地執行。以下是這個Django應用程式的完整設定。
myproject/settings.py
import os
from kombu import Exchange, Queue
BASE_DIR = os.path.dirname(os.path.dirname(__file__))
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '[email protected]^+)it4e&ueu#!4tl9p1h%2sjr7ey0)m25f'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
TEMPLATE_DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = (
'rest_framework',
'myproject',
'django.contrib.sites',
'django.contrib.staticfiles',
# required by Django 1.9
'django.contrib.auth',
'django.contrib.contenttypes',
)
MIDDLEWARE_CLASSES = (
)
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': ('rest_framework.permissions.AllowAny',),
'PAGINATE_BY': 10
}
ROOT_URLCONF = 'myproject.urls'
WSGI_APPLICATION = 'myproject.wsgi.application'
# Localization ant timezone settings
TIME_ZONE = 'UTC'
USE_TZ = True
CELERY_ENABLE_UTC = True
CELERY_TIMEZONE = "UTC"
LANGUAGE_CODE = 'en-us'
USE_I18N = True
USE_L10N = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.7/howto/static-files/
STATIC_URL = '/static/'
# Database Condocker-composeuration
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.postgresql_psycopg2',
'NAME': os.environ.get('DB_ENV_DB', 'postgres'),
'USER': os.environ.get('DB_ENV_POSTGRES_USER', 'postgres'),
'PASSWORD': os.environ.get('DB_ENV_POSTGRES_PASSWORD', 'postgres'),
'HOST': os.environ.get('DB_PORT_5432_TCP_ADDR', 'db'),
'PORT': os.environ.get('DB_PORT_5432_TCP_PORT', ''),
},
}
# Redis
REDIS_PORT = 6379
REDIS_DB = 0
REDIS_HOST = os.environ.get('REDIS_PORT_6379_TCP_ADDR', 'redis')
RABBIT_HOSTNAME = os.environ.get('RABBIT_PORT_5672_TCP', 'rabbit')
if RABBIT_HOSTNAME.startswith('tcp://'):
RABBIT_HOSTNAME = RABBIT_HOSTNAME.split('//')[1]
BROKER_URL = os.environ.get('BROKER_URL',
'')
if not BROKER_URL:
BROKER_URL = 'amqp://{user}:{password}@{hostname}/{vhost}/'.format(
user=os.environ.get('RABBIT_ENV_USER', 'admin'),
password=os.environ.get('RABBIT_ENV_RABBITMQ_PASS', 'mypass'),
hostname=RABBIT_HOSTNAME,
vhost=os.environ.get('RABBIT_ENV_VHOST', ''))
# We don't want to have dead connections stored on rabbitmq, so we have to negotiate using heartbeats
BROKER_HEARTBEAT = '?heartbeat=30'
if not BROKER_URL.endswith(BROKER_HEARTBEAT):
BROKER_URL += BROKER_HEARTBEAT
BROKER_POOL_LIMIT = 1
BROKER_CONNECTION_TIMEOUT = 10
# Celery configuration
# configure queues, currently we have only one
CELERY_DEFAULT_QUEUE = 'default'
CELERY_QUEUES = (
Queue('default', Exchange('default'), routing_key='default'),
)
# Sensible settings for celery
CELERY_ALWAYS_EAGER = False
CELERY_ACKS_LATE = True
CELERY_TASK_PUBLISH_RETRY = True
CELERY_DISABLE_RATE_LIMITS = False
# By default we will ignore result
# If you want to see results and try out tasks interactively, change it to False
# Or change this setting on tasks level
CELERY_IGNORE_RESULT = True
CELERY_SEND_TASK_ERROR_EMAILS = False
CELERY_TASK_RESULT_EXPIRES = 600
# Set redis as celery result backend
CELERY_RESULT_BACKEND = 'redis://%s:%d/%d' % (REDIS_HOST, REDIS_PORT, REDIS_DB)
CELERY_REDIS_MAX_CONNECTIONS = 1
# Don't use pickle as serializer, json is much safer
CELERY_TASK_SERIALIZER = "json"
CELERY_ACCEPT_CONTENT = ['application/json']
CELERYD_HIJACK_ROOT_LOGGER = False
CELERYD_PREFETCH_MULTIPLIER = 1
CELERYD_MAX_TASKS_PER_CHILD = 1000
這些設定將配置Django應用程式,以便它能夠發現PostgreSQL資料庫,Redis快取和Celery。
現在,是時候將Celery連線到應用程式。建立一個檔案celeryconf.py並貼上這段程式碼:
myproject/celeryconf.py
import os
from celery import Celery
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
app = Celery('myproject')
CELERY_TIMEZONE = 'UTC'
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
這應該足以將Celery連線到我們的應用程式,因此run_X指令碼將起作用。你可以在這裡閱讀更多關於Django和Celery的第一步。
定義任務
Celery在每個Django應用程式的tasks.py檔案中查詢任務。通常,任務是使用裝飾器或通過繼承Celery Task Class建立的。
以下是如何使用裝飾器建立任務:
@app.task
def power(n):
"""Return 2 to the n'th power"""
return 2 ** n
以下是如何通過繼承Celery任務類來建立任務:
class PowerTask(app.Task):
def run(self, n):
"""Return 2 to the n'th power"""
return 2 ** n
兩者都很好,適合稍微不同的用例。
myproject/tasks.py
from functools import wraps
from myproject.celeryconf import app
from .models import Job
# decorator to avoid code duplication
def update_job(fn):
"""Decorator that will update Job with result of the function"""
# wraps will make the name and docstring of fn available for introspection
@wraps(fn)
def wrapper(job_id, *args, **kwargs):
job = Job.objects.get(id=job_id)
job.status = 'started'
job.save()
try:
# execute the function fn
result = fn(*args, **kwargs)
job.result = result
job.status = 'finished'
job.save()
except:
job.result = None
job.status = 'failed'
job.save()
return wrapper
# two simple numerical tasks that can be computationally intensive
@app.task
@update_job
def power(n):
"""Return 2 to the n'th power"""
return 2 ** n
@app.task
@update_job
def fib(n):
"""Return the n'th Fibonacci number.
"""
if n < 0:
raise ValueError("Fibonacci numbers are only defined for n >= 0.")
return _fib(n)
def _fib(n):
if n == 0 or n == 1:
return n
else:
return _fib(n - 1) + _fib(n - 2)
# mapping from names to tasks
TASK_MAPPING = {
'power': power,
'fibonacci': fib
}
為排程任務構建API
如果你的系統中有任務,你如何執行它們呢?在本節中,你將建立用於作業排程的使用者介面。在後端應用程式中,API將是你的使用者介面。我們為你的API 使用Django REST框架。
為了使其儘可能簡單,你的應用程式將只有一個模型並且只有一個ViewSet(具有許多HTTP方法的端點)。
在*myproject/models.py中建立名為Job的模型:
from django.db import models
class Job(models.Model):
"""Class describing a computational job"""
# currently, available types of job are:
TYPES = (
('fibonacci', 'fibonacci'),
('power', 'power'),
)
# list of statuses that job can have
STATUSES = (
('pending', 'pending'),
('started', 'started'),
('finished', 'finished'),
('failed', 'failed'),
)
type = models.CharField(choices=TYPES, max_length=20)
status = models.CharField(choices=STATUSES, max_length=20)
created_at = models.DateTimeField(auto_now_add=True)
updated_at = models.DateTimeField(auto_now=True)
argument = models.PositiveIntegerField()
result = models.IntegerField(null=True)
def save(self, *args, **kwargs):
"""Save model and if job is in pending state, schedule it"""
super(Job, self).save(*args, **kwargs)
if self.status == 'pending':
from .tasks import TASK_MAPPING
task = TASK_MAPPING[self.type]
task.delay(job_id=self.id, n=self.argument)
然後建立一個序列化器,檢視和URL配置來訪問它。
myproject/serializers.py
from rest_framework import serializers
from .models import Job
class JobSerializer(serializers.HyperlinkedModelSerializer):
class Meta:
model = Job
myproject/views.py
from rest_framework import mixins, viewsets
from .models import Job
from .serializers import JobSerializer
class JobViewSet(mixins.CreateModelMixin,
mixins.ListModelMixin,
mixins.RetrieveModelMixin,
viewsets.GenericViewSet):
"""
API endpoint that allows jobs to be viewed or created.
"""
queryset = Job.objects.all()
serializer_class = JobSerializer
myproject/urls.py
from django.conf.urls import url, include
from rest_framework import routers
from myproject import views
router = routers.DefaultRouter()
# register job endpoint in the router
router.register(r'jobs', views.JobViewSet)
# Wire up our API using automatic URL routing.
# Additionally, we include login URLs for the browsable API.
urlpatterns = [
url(r'^', include(router.urls)),
url(r'^api-auth/', include('rest_framework.urls', namespace='rest_framework'))
]
為了完整性,還有myproject/wsgi.py,為專案定義WSGI配置:
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
from django.core.wsgi import get_wsgi_application
application = get_wsgi_application()
和manage.py
#!/usr/bin/env python
import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)
將_init_.py留空。
就這樣。呃…很多程式碼。幸運的是,所有的東西都在GitHub上,所以你可以把它分開。
執行設定
由於所有內容都是從Docker Compose執行的,因此在嘗試啟動應用程式之前,請確保已經安裝了Docker和Docker Compose:
$ cd /path/to/myproject/where/is/docker-compose.yml
$ docker-compose build
$ docker-compose up
最後一個命令將啟動五個不同的容器,所以開始使用你的API並在此期間與Celery一起愉快玩耍。
訪問API
在瀏覽器中導航到127.0.0.1:8000
以瀏覽你的API並安排一些工作。
擴大規模
目前,我們只有每個容器的一個例項。我們可以通過docker-compose ps
命令獲取有關我們的一組容器的資訊。
$ docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------------------------------------------------------------
dockerdjangocelery_db_1 /docker-entrypoint.sh postgres Up 0.0.0.0:5432->5432/tcp
dockerdjangocelery_rabbit_1 /docker-entrypoint.sh rabb ... Up 0.0.0.0:15672->15672/tcp, 25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp
dockerdjangocelery_redis_1 /entrypoint.sh redis-server Up 6379/tcp
dockerdjangocelery_web_1 ./run_web.sh Up 0.0.0.0:8000->8000/tcp
dockerdjangocelery_worker_1 ./run_celery.sh Up
使用docker-compose擴充套件容器非常容易。 只需將容器名稱和容器數量使用docker-compose scale
命令即可:
$ docker-compose scale worker=5
Creating and starting dockerdjangocelery_worker_2 ... done
Creating and starting dockerdjangocelery_worker_3 ... done
Creating and starting dockerdjangocelery_worker_4 ... done
Creating and starting dockerdjangocelery_worker_5 ... done
輸出表明docker-compose為我們建立了另外四個worker容器。 我們可以再次用docker-compose ps
命令檢視它:
$ docker-compose ps
Name Command State Ports
------------------------------------------------------------------------------------------------------------------------------------------------------
dockerdjangocelery_db_1 /docker-entrypoint.sh postgres Up 0.0.0.0:5432->5432/tcp
dockerdjangocelery_rabbit_1 /docker-entrypoint.sh rabb ... Up 0.0.0.0:15672->15672/tcp, 25672/tcp, 4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp
dockerdjangocelery_redis_1 /entrypoint.sh redis-server Up 6379/tcp
dockerdjangocelery_web_1 ./run_web.sh Up 0.0.0.0:8000->8000/tcp
dockerdjangocelery_worker_1 ./run_celery.sh Up
dockerdjangocelery_worker_2 ./run_celery.sh Up
dockerdjangocelery_worker_3 ./run_celery.sh Up
dockerdjangocelery_worker_4 ./run_celery.sh Up
dockerdjangocelery_worker_5 ./run_celery.sh Up
你會看到那裡有五位強大的Celery worker。太好了!
總結
恭喜!您只需將Django與Celery結合即可構建分散式非同步計算系統。我認為你會同意建立一個API非常容易,甚至更容易擴充套件它的工作人員!然而,生活對我們來說並不總是那麼好,有時我們需要排除故障。
貢獻
Justyna Ilczuk撰寫的原創文章,由MichałKobus更新。