「AIエージェントキャッチアップ #42 - GenAI Processors」を開催しました

ジェネラティブエージェンツの大嶋です。

「AIエージェントキャッチアップ #42 - GenAI Processors」という勉強会を開催しました。

generative-agents.connpass.com

アーカイブ動画はこちらです。

www.youtube.com

GenAI Processors

今回は、Google DeepMindが公開したAIパイプラインのPythonライブラリ「GenAI Processors」をキャッチアップしました。

github.com

developers.googleblog.com

今回のポイント

GenAI Processorsとは

GenAI Processorsは、Google DeepMindが開発したPythonライブラリで、AIパイプラインを構築するためのフレームワークです。

テキスト・画像・音声といったマルチモーダルの入力と出力を統一されたインターフェースで扱い、ストリーミング処理の連鎖をサポートしているのが特徴です。

GenAI Processorsの基本概念

GenAI Processorsには以下の2つの中核概念があります。

  • Processor
  • ProcessorPart

Processor

Processorは、ProcessorPartのストリームを入力として受け取り、ProcessorPartを出力します。

GenAI Processorsでは、たとえばLLMはProcessorの一種です。

from genai_processors.core import genai_model

p = genai_model.GenaiModel(
    api_key=API_KEY,
    model_name='gemini-2.0-flash-lite',
)

また、以下のようにProcessorを自作することもできます。

from collections.abc import AsyncIterable
from genai_processors import content_api, processor

@processor.processor_function
async def my_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPart]:
  """Replaces dots with '[EoS]'."""
  async for part in content:
    yield part

この実装は、LangChainにおけるRunnableGeneratorと似ています。

ProcessorPart

Processorの入力と出力で使用するのが、ProcessorPartです。 ProcessorPartは、テキスト・画像・音声など、様々なモダリティのデータを格納できます。

ProcessorPartは、以下のようにして作成できます。

# テキスト
text_part = ProcessorPart("Hello")

# 画像
image_bytes_part = ProcessorPart(image_bytes, mimetype="image/png")

このように、異なるモダリティのデータをProcessorPartとして統一的に扱えるは、GenAI Processorsの大きな特徴です。

Processorの連結

GenAI Processorsでは、複数のProcessorを連結してパイプラインを構築できます。

+で連結

最も基本的な連結方法は、+演算子を使った連結です。

pipeline = processor1 + processor2

LangChainのLCELで|で処理を連鎖するのと似ていますね。

DeepWikiで確認したところ、LCELの|と同じく演算子オーバーロードで実装されているようでした。

並列

複数のプロセッサーを並列に実行することも可能です。

from genai_processors import processor

parallel_processor = processor.parallel_concat([
    processor_a,
    processor_b,
    processor_c
])

条件分岐

条件分岐を使ったパイプラインの構築も可能です。

from genai_processors import switch

switch_processor = (
    switch.Switch(content_api.get_substream_name)
    .case("a", processor_a)
    .case("b", processor_b)
    .default(processor.passthrough())
)

上記の例では、ProcessorPartのsubstream_nameaの場合はprocessor_abの場合はprocessor_b、それ以外の場合はpassthroughで処理します。

ループの実装

GenAI Processorsでは、Queueを使うことでループ処理も実装できます。 ループの実装はなかなか面白かったので、少し詳しく説明していきます。

参考: https://colab.research.google.com/github/google-gemini/genai-processors/blob/main/notebooks/processor_intro.ipynb

Queueを含むストリームを作成

まずは以下のように、0.2秒空けて「Hello」「World」と出力するストリームを作成します。

input_stream = streams.stream_content(
    [
        content_api.ProcessorPart("Hello"),
        content_api.ProcessorPart("World"),
    ],
    with_delay_sec=0.2,
)

Queueを用意して、input_streamとマージします。

input_queue = asyncio.Queue()
stream_loop = streams.merge(
    [input_stream, streams.dequeue(input_queue)],
    stop_on_first=True,
)

これで、input_streamまたはinput_queueから値を取り出し続ける入力ストリームができました。

ストリームを処理しながらQueueに値を追加

最後に、my_processorでそのストリームを処理します。 その際、0.09秒空けてinput_queueに値を追加します。

async for part in my_processor(stream_loop):
    print(part.text)
    await asyncio.sleep(0.09)
    await input_queue.put(content_api.ProcessorPart("new_part"))

すると、以下のように表示されます。

Hello
new_part
new_part
World
new_part
new_part

ストリームを処理しながらQueueに値を追加することで、ループ処理を実現しているということです。

通常のPythonなどのループの書き方とはだいぶ頭の使い方が違うので慣れが必要そうですが、面白い実現方法ですね。

次回のご案内

以上、今回は「GenAI Processors」をキャッチアップしました。

次回は「AIエージェントキャッチアップ #43 - BrowserGym」ということで、Webエージェント研究のためのフレームワーク「BrowserGym」がテーマです!

generative-agents.connpass.com

ご興味・お時間ある方はぜひご参加ください!

また、その次の回以降のテーマも募集しているので、気になるエージェントのOSSなどあれば教えてください!