ジェネラティブエージェンツの大嶋です。
もくもくDifyで「Difyソースコードリーディング #9 - Difyのワーカーは何をしているのか」という勉強会を開催しました。
アーカイブ動画はこちらです。
Difyのソースコードはこちらです。
今回も 戸塚さん と一緒に話しながらコードを読んでいきました!
今回のポイント
今回は「Difyのワーカーは何をしているのか」ということで、Celeryで動いているワーカーはどんな処理をしているのかをチェックしていきました。
api/tasks
Difyでは、タスクキューを介してワーカーに処理をさせるのに「Celery」が使用されています。
DifyにおけるCeleryのタスク処理の実装は api/tasks ディレクトリに配置されていました。
具体的には、ナレッジベースのDatasetの作成やドキュメントのIndexの作成などのタスクが実装されています。 ナレッジベースのDatasetなどの作成の際、UI上ではローディングが表示されると思いますが、そのときに実行される処理だと思われます。
ナレッジベース関係以外だと、ユーザー登録用のメールの送信や、トレースに関する処理などもCeleryのワーカーのタスクとして実装されているようでした。
api/schedule
api/schedule ディレクトリにも、Celeryのワーカーのタスクが2つ実装されていました。
clean_embedding_cache_taskとclean_unused_datasets_task、つまりEmbeddingのキャッシュを削除するタスクと使われてないDatasetを削除するタスクのようです。
これらのタスクは api/extensions/ext_celery.py の中で以下のようにスケジューリングされていました。
imports = [ "schedule.clean_embedding_cache_task", "schedule.clean_unused_datasets_task", ] day = dify_config.CELERY_BEAT_SCHEDULER_TIME beat_schedule = { "clean_embedding_cache_task": { "task": "schedule.clean_embedding_cache_task.clean_embedding_cache_task", "schedule": timedelta(days=day), }, "clean_unused_datasets_task": { "task": "schedule.clean_unused_datasets_task.clean_unused_datasets_task", "schedule": timedelta(days=day), }, } celery_app.conf.update(beat_schedule=beat_schedule, imports=imports)
この設定により、dify_config.CELERY_BEAT_SCHEDULER_TIME(デフォルトでは1日)ごとに、上記の2つのタスクが実行されるようです。
api/events
最後に、api/events ディレクトリの内容も見ていきました。
ここに実装されているのはCeleryのワーカーの処理ではなく、BlinkerというOSSを使ったpub/subのsubscriber的な処理でした。
BlinkerはFlaskにも組み込まれており、
app_was_created = signal("app-was-created")
のようにシグナルを定義して、
@app_was_created.connect def handle(sender, **kwargs):
のようにシグナルが送られた際に実行される関数を実装すると、
app_was_created.send(app, account=account)
のようにシグナルが発行された際に @app_was_created.connect
を設定した各関数が呼び出される、という挙動のようです。
上記のコードの引用元
- https://github.com/langgenius/dify/blob/b674c598f98b9dde18e418a9fa5b8253c6bc4062/api/events/app_event.py#L4
- https://github.com/langgenius/dify/blob/b674c598f98b9dde18e418a9fa5b8253c6bc4062/api/events/event_handlers/create_site_record_when_app_created.py#L6
- https://github.com/langgenius/dify/blob/b674c598f98b9dde18e418a9fa5b8253c6bc4062/api/services/app_dsl_service.py#L421
参考
次回のご案内
以上、今回はDifyにおけるCeleryのワーカーの処理と、少しそれますがBlinkerでの処理を見ていきました。
ここまで9回のソースコードリーディングを通して、Difyのソースコードをかなり把握できました!
次回は「Dify v0.10.0のファイルアップロードを読む」がテーマです! ご興味ある方はぜひ気軽にご参加ください!
また、水曜日にもDifyの活用についてのもくもく会があります。 こちらもご興味ある方はぜひ気軽にご参加ください!