「Difyソースコードリーディング #9 - Difyのワーカーは何をしているのか」を開催しました #もくもくDify

ジェネラティブエージェンツの大嶋です。

もくもくDifyで「Difyソースコードリーディング #9 - Difyのワーカーは何をしているのか」という勉強会を開催しました。

dify-mokumoku.connpass.com

アーカイブ動画はこちらです。

youtube.com

Difyのソースコードはこちらです。

github.com

今回も 戸塚さん と一緒に話しながらコードを読んでいきました!

今回のポイント

今回は「Difyのワーカーは何をしているのか」ということで、Celeryで動いているワーカーはどんな処理をしているのかをチェックしていきました。

api/tasks

Difyでは、タスクキューを介してワーカーに処理をさせるのに「Celery」が使用されています。

github.com

DifyにおけるCeleryのタスク処理の実装は api/tasks ディレクトリに配置されていました。

具体的には、ナレッジベースのDatasetの作成やドキュメントのIndexの作成などのタスクが実装されています。 ナレッジベースのDatasetなどの作成の際、UI上ではローディングが表示されると思いますが、そのときに実行される処理だと思われます。

ナレッジベース関係以外だと、ユーザー登録用のメールの送信や、トレースに関する処理などもCeleryのワーカーのタスクとして実装されているようでした。

api/schedule

api/schedule ディレクトリにも、Celeryのワーカーのタスクが2つ実装されていました。

clean_embedding_cache_taskとclean_unused_datasets_task、つまりEmbeddingのキャッシュを削除するタスクと使われてないDatasetを削除するタスクのようです。

これらのタスクは api/extensions/ext_celery.py の中で以下のようにスケジューリングされていました。

    imports = [
        "schedule.clean_embedding_cache_task",
        "schedule.clean_unused_datasets_task",
    ]
    day = dify_config.CELERY_BEAT_SCHEDULER_TIME
    beat_schedule = {
        "clean_embedding_cache_task": {
            "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task",
            "schedule": timedelta(days=day),
        },
        "clean_unused_datasets_task": {
            "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task",
            "schedule": timedelta(days=day),
        },
    }
    celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)

この設定により、dify_config.CELERY_BEAT_SCHEDULER_TIME(デフォルトでは1日)ごとに、上記の2つのタスクが実行されるようです。

api/events

最後に、api/events ディレクトリの内容も見ていきました。

ここに実装されているのはCeleryのワーカーの処理ではなく、BlinkerというOSSを使ったpub/subのsubscriber的な処理でした。

github.com

BlinkerはFlaskにも組み込まれており、

app_was_created = signal("app-was-created")

のようにシグナルを定義して、

@app_was_created.connect
def handle(sender, **kwargs):

のようにシグナルが送られた際に実行される関数を実装すると、

app_was_created.send(app, account=account)

のようにシグナルが発行された際に @app_was_created.connect を設定した各関数が呼び出される、という挙動のようです。

上記のコードの引用元

参考

flask.palletsprojects.com

次回のご案内

以上、今回はDifyにおけるCeleryのワーカーの処理と、少しそれますがBlinkerでの処理を見ていきました。

ここまで9回のソースコードリーディングを通して、Difyのソースコードをかなり把握できました!

次回は「Dify v0.10.0のファイルアップロードを読む」がテーマです! ご興味ある方はぜひ気軽にご参加ください!

dify-mokumoku.connpass.com

また、水曜日にもDifyの活用についてのもくもく会があります。 こちらもご興味ある方はぜひ気軽にご参加ください!

dify-mokumoku.connpass.com