ジェネラティブエージェンツの大嶋です。
「AIエージェントキャッチアップ #42 - GenAI Processors」という勉強会を開催しました。
generative-agents.connpass.com
アーカイブ動画はこちらです。
GenAI Processors
今回は、Google DeepMindが公開したAIパイプラインのPythonライブラリ「GenAI Processors」をキャッチアップしました。
今回のポイント
GenAI Processorsとは
GenAI Processorsは、Google DeepMindが開発したPythonライブラリで、AIパイプラインを構築するためのフレームワークです。
テキスト・画像・音声といったマルチモーダルの入力と出力を統一されたインターフェースで扱い、ストリーミング処理の連鎖をサポートしているのが特徴です。
GenAI Processorsの基本概念
GenAI Processorsには以下の2つの中核概念があります。
- Processor
- ProcessorPart
Processor
Processorは、ProcessorPartのストリームを入力として受け取り、ProcessorPartを出力します。
GenAI Processorsでは、たとえばLLMはProcessorの一種です。
from genai_processors.core import genai_model p = genai_model.GenaiModel( api_key=API_KEY, model_name='gemini-2.0-flash-lite', )
また、以下のようにProcessorを自作することもできます。
from collections.abc import AsyncIterable from genai_processors import content_api, processor @processor.processor_function async def my_processor( content: AsyncIterable[content_api.ProcessorPart], ) -> AsyncIterable[content_api.ProcessorPart]: """Replaces dots with '[EoS]'.""" async for part in content: yield part
この実装は、LangChainにおけるRunnableGeneratorと似ています。
ProcessorPart
Processorの入力と出力で使用するのが、ProcessorPartです。 ProcessorPartは、テキスト・画像・音声など、様々なモダリティのデータを格納できます。
ProcessorPartは、以下のようにして作成できます。
# テキスト text_part = ProcessorPart("Hello") # 画像 image_bytes_part = ProcessorPart(image_bytes, mimetype="image/png")
このように、異なるモダリティのデータをProcessorPartとして統一的に扱えるは、GenAI Processorsの大きな特徴です。
Processorの連結
GenAI Processorsでは、複数のProcessorを連結してパイプラインを構築できます。
+で連結
最も基本的な連結方法は、+演算子を使った連結です。
pipeline = processor1 + processor2
LangChainのLCELで|で処理を連鎖するのと似ていますね。
DeepWikiで確認したところ、LCELの|と同じく演算子オーバーロードで実装されているようでした。
並列
複数のプロセッサーを並列に実行することも可能です。
from genai_processors import processor parallel_processor = processor.parallel_concat([ processor_a, processor_b, processor_c ])
条件分岐
条件分岐を使ったパイプラインの構築も可能です。
from genai_processors import switch switch_processor = ( switch.Switch(content_api.get_substream_name) .case("a", processor_a) .case("b", processor_b) .default(processor.passthrough()) )
上記の例では、ProcessorPartのsubstream_nameがaの場合はprocessor_a、bの場合はprocessor_b、それ以外の場合はpassthroughで処理します。
ループの実装
GenAI Processorsでは、Queueを使うことでループ処理も実装できます。 ループの実装はなかなか面白かったので、少し詳しく説明していきます。
Queueを含むストリームを作成
まずは以下のように、0.2秒空けて「Hello」「World」と出力するストリームを作成します。
input_stream = streams.stream_content(
[
content_api.ProcessorPart("Hello"),
content_api.ProcessorPart("World"),
],
with_delay_sec=0.2,
)
Queueを用意して、input_streamとマージします。
input_queue = asyncio.Queue()
stream_loop = streams.merge(
[input_stream, streams.dequeue(input_queue)],
stop_on_first=True,
)
これで、input_streamまたはinput_queueから値を取り出し続ける入力ストリームができました。
ストリームを処理しながらQueueに値を追加
最後に、my_processorでそのストリームを処理します。 その際、0.09秒空けてinput_queueに値を追加します。
async for part in my_processor(stream_loop): print(part.text) await asyncio.sleep(0.09) await input_queue.put(content_api.ProcessorPart("new_part"))
すると、以下のように表示されます。
Hello new_part new_part World new_part new_part

ストリームを処理しながらQueueに値を追加することで、ループ処理を実現しているということです。
通常のPythonなどのループの書き方とはだいぶ頭の使い方が違うので慣れが必要そうですが、面白い実現方法ですね。
次回のご案内
以上、今回は「GenAI Processors」をキャッチアップしました。
次回は「AIエージェントキャッチアップ #43 - BrowserGym」ということで、Webエージェント研究のためのフレームワーク「BrowserGym」がテーマです!
generative-agents.connpass.com
ご興味・お時間ある方はぜひご参加ください!
また、その次の回以降のテーマも募集しているので、気になるエージェントのOSSなどあれば教えてください!