使用SQS与Celery


10个月前,当我加入PCH/Media时,我被分配到一个倡议,以提高现有Python Django服务的可扩展性和可维护性,特别是使用任务队列。任务队列管理后台工作,这些长期运行的工作极大地降低了HTTP请求-响应周期的性能。

在现有的系统中,我们将数据保存到数据库,作为HTTP请求-响应周期的一部分,这可能很耗时。除了降低HTTP响应的性能外,如果我们想安排数据库维护,就需要我们关闭服务--可能会丢失关键数据,从合规的角度看,这是不可接受的。

现有的系统。

Existing System

通过实施任务队列,我们将能够解耦组件,根据需要扩展处理,并在没有服务停机的情况下执行数据库维护。

建议的系统。

Image title

Celery和SQS

我的第一个任务是决定一个任务队列和一个消息传输系统。

我审查了几个任务队列,包括Celery、RQ、Huey等。与RQ和Huey相比,Celery对多个消息代理的支持、其广泛的文档和极其活跃的用户社区让我迷上了它。  

我们的基础设施完全在Amazon Web Services上,这使得选择SQS的消息传输变得非常简单明了。Celery支持SQS作为其消息中介之一,因此使Celery和SQS成为我的明显选择。

Celery RQ Huey
轻量级 取决于我们需要什么功能
支持SQS作为消息代理 不是 不是
文档 详细 有限 有限
用户社区 非常活跃 中等活跃 不活跃
可配置性 极其可配置 有限 最少

我在实施过程中注意到,尽管Celery有大量的文档。但对于初学者来说,它可能会让人不知所措。尽管他们在解释如何设置Celery方面做了出色的工作,但我不得不在使用SQS作为消息代理时克服无数的挑战。这是他们实验性的传输实现之一,我经常遇到没有记录的功能和限制。我在谷歌上也找不到很多帮助。

我克服了许多障碍,最终实现了精确地按计划工作,我对此感到非常自豪。这篇文章描述了如何将Celery和SQS结合起来使用,我希望我的经验能够对其他开发者有所帮助。

配置

下面是如何让Celery在Amazon Linux上启动和运行的完整步骤说明。  

安装依赖性

Celery可以用pip安装。 Celery需要 boto 库来与Amazon SQS(注意:不是boto3)进行通信,这也可以用pip来安装。  

$pip install celery
$pip install boto

在Linux中配置Celery

使用下面的命令创建用户和组 celery " ,让celery工作者运行。

adduser celery

这里安装Celeryd init脚本 。将Celeryd init.d 脚本复制到。

/etc/init.d/celery

修改权限,如下所示。

chmod 755 /etc/init.d/celery

创建一个像下面这样的celeryd配置脚本。

# Name of nodes to start
CELERYD_NODES="name_of_node(s)"
# Absolute or relative path to the 'celery' command:
CELERY_BIN="/usr/local/bin/celery"
# Fully qualified app instance to use
CELERY_APP="name_of_your_service"
# Where to chdir at start (path to your service).
CELERYD_CHDIR="/var/webapps/name_of_your_service"
# Extra arguments to celeryd
CELERYD_OPTS="--time-limit=300 --concurrency=8"
# Name of the celery config module.
CELERY_CONFIG_MODULE="celeryconfig"
# %n will be replaced with the node name.
CELERYD_LOG_FILE="/var/log/celery/%n.log"
CELERYD_PID_FILE="/var/run/celery/%n.pid"
# Workers should run as an unprivileged user.
CELERYD_USER="celery"
CELERYD_GROUP="celery"
# Name of the projects settings module.
export DJANGO_SETTINGS_MODULE="name_of_your_service.settings"

将你上面创建的配置文件复制到。       

/etc/default/celery

修改权限,如下所示。   

chmod 644 /etc/default/celeryd

/var/run//var/log/ 处创建一个目录 celery ,并按如下所示修改权限。

mkdir /var/run/celery
mkdir /var/log/celery
chmod 755 /var/run/celery
chmod 755 /var/log/celery
chown celery:celery /var/run/celery
chown celery:celery /var/log/celery

在Celery设置成功后,导航到 /etc/init.d 目录,用下面的命令检查Celery的状态。

cd /etc/init.d
sudo service celeryd status

如果它还没有运行,使用下面的命令启动celery。

sudo celeryd start

如果启动celery有任何问题,可以用下面的命令看到verbose信息。  

sudo celeryd dryrun

可以用下面的命令来停止Celery。

sudo celeryd stop

在Django中配置Celery

在settings.py中加入下面的Celery设置,在Django中用Celery配置SQS。

# CELERY SETTINGS
BROKER_URL = “sqs://aws_access_key_id:aws_secret_access_key@”
# It is not a good practice to embed AWS credentials here. 
# More information on this below.
BROKER_TRANSPORT_OPTIONS = {
    ‘region’: ‘us-east-1’, 
    ‘polling_interval’: 60, 
    # Number of seconds to sleep between unsuccessful polls, 
    # default value is 30 seconds
}
CELERY_DEFAULT_QUEUE = ‘name_of_the_default_queue_that_you_created_in_AWS’
CELERY_ACCEPT_CONTENT = [‘application/json’]
CELERY_TASK_SERIALIZER = ‘json’
CELERY_RESULT_SERIALIZER = ‘json’
CELERY_CONTENT_ENCODING = ‘utf-8’
CELERY_ENABLE_REMOTE_CONTROL = False 
CELERY_SEND_EVENTS = False
# Reason why we need the above is explained in Configuration Gotchas section.
SQS_QUEUE_NAME = ‘name_of_the_default_queue_that_you_created_in_AWS’
TASK_APPS = (
   'name_of_the_module_containing_the_celery_task_definition',
)

使用Celery创建任务

接下来,创建一个celery的实例,它将被用作创建任务、管理工人等的入口,而且必须能够让其他模块导入它。

在你的Django项目中,在根目录下创建一个名为 celery_tasks.py 的文件,包含以下内容。

from __future__ import absolute_import
from django.conf import settings
from celery import Celery
import os

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', "name_of_your_service.settings")
# argument to Celery is name of the current module
app = Celery('name_of_your_service')
# Loads configuration from a configuration object
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.TASK_APPS)

定义一个Celery任务

Celery带有装饰器,使创建任务不费力气。

你的第一个Celery任务可能看起来像。

from name_of_your_service.celery_tasks import app

@app.task()
def my_task(fill_me_in):
    do_something_here

写入任务队列

Celery使写入任务队列变得很容易,从而将任务延迟到一个工作者可以从队列中取出。有一堆选项用于 创建一个新的任务 ,下面的例子使用 delay ,这是一个很好的快捷方式。

def some_block():
    my_task.delay()

配置问题

这些是需要注意的几个问题。

SQS凭证

我们可以使用环境变量或在IAM角色中设置登录凭证,在这种情况下,上面的经纪人URL可以被设置为只是 s qs:// 。  

为EC2实例创建一个IAM角色,以便能够从SQS访问任务,而不是将我们的AWS凭证存储在盒子上,或者通过我们的代码中的API调用来传递,这是很安全的,因为我们的代码通常会被检查到一个代码库。

PID箱

在上面的配置中,我们指定了两个在文档中难以找到的变量。   CELERY_ENABLE_REMOTE_CONTROL CELERY_SEND_EVENTS

Celery使用一个叫做pidbox的广播消息系统来支持fanout。Fanout是一种交换类型,可以用来将所有消息广播到所有队列。它为每个节点创建一个pidbox队列,这个队列可以根据你的应用程序的规模而变化,在SQS中,这意味着将为AWS中的每个节点创建一个新队列,这是我们不希望看到的。我们可以通过在settings.py文件中添加以下两个变量来安全地禁用这一点,以减少一些pidbox所产生的混乱。

CELERY_ENABLE_REMOTE_CONTROL = False
CELERY_SEND_EVENTS = False

CloudWatch监控

使用SQS的好处之一是我们可以使用CloudWatch监控我们的队列。CloudWatch有几个指标,我们可以跟踪这些指标,以确保消息被及时写入和从队列中读取。关于SQS的CloudWatch指标的详细列表可以在这里找到 。  

我们可以使用CloudWatch监测的几个有用的指标包括。

  • 可见消息的大致数量 :可从队列中检索的消息的数量。

  • 发送的消息数量 : 被添加到队列中的消息数量。

  • 收到的消息数 :从队列中返回的消息数。

Monitoring SQS using CloudWatch

上述指标使我们能够监测队列的性能,我们可以在CloudWatch中设置警报,提醒我们注意任何异常行为。

在克服了这些障碍之后,包括作为主要挑战的配置结点,我们的队列范式已经运行得非常好了。我们已经能够使用容纳超过一百万个请求的队列,通过自动扩展后端处理器进行处理,并且能够在零停机时间内进行数据库维护。

原文链接: Using SQS With Celery