95:Celeryのタスクにはプリミティブなデータを渡そう

Celery のようなジョブキューシステムを利用するとき、ジョブに渡すデータが大きいと、 思わぬ不具合に遭遇します。ここでは、なるべく不具合になりにくいデータの渡し方についてご紹介します。

具体的な失敗例

下記のコードはDjangoのProductItemというモデルのデータを オブジェクトそのままにCeleryのタスクに渡しているコードです。

# Celeryのタスク
@shared_task
def update_items_task(items, new_attr):
    for item in items:
       if item.attr != new_attr:
          item.attr = new_attr
          item.save()


# タスクの呼び出し元
def some_process(product_item_ids, new_attr):
    target_items = ProductItem.objects.filter(id__in=product_item_ids)
    update_items_task.delay(target_items, new_attr)

コードとしてはシンプルですが、DjangoからCeleryへの通信コストという点では、 複雑なデータ構造を持つPythonのオブジェクトはあまり良くありません。

cover

(中略)詳細は書籍 自走プログラマー をご参照ください

ベストプラクティス

Celeryのような専用のデーモンを立ち上げて処理するようなシステムにデータを送るときは、なるべくプリミティブ(原始的)なデータにしましょう。 たとえば intstr などのシンプルな値です。受け取った側ではでプリミティブなデータから、本当に必要なデータを取り出して利用しましょう。

@shared_task
def update_items_task(item_ids, new_attr):
    for item in ProductItem.objects.filter(id__in=item_ids): # <- 受け取ったIDから必要なデータを取得する
       if item.attr != new_attr:
          item.attr = new_attr
          item.save()

def some_process(product_item_ids, new_attr):
    target_items = ProductItem.objects.filter(id__in=product_item_ids)
    update_items_task.delay([t.id for t in target_items], new_attr) # <- id(int)のリストだけを渡す

ここでは id のリストだけをCeleryに渡し、受け取ったタスク側で id を元に最新のモデル情報を取得しています。 こうすることで、送信するデータ量を抑えつつ、常に最新の状態でタスクを処理できます。