2018年1月8日 星期一

[ Python 文章收集 ] Celery - Distributed Task Queue

Source From Here
Preface
在程序的運行過程中,我們經常會碰到一些耗時耗資源的操作,為了避免它們阻塞主程序的運行,我們經常會採用多線程或異步任務。比如,在Web 開發中,對新用戶的註冊,我們通常會給他發一封激活郵件,而發郵件是個IO 阻塞式任務,如果直接把它放到應用當中,就需要等郵件發出去之後才能進行下一步操作,此時用戶只能等待再等待。更好的方式是在業務邏輯中觸發一個發郵件的異步任務,而主程序可以繼續往下運行。

Celery 是一個強大的分佈式任務隊列,它可以讓任務的執行完全脫離主程序,甚至可以被分配到其他主機上運行。我們通常使用它來實現異步任務(async task)和定時任務(crontab)。它的架構組成如下圖:

可以看到,Celery 主要包含以下幾個模塊:
* 任務模塊
包含 異步任務 和 定時任務。其中,異步任務通常在業務邏輯中被觸發並發往任務隊列,而定時任務由 Celery Beat 進程週期性地將任務發往任務隊列

* 消息中間件 Broker
Broker,即為任務調度隊列,接收任務生產者發來的消息(即任務),將任務存入隊列。Celery本身不提供隊列服務,官方推薦使用 RabbitMQ 和 Redis 等。

* 任務執行單元 Worker
Worker 是執行任務的處理單元,它實時監控消息隊列,獲取隊列中調度的任務,並執行它。

* 任務結果存儲 Backend
Backend 用於存儲任務的執行結果,以供查詢。同消息中間件一樣,存儲也可使用 RabbitMQRedis 和 MongoDB 等。

異步任務
使用 Celery 實現異步任務主要包含三個步驟:
1. 創建一個 Celery 實例
2. 啟動 Celery Worker
3. 應用程序調用異步任務

快速入門
為了簡單起見,對於 Broker 和 Backend,這裡都使用 Redis。在運行下面的例子之前,請確保 Redis 已正確安裝,並開啟 Redis 服務,當然,Celery 也是要安裝的。可以使用下面的命令來安裝celery 及相關依賴:
# pip install flask celery pandas numpy redis


Install & Config Radis
1. Add the EPEL repository, and update YUM to confirm your change:
# sudo yum install epel-release
# sudo yum update

2. Install Redis:
# sudo yum install redis

3. Start Redis:
# sudo systemctl start redis
# netstat -tunlp | grep redis
tcp 0 0 127.0.0.1:6379 0.0.0.0:* LISTEN 9291/redis-server 1

4. Optional: To automatically start Redis on boot:
# sudo systemctl enable redis


創建 Celery 實例
將下面的代碼保存為文件 tasks.py
  1. # -*- coding: utf-8 -*-  
  2.   
  3. import time  
  4. from celery import Celery  
  5.   
  6. broker = 'redis://127.0.0.1:6379'  
  7. backend = 'redis://127.0.0.1:6379/0'  
  8.   
  9. app = Celery('my_task', broker=broker, backend=backend)  
  10.   
  11. @app.task  
  12. def add(x, y):  
  13.     time.sleep(5)     # 模拟耗时操作  
  14.     return x + y  
上面的代碼做了幾件事:
1. 創建了一個 Celery 實例 app,名稱為 my_task;
2. 指定消息中間件用 redis,URL 為 redis://127.0.0.1:6379;
3. 指定存儲用 redis,URL為 redis://127.0.0.1:6379/0;
4. 創建了一個 Celery 任務 add,當函數被 @app.task 裝飾後,就成為可被 Celery 調度的任務;

啟動 Celery Worker
在當前目錄,使用如下方式啟動 Celery Worker:
// -A APP--app APP
// --loglevel LOGLEVEL
// Logging level, choose between DEBUG, INFO, WARNING,
// ERROR, CRITICAL, or FATAL.

# celery worker -A tasks --loglevel=info

其中:
* 參數 -A 指定了Celery 實例的位置,本例是在 tasks.py 中,Celery 會自動在該文件中尋找 Celery 對象實例,當然,我們也可以自己指定,在本例,使用 -A tasks.app
* 參數 --loglevel 指定了日誌級別,默認為 warning,也可以使用 -l info 來表示;

在生產環境中,我們通常會使用 Supervisor 來控制 Celery Worker 進程。啟動成功後,控制台會顯示如下輸出:

調用任務
現在,我們可以在應用程序中使用 delay() 或 apply_async() 方法來調用任務。在當前目錄打開 Python 控制台,輸入以下代碼:
>>> from tasks import add
>>> add.delay(2, 8)

在上面,我們從 tasks.py 文件中導入了 add 任務對象,然後使用 delay() 方法將任務發送到消息中間件(Broker),Celery Worker進程監控到該任務後,就會進行執行。我們將窗口切換到
Worker 的啟動窗口,會看到多了兩條日誌:
[2018-01-08 13:30:02,773: INFO/MainProcess] Received task: tasks.add[243c708b-3a5f-459d-a980-e3de83d98978]
[2018-01-08 13:30:07,785: INFO/ForkPoolWorker-1] Task tasks.add[243c708b-3a5f-459d-a980-e3de83d98978] succeeded in 5.0100485370494425s: 10

這說明任務已經被調度並執行成功。另外,我們如果想獲取執行後的結果,可以這樣做:
>>> result = add.delay(2, 6)
>>> result.ready() # 使用 ready() 判断任务是否执行完毕
True
>>> result.get() # 使用 get() 获取任务结果
8

在上面,我們是在 Python 的環境中調用任務。事實上,我們通常在應用程序中調用任務。比如,將下面的代碼保存為 client.py:
  1. # -*- coding: utf-8 -*-  
  2.   
  3. from tasks import add  
  4.   
  5. # 异步任务  
  6. add.delay(28)  
  7.   
  8. print 'hello world'  
運行命令 python client.py,可以看到,雖然任務函數 add 需要等待 5 秒才返回執行結果,但由於它是一個異步任務,不會阻塞當前的主程序,因此主程序會往下執行 print 語句,打印出結果。

使用配置
在上面的例子中,我們直接把 Broker 和 Backend 的配置寫在了程序當中,更好的做法是將配置項統一寫入到一個配置文件中,通常我們將該文件命名為 celeryconfig.py。Celery 的配置比較多,可以在 官方文檔 查詢每個配置項的含義。下面,我們再看一個例子。項目結構如下:
  1. celery_demo                    # 项目根目录  
  2.     ├── celery_app             # 存放 celery 相关文件  
  3.     │   ├── __init__.py  
  4.     │   ├── celeryconfig.py    # 配置文件  
  5.     │   ├── task1.py           # 任务文件 1  
  6.     │   └── task2.py           # 任务文件 2  
  7.     └── client.py              # 应用程序  
__init__.py 代碼如下:
  1. # -*- coding: utf-8 -*-  
  2.   
  3. from celery import Celery  
  4.   
  5. app = Celery('demo')                                # 创建 Celery 实例  
  6. app.config_from_object('celery_app.celeryconfig')   # 通过 Celery 实例加载配置模块  
celeryconfig.py 代碼如下:
  1. BROKER_URL = 'redis://127.0.0.1:6379'               # 指定 Broker  
  2. CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  # 指定 Backend  
  3.   
  4. CELERY_TIMEZONE='Asia/Shanghai'                     # 指定时区,默认是 UTC  
  5. # CELERY_TIMEZONE='UTC'                               
  6.   
  7. CELERY_IMPORTS = (                                  # 指定导入的任务模块  
  8.     'celery_app.task1',  
  9.     'celery_app.task2'  
  10. )  
task1.py 代碼如下:
  1. import time  
  2. from celery_app import app  
  3.   
  4. @app.task  
  5. def add(x, y):  
  6.     time.sleep(2)  
  7.     return x + y  
task2.py 代碼如下:
  1. import time  
  2. from celery_app import app  
  3.   
  4. @app.task  
  5. def multiply(x, y):  
  6.     time.sleep(2)  
  7.     return x * y  
client.py 代碼如下:
  1. # -*- coding: utf-8 -*-  
  2.   
  3. from celery_app import task1  
  4. from celery_app import task2  
  5.   
  6. task1.add.apply_async(args=[28])        # 也可用 task1.add.delay(28)  
  7. task2.multiply.apply_async(args=[37])   # 也可用 task2.multiply.delay(37)  
  8.   
  9. print('hello world')  
現在,讓我們啟動 Celery Worker 進程,在項目的根目錄下執行下面命令:
# celery -A celery_app worker --loglevel=info &

接著,運行 python client.py,它會發送兩個異步任務到 Broker,在 Worker 的窗口我們可以看到如下輸出:
[2018-01-08 19:22:59,123: INFO/MainProcess] Received task: celery_app.task1.add[76ee26ba-94d5-4827-9250-18a40d158fe0]
[2018-01-08 19:22:59,125: INFO/MainProcess] Received task: celery_app.task2.multiply[e9f6be34-6245-428f-8e51-1b431f33c261]
[2018-01-08 19:23:01,134: INFO/ForkPoolWorker-2] Task celery_app.task2.multiply[e9f6be34-6245-428f-8e51-1b431f33c261] succeeded in 2.0075598466210067s: 21
[2018-01-08 19:23:01,134: INFO/ForkPoolWorker-1] Task celery_app.task1.add[76ee26ba-94d5-4827-9250-18a40d158fe0] succeeded in 2.0062049366533756s: 10

delay 和 apply_async
在前面的例子中,我們使用 delay() 或 apply_async() 方法來調用任務。事實上,delay 方法封裝了 apply_async,如下:
  1. def delay(self, *partial_args, **partial_kwargs):  
  2.     """Shortcut to :meth:`apply_async` using star arguments."""  
  3.     return self.apply_async(partial_args, partial_kwargs)  
也就是說,delay 是使用 apply_async 的快捷方式。apply_async 支持更多的參數,它的一般形式如下:
  1. apply_async(args=(), kwargs={}, route_name=None, **options)  
apply_async 常用的參數如下:
* countdown:指定多少秒後執行任務
  1. task1.apply_async(args=(23), countdown=5)    # 5 秒后执行任务  
* eta (estimated time of arrival):指定任務被調度的具體時間,參數類型是 datetime
  1. from datetime import datetime, timedelta  
  2.   
  3. # 当前 UTC 时间再加 10 秒后执行任务  
  4. task1.multiply.apply_async(args=[37], eta=datetime.utcnow() + timedelta(seconds=10))  
* expires:任務過期時間,參數類型可以是 int,也可以是 datetime
  1. task1.multiply.apply_async(args=[37], expires=10)    # 10 秒后过期  
更多的參數列表可以在 官方文檔 中查看。

定時任務
Celery 除了可以執行 異步任務,也支持執行 週期性任務Periodic Tasks),或者說定時任務。Celery Beat 進程通過讀取配置文件的內容,週期性地將定時任務發往任務隊列。讓我們看看例子,項目結構如下:
  1. celery_demo                    # 项目根目录  
  2.     ├── celery_app             # 存放 celery 相关文件  
  3.         ├── __init__.py  
  4.         ├── celeryconfig.py    # 配置文件  
  5.         ├── task1.py           # 任务文件  
  6.         └── task2.py           # 任务文件  
__init__.py 代碼如下:
  1. # -*- coding: utf-8 -*-  
  2.   
  3. from celery import Celery  
  4.   
  5. app = Celery('demo')  
  6. app.config_from_object('celery_app.celeryconfig')  
celeryconfig.py 代碼如下:
  1. # -*- coding: utf-8 -*-  
  2.   
  3. from datetime import timedelta  
  4. from celery.schedules import crontab  
  5.   
  6. # Broker and Backend  
  7. BROKER_URL = 'redis://127.0.0.1:6379'  
  8. CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/0'  
  9.   
  10. # Timezone  
  11. CELERY_TIMEZONE='Asia/Shanghai'    # 指定时区,不指定默认为 'UTC'  
  12. # CELERY_TIMEZONE='UTC'  
  13.   
  14. import  
  15. CELERY_IMPORTS = (  
  16.     'celery_app.task1',  
  17.     'celery_app.task2'  
  18. )  
  19.   
  20. # schedules  
  21. CELERYBEAT_SCHEDULE = {  
  22.     'add-every-30-seconds': {  
  23.          'task''celery_app.task1.add',  
  24.          'schedule': timedelta(seconds=30),       # 每 30 秒执行一次  
  25.          'args': (58)                           # 任务函数参数  
  26.     },  
  27.     'multiply-at-some-time': {  
  28.         'task''celery_app.task2.multiply',  
  29.         'schedule': crontab(hour=9, minute=50),   # 每天早上 9 点 50 分执行一次  
  30.         'args': (37)                            # 任务函数参数  
  31.     }  
  32. }  
task1.py 代碼如下:
  1. import time  
  2. from celery_app import app  
  3.   
  4. @app.task  
  5. def add(x, y):  
  6.     time.sleep(2)  
  7.     return x + y  
task2.py 代碼如下:
  1. import time  
  2. from celery_app import app  
  3.   
  4. @app.task  
  5. def multiply(x, y):  
  6.     time.sleep(2)  
  7.     return x * y  
現在,讓我們啟動 Celery Worker 進程,在項目的根目錄下執行下面命令:
# celery -A celery_app worker --loglevel=info

接著,啟動 Celery Beat 進程,定時將任務發送到 Broker,在項目根目錄下執行下面命令:

之後,在Worker窗口我們可以看到,任務 task1 每30秒執行一次,而 task2 每天早上9點50分執行一次。在上面,我們用兩個命令啟動了 Worker 進程和 Beat 進程,我們也可以將它們放在一個命令中:
// -B, --beat Also run the celery beat periodic task scheduler.
// Please note that there must only be one instance of
// this service. .. note:: -B is meant to be used for
// development purposes. For production environment, you
// need to start celery beat separately.

# celery -B -A celery_app worker --loglevel=info

Celery 週期性任務也有多個配置項,可參考 官方文檔

Supplement
Celery Doc - First Steps with Celery
Getting started with Celery and RabbitMQ
If you have a job that's computationally intensive, it wouldn't be a great idea to keep a user waiting; rather, it's best to do that in the background. Task queues are great tools that allow for async processing, outside of an HTTP request. Since DIVEworks with potentially large amounts of data, we rely heavily on task queues to process it...

資料庫的好夥伴:Redis
Redis 是一個 in-memory 的 key-value database,因此常常被用在需要快取Cache一些資料的場合,可以減輕許多後端資料庫的壓力。這篇就來簡單介紹一下 Redis 提供哪些好用的東西,以及可以應用在什麼地方。

How To Install and Use Redis
Install and Configure Redis on CentOS 7
Redis is an open-source, in-memory, data structure store with optional disk writes for persistence. It can be used as a key-value database, or as a cache and message broker. Redis features built-in transactions, replication, and support for a variety of data structures such as strings, hashes, lists, sets, and others. Redis can be made highly available with Redis Sentinel and supports automatic partitioning with Redis Cluster.


沒有留言:

張貼留言

[Git 常見問題] error: The following untracked working tree files would be overwritten by merge

  Source From  Here 方案1: // x -----删除忽略文件已经对 git 来说不识别的文件 // d -----删除未被添加到 git 的路径中的文件 // f -----强制运行 #   git clean -d -fx 方案2: 今天在服务器上  gi...