Source From Here
Preface
在程序的運行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程序的運行,我們經常會採用多線程或異步任務。比如,在Web 開發中,對新用戶的註冊,我們通常會給他發一封激活郵件,而發郵件是個IO 阻塞式任務,如果直接把它放到應用當中,就需要等郵件發出去之後才能進行下一步操作,此時用戶只能等待再等待。更好的方式是在業務邏輯中觸發一個發郵件的異步任務,而主程序可以繼續往下運行。
Celery 是一個強大的分佈式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。它的架構組成如下圖:
可以看到,Celery 主要包含以下幾個模塊:
* 任務模塊
* 消息中間件 Broker
* 任務執行單元 Worker
* 任務結果存儲 Backend
異步任務
使用 Celery 實現異步任務主要包含三個步驟:
快速入門
為了簡單起見,對於 Broker 和 Backend,這裡都使用 Redis。在運行下面的例子之前,請確保 Redis 已正確安裝,並開啟 Redis 服務,當然,Celery 也是要安裝的。可以使用下面的命令來安裝celery 及相關依賴:
Install & Config Radis
1. Add the EPEL repository, and update YUM to confirm your change:
2. Install Redis:
3. Start Redis:
4. Optional: To automatically start Redis on boot:
創建 Celery 實例
將下面的代碼保存為文件 tasks.py:
上面的代碼做了幾件事:
啟動 Celery Worker
在當前目錄,使用如下方式啟動 Celery Worker:
其中:
在生產環境中,我們通常會使用 Supervisor 來控制 Celery Worker 進程。啟動成功後,控制台會顯示如下輸出:
調用任務
現在,我們可以在應用程序中使用 delay() 或 apply_async() 方法來調用任務。在當前目錄打開 Python 控制台,輸入以下代碼:
在上面,我們從 tasks.py 文件中導入了 add 任務對象,然後使用 delay() 方法將任務發送到消息中間件(Broker),Celery Worker進程監控到該任務後,就會進行執行。我們將窗口切換到
Worker 的啟動窗口,會看到多了兩條日誌:
這說明任務已經被調度並執行成功。另外,我們如果想獲取執行後的結果,可以這樣做:
在上面,我們是在 Python 的環境中調用任務。事實上,我們通常在應用程序中調用任務。比如,將下面的代碼保存為 client.py:
運行命令
python client.py,可以看到,雖然任務函數 add 需要等待 5 秒才返回執行結果,但由於它是一個異步任務,不會阻塞當前的主程序,因此主程序會往下執行 print 語句,打印出結果。
使用配置
在上面的例子中,我們直接把 Broker 和 Backend 的配置寫在了程序當中,更好的做法是將配置項統一寫入到一個配置文件中,通常我們將該文件命名為 celeryconfig.py。Celery 的配置比較多,可以在 官方文檔 查詢每個配置項的含義。下面,我們再看一個例子。項目結構如下:
__init__.py 代碼如下:
celeryconfig.py 代碼如下:
task1.py 代碼如下:
task2.py 代碼如下:
client.py 代碼如下:
現在,讓我們啟動 Celery Worker 進程,在項目的根目錄下執行下面命令:
接著,運行 python client.py,它會發送兩個異步任務到 Broker,在 Worker 的窗口我們可以看到如下輸出:
delay 和 apply_async
在前面的例子中,我們使用 delay() 或 apply_async() 方法來調用任務。事實上,delay 方法封裝了 apply_async,如下:
也就是說,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的參數,它的一般形式如下:
apply_async 常用的參數如下:
* countdown:指定多少秒後執行任務
* eta (estimated time of arrival):指定任務被調度的具體時間,參數類型是
datetime
* expires:任務過期時間,參數類型可以是 int,也可以是
datetime
更多的參數列表可以在
官方文檔 中查看。
定時任務
Celery 除了可以執行 異步任務,也支持執行 週期性任務(Periodic Tasks),或者說定時任務。Celery Beat 進程通過讀取配置文件的內容,週期性地將定時任務發往任務隊列。讓我們看看例子,項目結構如下:
__init__.py 代碼如下:
celeryconfig.py 代碼如下:
task1.py 代碼如下:
task2.py 代碼如下:
現在,讓我們啟動 Celery Worker 進程,在項目的根目錄下執行下面命令:
接著,啟動 Celery Beat 進程,定時將任務發送到 Broker,在項目根目錄下執行下面命令:
之後,在Worker窗口我們可以看到,任務 task1 每30秒執行一次,而 task2 每天早上9點50分執行一次。在上面,我們用兩個命令啟動了 Worker 進程和 Beat 進程,我們也可以將它們放在一個命令中:
Celery 週期性任務也有多個配置項,可參考 官方文檔。
Supplement
* Celery Doc - First Steps with Celery
* Getting started with Celery and RabbitMQ
* 資料庫的好夥伴:Redis
* How To Install and Use Redis
* Install and Configure Redis on CentOS 7
Preface
在程序的運行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程序的運行,我們經常會採用多線程或異步任務。比如,在Web 開發中,對新用戶的註冊,我們通常會給他發一封激活郵件,而發郵件是個IO 阻塞式任務,如果直接把它放到應用當中,就需要等郵件發出去之後才能進行下一步操作,此時用戶只能等待再等待。更好的方式是在業務邏輯中觸發一個發郵件的異步任務,而主程序可以繼續往下運行。
Celery 是一個強大的分佈式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。它的架構組成如下圖:
可以看到,Celery 主要包含以下幾個模塊:
* 任務模塊
* 消息中間件 Broker
* 任務執行單元 Worker
* 任務結果存儲 Backend
異步任務
使用 Celery 實現異步任務主要包含三個步驟:
快速入門
為了簡單起見,對於 Broker 和 Backend,這裡都使用 Redis。在運行下面的例子之前,請確保 Redis 已正確安裝,並開啟 Redis 服務,當然,Celery 也是要安裝的。可以使用下面的命令來安裝celery 及相關依賴:
Install & Config Radis
1. Add the EPEL repository, and update YUM to confirm your change:
2. Install Redis:
3. Start Redis:
4. Optional: To automatically start Redis on boot:
創建 Celery 實例
將下面的代碼保存為文件 tasks.py:
- # -*- coding: utf-8 -*-
- import time
- from celery import Celery
- broker = 'redis://127.0.0.1:6379'
- backend = 'redis://127.0.0.1:6379/0'
- app = Celery('my_task', broker=broker, backend=backend)
- @app.task
- def add(x, y):
- time.sleep(5) # 模拟耗时操作
- return x + y
啟動 Celery Worker
在當前目錄,使用如下方式啟動 Celery Worker:
其中:
在生產環境中,我們通常會使用 Supervisor 來控制 Celery Worker 進程。啟動成功後,控制台會顯示如下輸出:
調用任務
現在,我們可以在應用程序中使用 delay() 或 apply_async() 方法來調用任務。在當前目錄打開 Python 控制台,輸入以下代碼:
在上面,我們從 tasks.py 文件中導入了 add 任務對象,然後使用 delay() 方法將任務發送到消息中間件(Broker),Celery Worker進程監控到該任務後,就會進行執行。我們將窗口切換到
Worker 的啟動窗口,會看到多了兩條日誌:
這說明任務已經被調度並執行成功。另外,我們如果想獲取執行後的結果,可以這樣做:
在上面,我們是在 Python 的環境中調用任務。事實上,我們通常在應用程序中調用任務。比如,將下面的代碼保存為 client.py:
- # -*- coding: utf-8 -*-
- from tasks import add
- # 异步任务
- add.delay(2, 8)
- print 'hello world'
使用配置
在上面的例子中,我們直接把 Broker 和 Backend 的配置寫在了程序當中,更好的做法是將配置項統一寫入到一個配置文件中,通常我們將該文件命名為 celeryconfig.py。Celery 的配置比較多,可以在 官方文檔 查詢每個配置項的含義。下面,我們再看一個例子。項目結構如下:
- celery_demo # 项目根目录
- ├── celery_app # 存放 celery 相关文件
- │ ├── __init__.py
- │ ├── celeryconfig.py # 配置文件
- │ ├── task1.py # 任务文件 1
- │ └── task2.py # 任务文件 2
- └── client.py # 应用程序
- # -*- coding: utf-8 -*-
- from celery import Celery
- app = Celery('demo') # 创建 Celery 实例
- app.config_from_object('celery_app.celeryconfig') # 通过 Celery 实例加载配置模块
- BROKER_URL = 'redis://127.0.0.1:6379' # 指定 Broker
- CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0' # 指定 Backend
- CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,默认是 UTC
- # CELERY_TIMEZONE='UTC'
- CELERY_IMPORTS = ( # 指定导入的任务模块
- 'celery_app.task1',
- 'celery_app.task2'
- )
- import time
- from celery_app import app
- @app.task
- def add(x, y):
- time.sleep(2)
- return x + y
- import time
- from celery_app import app
- @app.task
- def multiply(x, y):
- time.sleep(2)
- return x * y
- # -*- coding: utf-8 -*-
- from celery_app import task1
- from celery_app import task2
- task1.add.apply_async(args=[2, 8]) # 也可用 task1.add.delay(2, 8)
- task2.multiply.apply_async(args=[3, 7]) # 也可用 task2.multiply.delay(3, 7)
- print('hello world')
接著,運行 python client.py,它會發送兩個異步任務到 Broker,在 Worker 的窗口我們可以看到如下輸出:
delay 和 apply_async
在前面的例子中,我們使用 delay() 或 apply_async() 方法來調用任務。事實上,delay 方法封裝了 apply_async,如下:
- def delay(self, *partial_args, **partial_kwargs):
- """Shortcut to :meth:`apply_async` using star arguments."""
- return self.apply_async(partial_args, partial_kwargs)
- apply_async(args=(), kwargs={}, route_name=None, **options)
* countdown:指定多少秒後執行任務
- task1.apply_async(args=(2, 3), countdown=5) # 5 秒后执行任务
- from datetime import datetime, timedelta
- # 当前 UTC 时间再加 10 秒后执行任务
- task1.multiply.apply_async(args=[3, 7], eta=datetime.utcnow() + timedelta(seconds=10))
- task1.multiply.apply_async(args=[3, 7], expires=10) # 10 秒后过期
定時任務
Celery 除了可以執行 異步任務,也支持執行 週期性任務(Periodic Tasks),或者說定時任務。Celery Beat 進程通過讀取配置文件的內容,週期性地將定時任務發往任務隊列。讓我們看看例子,項目結構如下:
- celery_demo # 项目根目录
- ├── celery_app # 存放 celery 相关文件
- ├── __init__.py
- ├── celeryconfig.py # 配置文件
- ├── task1.py # 任务文件
- └── task2.py # 任务文件
- # -*- coding: utf-8 -*-
- from celery import Celery
- app = Celery('demo')
- app.config_from_object('celery_app.celeryconfig')
- # -*- coding: utf-8 -*-
- from datetime import timedelta
- from celery.schedules import crontab
- # Broker and Backend
- BROKER_URL = 'redis://127.0.0.1:6379'
- CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'
- # Timezone
- CELERY_TIMEZONE='Asia/Shanghai' # 指定时区,不指定默认为 'UTC'
- # CELERY_TIMEZONE='UTC'
- # import
- CELERY_IMPORTS = (
- 'celery_app.task1',
- 'celery_app.task2'
- )
- # schedules
- CELERYBEAT_SCHEDULE = {
- 'add-every-30-seconds': {
- 'task': 'celery_app.task1.add',
- 'schedule': timedelta(seconds=30), # 每 30 秒执行一次
- 'args': (5, 8) # 任务函数参数
- },
- 'multiply-at-some-time': {
- 'task': 'celery_app.task2.multiply',
- 'schedule': crontab(hour=9, minute=50), # 每天早上 9 点 50 分执行一次
- 'args': (3, 7) # 任务函数参数
- }
- }
- import time
- from celery_app import app
- @app.task
- def add(x, y):
- time.sleep(2)
- return x + y
- import time
- from celery_app import app
- @app.task
- def multiply(x, y):
- time.sleep(2)
- return x * y
接著,啟動 Celery Beat 進程,定時將任務發送到 Broker,在項目根目錄下執行下面命令:
之後,在Worker窗口我們可以看到,任務 task1 每30秒執行一次,而 task2 每天早上9點50分執行一次。在上面,我們用兩個命令啟動了 Worker 進程和 Beat 進程,我們也可以將它們放在一個命令中:
Celery 週期性任務也有多個配置項,可參考 官方文檔。
Supplement
* Celery Doc - First Steps with Celery
* Getting started with Celery and RabbitMQ
* 資料庫的好夥伴:Redis
* How To Install and Use Redis
* Install and Configure Redis on CentOS 7
沒有留言:
張貼留言