grasys blog

GenAI Processorsを触ってみる 3/4 ~ カスタムProcessorについて ~

はじめまして!エンジニアのUemaです。

今回は、AIパイプラインをシンプルに書けるように設計された、Google DeepMind の新しいオープンソース Python ライブラリである「GenAI Processors」をGet Startedやexampleを参考にしながら触ってみたいと思います。

シンプルに書けるとはいえ、ライブラリの構成要素やコンセプトなどを理解しないと混乱するため、それらもまとめてみました。

4部構成となっていて、今回はPart3となっています。

Part3は、Part2で少し出ていた processor.Processor を継承して作成するカスタムProcessorについて、まとめてみました。

シリーズの他の記事はこちらから見られます。
『GenAI Processorsを触ってみる』シリーズ一覧

GenAI Processorsとは

Part 1の方でも書きましたが、AIパイプラインを構築でき、効率的な並列コンテンツ処理を可能にする軽量の Python ライブラリです。

テキスト、音声、画像といった複数のデータ形式(マルチモーダル)を組み合わせ、リアルタイムで処理するような複雑なアプリケーションの開発を効率化することに特化しています。

カスタムProcessor

Create Your Own Processor より

Processorのタイプを決める

Part2でもありましたが、GenAI Processorsには下記2つのProcessorのタイプがあります。

  • ProcessorPart のストリームを処理するProcessor
  • 単一のProcessorPartを処理するPartProcessor

それぞれ、call メソッドを実装するのですが、パラメータが少し異なっています。Processor は、ProcessorPart のストリームを処理するため、AsyncIterableであるProcessorPartを引数として受け取ります。PartProcessorは、単一のProcessorPartを処理するため、ProcessorPartを引数として受け取ります。

# Processorのcallメソッド
async def call(
     self, content: AsyncIterable[ProcessorPart]
) -> AsyncIterable[ProcessorPartTypes]:
  ...


# PartProcessorのcallメソッド
async def call(
     self, part: ProcessorPart
) -> AsyncIterable[ProcessorPartTypes]:
  ...

Processor よりも PartProcessor が短い時間で処理されます。

これは、Processor がストリームを call メソッド内で順次処理をするのに対し、PartProcessor は各ProcessorPartが並列で処理されるためです。

そのため、PartProcessor で実装するのがおすすめなようです。また、+ 演算子と// 演算子とで直列並列を組み合わせることができるためでもあります。

ただし、処理の順序が重要な場合は、Processor でする必要があります。

Processorをクラス、関数として実装する

@processor.processor_function デコレータや @processor.part_processor_function デコレータで関数をラップすることにより関数でProcessorを定義できます。

Processorにパラメータが存在する場合は、クラスとして定義するほうが良いです。processor.Processor または processor.PartProcessor を継承して、call メソッドを実装します。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


class PreambleProcessor(processor.Processor):
    """コンテンツに前文を追加します。"""

    def __init__(self, preamble: content_api.ProcessorContent):
        self._preamble = preamble

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPart],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        for part in self._preamble:
            yield part
        async for part in content:
            yield part


async def main():
    p = PreambleProcessor(
        [
            "取扱説明書:RP-60は回転式のレトロな電話機です。番号をダイヤルするには、",
            "目的の数字の反対側にある穴に指を入れ、ディスクを時計回りに回します...",
        ],
    )
    input_stream = streams.stream_content(["ボタンはどこですか?"])

    async for part in p(input_stream):
        print(part)


if __name__ == "__main__":
    asyncio.run(main())

状態の管理について

内部状態を管理する場合には、call メソッド内で管理するのがベストプラクティスのようです。同じProcessorを異なる入力ストリームで呼び出しても、状態変数に副作用が生じることがなくなります。

async def call(
    self,
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    # 状態変数を定義する
    state = ...

    for part in self._preamble:
        # 状態変数を更新する
        state.update()

クラスレベルで状態変数を作成する必要がある場合は、call メソッドで状態変数に2回アクセスされたときに例外を発生させるのが推奨されるプラクティスです。Processorが一度に1つの入力ストリームでしか呼び出されないことが保証され、共有状態による予期せぬ副作用を防ぐことができます。

class MyProcessor(processor.Processor):
    def __init__(self):
        self._queue: asyncio.Queue | None = None

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPart],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        if self._queue is not None:
            raise ValueError("My Processorは一度しか呼び出せません。")
        self._queue = asyncio.Queue()
        try:
            async for part in content:
                ...
        finally:
            self._queue = None

Processor内でタスクを作成

Processor内で、並列に処理するためasyncioタスクを作成する場合があります。GenAI Processorでは、タスクを管理するための processor.create_task メソッドを提供しています。タスクのキャンセルと例外を適切に管理します。

import asyncio
import time
from collections.abc import AsyncIterable

from genai_processors import content_api, processor, streams


class TeaProcessor(processor.Processor):
    def _prepare_tea(self):
        # 長時間実行される操作 - 同期モード。
        print("紅茶を入れています...")
        time.sleep(1)

    async def _wait_for_tea_to_brew(self):
        # 長時間実行される操作を持つ同期メソッド
        await asyncio.to_thread(self._prepare_tea)
        print("Acme社の超特急紅茶の準備ができました!")

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPart],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        yield "リクエストを処理している間、紅茶でもどうぞ"
        tea_task = processor.create_task(self._wait_for_tea_to_brew())
        async for part in content:
            if content_api.is_text(part.mimetype):
                print(part.text)
        await tea_task


async def main():
    input_stream = streams.stream_content(
        ["実は ", " コーヒーが欲しかったのですが。"],
        with_delay_sec=0.6,
    )
    async for _ in TeaProcessor()(input_stream):
        pass


if __name__ == "__main__":
    asyncio.run(main())

Processorを組み合わせる

これまでやってきたProcessorの連結は、Processorの call メソッド内でも + 演算子使うことができます。また、PartProcessorでは、 // 演算子をサポートしており、並列で実行できます。

class UpperGenAI(processor.Processor):
    def __init__(self):
        self._preamble = preamble.Preamble(content=["次の定義は何ですか: "])
        self._model = genai_model.GenaiModel(
            # ここにAPIキーを使用します
            api_key=userdata.get("GOOGLE_API_KEY"),
            model_name="gemini-2.0-flash",
            generate_content_config=genai_types.GenerateContentConfig(
                temperature=0.7,
            ),
        )
        self._post_processing = upper_case_processor

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPart],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        p = self._preamble + self._model + self._post_processing  # Processorの連結
        async for part in p(content):
            yield part

デバッグ

debug モジュールを使用することで、ProcessorPart の内容を確認することができます。Processor後に、debug.print_stream()debug.log_stream() を連結することで使用できます。

import asyncio
from collections.abc import AsyncIterable

from genai_processors import content_api, debug, processor, streams
from genai_processors.core import genai_model, preamble
from google.genai import types


@processor.processor_function
async def upper_case_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    async for part in content:
        if content_api.is_text(part.mimetype):
            yield part.text.upper()
        else:
            yield part
        # より計算量の多いタスクをシミュレートするために少しスリープ
        await asyncio.sleep(0.001)


class UpperGenAIWithLogs(processor.Processor):
    def __init__(self):
        self._preamble = preamble.Preamble(
            content=["In two sentences, what is the definition of: "],
        )
        self._model = genai_model.GenaiModel(
            api_key=API_KEY,
            model_name="gemini-2.0-flash",
            generate_content_config=types.GenerateContentConfig(
                temperature=0.7,
            ),
        )
        self._post_processing = upper_case_processor

    async def call(
        self,
        content: AsyncIterable[content_api.ProcessorPart],
    ) -> AsyncIterable[content_api.ProcessorPartTypes]:
        p = (
            self._preamble
            + debug.print_stream("Before Model")
            + self._model
            + debug.print_stream("After Model")
            + self._post_processing
        )
        async for part in p(content):
            yield part


async def main():
    input_stream = streams.stream_content(["processor"])
    async for part in UpperGenAIWithLogs()(input_stream):
        print(part.text)


if __name__ == "__main__":
    asyncio.run(main())

テスト

Processorのテストは、標準ライブラリのunittest.IsolatedAsyncioTestCase や pytestの pytest-asyncio を使用し、標準のasync for で結果を収集することで簡単に行うことができます。

また、processor.apply_syncメソッドを使用して同期モードでテストを行うこともできます。

import asyncio
from collections.abc import AsyncIterable

import pytest
from genai_processors import content_api, processor, streams


@processor.processor_function
async def upper_case_processor(
    content: AsyncIterable[content_api.ProcessorPart],
) -> AsyncIterable[content_api.ProcessorPartTypes]:
    async for part in content:
        if content_api.is_text(part.mimetype):
            yield part.text.upper()
        else:
            yield part
        # より計算量の多いタスクをシミュレートするために少しスリープ
        await asyncio.sleep(0.001)


@pytest.mark.asyncio
async def test_to_upper_case_ok():
    """非同期"""
    expected = "HELLO WORLD!"
    input_stream = streams.stream_content(["hello ", "world!"])
    actual = content_api.ProcessorContent()
    async for part in upper_case_processor(input_stream):
        actual += part
    # ステータスやデバッグ文をフィルタリングするために、デフォルトのサブストリームからのみ
    # プロセッサの出力を収集します。
    assert actual.as_text(substream_name="") == expected


def test_sync_to_upper_case_ok():
    """同期"""
    expected = "HELLO WORLD!"
    actual = content_api.ProcessorContent(
        processor.apply_sync(upper_case_processor, ["hello ", "world!"]),
    )
    assert actual.as_text(substream_name="") == expected

終わりに

カスタムProcessorを作成することができました。

関数を使ってのProcessorやクラスでのProcessorなどいろいろな作成方法があるので、その時にあったカスタムProcessorを作成するとよいかと思います。

また、デバッグやテストの方法なども公式が用意してあるColab

次回は、今までやってきたことを合わせて、githubにあるExampleの1つを試してみたいと思います。

参考

こちらのリンクからシリーズ記事を一覧で見られます。よろしければこちらもご覧ください!
『GenAI Processorsを触ってみる』シリーズ一覧


採用情報
お問い合わせ